Source code for manheim_c7n_tools.config

# Copyright 2017-2019 Manheim / Cox Automotive
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import jsonschema
import logging
import yaml
import os

from c7n_mailer.cli import CONFIG_SCHEMA as MAILER_SCHEMA

#: Schema of the ``manheim-c7n-tools.yml`` configuration file. This is a schema
#: designed for use with the ``jsonschema`` package. This schema is for ONE
#: ACCOUNT in the config file; the file itself is made up of an array of objects
#: matching this schema.
MANHEIM_CONFIG_SCHEMA = {
    'type': 'object',
    # Unknown top-level keys are rejected, so typos in key names fail fast.
    'additionalProperties': False,
    'required': [
        'account_id',
        'account_name',
        'regions',
        'mailer_config',
        'mailer_regions',
        'output_s3_bucket_name',
        'custodian_log_group',
        'dead_letter_queue_arn',
        'role_arn'
    ],
    'properties': {
        # The AWS Account ID
        'account_id': {'type': ['number', 'string']},
        # The AWS account name (should be the official Account Alias)
        'account_name': {'type': 'string'},
        # Optional configuration for a role to assume when running this
        # tooling, for running cross-account.
        'assume_role': {
            'type': 'object',
            'required': ['role_arn'],
            'properties': {
                'role_arn': {'type': 'string'},
                'external_id': {'type': 'string'},
                'duration_seconds': {'type': 'number'}
            }
        },
        # Optional policy source paths. If not specified, uses the current
        # directory
        'policy_source_paths': {'type': 'array', 'items': {'type': 'string'}},
        # Name prefix for custodian Lambda functions
        'function_prefix': {'type': 'string'},
        # A list of region names that custodian should run in for this account
        'regions': {'type': 'array', 'items': {'type': 'string'}},
        # Name of the S3 bucket for storing Custodian output; should include
        # %%AWS_REGION%%, for buckets in each region
        'output_s3_bucket_name': {'type': 'string'},
        # Name of the CloudWatch Log Group to write logs to
        'custodian_log_group': {'type': 'string'},
        # List of region names to run c7n-mailer in
        'mailer_regions': {'type': 'array', 'items': {'type': 'string'}},
        # ARN of the Dead Letter SQS queue
        'dead_letter_queue_arn': {'type': 'string'},
        # ARN of the role to run Lambda functions under
        'role_arn': {'type': 'string'},
        # Array of notification recipients for orphaned Lambda/CWE Rule
        # notifications; set to empty array to disable this functionality
        'cleanup_notify': {'type': 'array'},
        # Optional list of notification targets to add to EVERY policy.
        # The ``to`` / ``transport`` sub-schemas must live under a
        # ``properties`` keyword: JSON Schema ignores keywords it does not
        # know, so listing them directly in this object would silently
        # disable all validation of ``always_notify``.
        'always_notify': {
            'type': 'object',
            'properties': {
                'to': {'type': 'array', 'items': {'type': 'string'}},
                'transport': {
                    # Exactly one of the two transport shapes must match.
                    'oneOf': [
                        {
                            'type': 'object',
                            'required': ['type', 'queue'],
                            'properties': {
                                'queue': {'type': 'string'},
                                'type': {'enum': ['sqs']}
                            }
                        },
                        {
                            'type': 'object',
                            'required': ['type', 'topic'],
                            'properties': {
                                'topic': {'type': 'string'},
                                'type': {'enum': ['sns']},
                                'attributes': {'type': 'object'},
                            }
                        }]
                },
            },
        },

        # Incorporate c7n-mailer's config schema nested under a
        # ``mailer_config`` key. See upstream source of c7n_mailer.
        'mailer_config': MAILER_SCHEMA
    }
}

logger = logging.getLogger(__name__)


class ManheimConfig(object):
    """
    Configuration object for manheim-c7n-tools / :py:mod:`~.runner`.

    Holds the validated configuration of ONE account. The path of the file
    the configuration came from is stored as the ``config_path`` attribute;
    every other configuration key is readable as an attribute of the
    instance through :py:meth:`~.ManheimConfig.__getattr__`.
    """

    def __init__(self, **kwargs):
        """
        Validate and store the configuration for one account.

        :param kwargs: the account's configuration keys, plus a required
          ``config_path`` key giving the path of the configuration file
        :raises KeyError: if ``config_path`` is not passed
        :raises jsonschema.ValidationError: if the remaining keys do not
          validate against :py:const:`~.MANHEIM_CONFIG_SCHEMA`
        """
        # config_path is not part of the schema; remove it before validating.
        self.config_path = kwargs.pop('config_path')
        logger.debug('Validating configuration...')
        jsonschema.validate(dict(kwargs), MANHEIM_CONFIG_SCHEMA)
        self._config = kwargs
        # The schema accepts a number or a string; normalize to a string.
        self._config['account_id'] = str(self._config['account_id'])
        # Defaults for the optional keys.
        if 'function_prefix' not in self._config:
            self._config['function_prefix'] = 'custodian-'
        if 'cleanup_notify' not in self._config:
            self._config['cleanup_notify'] = []

    @staticmethod
    def from_file(path, account_name):
        """
        Construct a new ManheimConfig object from the YML configuration file
        at the specified path.

        :param path: path of the yaml config file to load
        :type path: str
        :param account_name: top-level account name/alias to load
        :type account_name: str
        :return: new ManheimConfig object for the specified config file
        :rtype: ManheimConfig
        :raises RuntimeError: if no account in the file has the specified
          ``account_name``
        """
        logger.info('Loading config from: %s', path)
        with open(path, 'r') as fh:
            config_dict = yaml.load(fh.read(), Loader=yaml.SafeLoader)
        # An empty YAML document loads as None; treat it as "no accounts" so
        # the caller gets the descriptive RuntimeError below instead of a
        # TypeError from iterating None.
        for acct_conf in config_dict or []:
            if acct_conf['account_name'] == account_name:
                acct_conf['config_path'] = path
                return ManheimConfig(**acct_conf)
        raise RuntimeError(
            'ERROR: No account with name "%s" in %s' % (
                account_name, path
            )
        )

    @staticmethod
    def list_accounts(path):
        """
        Given the path to a manheim-c7n-tools YML configuration file, return
        a dict of account name to account ID number for each account defined
        in the file.

        :param path: path of the yaml config file to load
        :type path: str
        :return: dict of account name/alias used in the file to Account ID
          (as a string); empty if the file defines no accounts
        :rtype: dict
        """
        logger.info('Loading config from: %s', path)
        with open(path, 'r') as fh:
            config_list = yaml.load(fh.read(), Loader=yaml.SafeLoader)
        # An empty YAML document loads as None; report it as no accounts.
        return {
            x['account_name']: str(x['account_id'])
            for x in config_list or []
        }

    def config_for_region(self, region_name):
        """
        Return a copy of this configuration for the specified region name.

        This currently uses an inefficient but simple approach - it
        serializes the current config to a YAML string, replaces all
        occurrences of ``%%AWS_REGION%%`` with the specified ``region_name``
        and all occurrences of ``%%POLICYGEN_ENV_name%%`` replaced with the
        value of the corresponding environment variable, then deserializes
        the result and returns a new :py:class:`~.ManheimConfig` object
        using it.

        :param region_name: the region name to build a config for
        :type region_name: str
        :return: new ManheimConfig for the specified region
        :rtype: ManheimConfig
        """
        d = {'config_path': self.config_path}
        d.update(self._config)
        # AWS_REGION replacement
        config_str = yaml.dump(
            d, Dumper=yaml.Dumper
        ).replace('%%AWS_REGION%%', region_name)
        # env var replacements. NOTE: values are substituted textually into
        # the serialized YAML, so they are re-parsed as YAML below.
        for k, v in os.environ.items():
            if not k.startswith('POLICYGEN_ENV_'):
                continue
            config_str = config_str.replace('%%' + k + '%%', v)
        return ManheimConfig(**yaml.load(config_str, Loader=yaml.SafeLoader))

    def __getattr__(self, k):
        """
        Expose configuration keys as instance attributes.

        Python only calls this when normal attribute lookup has failed.

        :param k: name of the attribute / configuration key
        :type k: str
        :return: the value stored for ``k`` in the configuration
        :raises AttributeError: if ``k`` is not a configuration key
        """
        # If ``_config`` itself is missing (an instance created through
        # ``__new__`` without ``__init__``, as ``copy`` and ``pickle`` do),
        # reading ``self._config`` below would re-enter this method until
        # RecursionError. Fail with a plain AttributeError instead.
        if k == '_config':
            raise AttributeError(k)
        try:
            return self._config[k]
        except KeyError:
            raise AttributeError(k)