"""
awslimitchecker/runner.py
The latest version of this package is available at:
<https://github.com/jantman/awslimitchecker>
##############################################################################
Copyright 2015-2018 Jason Antman <jason@jasonantman.com>
This file is part of awslimitchecker, also known as awslimitchecker.
awslimitchecker is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
awslimitchecker is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with awslimitchecker. If not, see <http://www.gnu.org/licenses/>.
The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
##############################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/awslimitchecker> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
##############################################################################
AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
##############################################################################
"""
import sys
import argparse
import logging
import json
import boto3
import time
from .checker import AwsLimitChecker
from .utils import StoreKeyValuePair, dict2cols, issue_string_tuple
from .limit import SOURCE_TA, SOURCE_API, SOURCE_QUOTAS
from .metrics import MetricsProvider
from .alerts import AlertProvider
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger()
# suppress boto3 internal logging below WARNING level
boto3_log = logging.getLogger("boto3")
boto3_log.setLevel(logging.WARNING)
boto3_log.propagate = True
# suppress botocore internal logging below WARNING level
botocore_log = logging.getLogger("botocore")
botocore_log.setLevel(logging.WARNING)
botocore_log.propagate = True
[docs]class Runner(object):
[docs] def __init__(self):
self.colorize = True
self.checker = None
self.skip_ta = False
self.service_name = None
self.skip_check = []
[docs] def parse_args(self, argv):
"""
parse arguments/options
:param argv: argument list to parse, usually ``sys.argv[1:]``
:type argv: list
:returns: parsed arguments
:rtype: :py:class:`argparse.Namespace`
"""
desc = 'Report on AWS service limits and usage via boto3, optionally ' \
'warn about any services with usage nearing or exceeding their' \
' limits. For further help, see ' \
'<http://awslimitchecker.readthedocs.org/>'
# ###### IMPORTANT license notice ##########
# Pursuant to Sections 5(b) and 13 of the GNU Affero General Public
# License, version 3, this notice MUST NOT be removed, and MUST be
# displayed to ALL USERS of this software, even if they interact with
# it remotely over a network.
#
# See the "Development" section of the awslimitchecker documentation
# (docs/source/development.rst or
# <http://awslimitchecker.readthedocs.org/en/latest/development.html> )
# for further information.
# ###### IMPORTANT license notice ##########
epilog = 'awslimitchecker is AGPLv3-licensed Free Software. Anyone ' \
'using this program, even remotely over a network, is ' \
'entitled to a copy of the source code. Use `--version` for ' \
'information on the source code location.'
p = argparse.ArgumentParser(description=desc, epilog=epilog)
p.add_argument('-S', '--service', action='store', nargs='*',
help='perform action for only the specified service name'
'; see -s|--list-services for valid names')
p.add_argument('--skip-service', action='append', default=[],
dest='skip_service',
help='avoid performing actions for the specified service'
' name; see -s|--list-services for valid names')
p.add_argument('--skip-check', action='append', default=[],
dest='skip_check',
help='avoid performing actions for the specified check'
' name')
p.add_argument('-s', '--list-services', action='store_true',
default=False,
help='print a list of all AWS service types that '
'awslimitchecker knows how to check')
p.add_argument('-l', '--list-limits', action='store_true',
default=False,
help='print all AWS effective limits in "service_name/'
'limit_name" format')
p.add_argument('--list-defaults', action='store_true', default=False,
help='print all AWS default limits in "service_name/'
'limit_name" format')
p.add_argument('-L', '--limit', action=StoreKeyValuePair,
help='override a single AWS limit, specified in '
'"service_name/limit_name=value" format; can be '
'specified multiple times.')
p.add_argument('--limit-override-json', action='store', type=str,
default=None,
help='Absolute or relative path, or s3:// URL, to a '
'JSON file specifying limit overrides. See docs '
'for expected format.')
p.add_argument('--threshold-override-json', action='store', type=str,
default=None,
help='Absolute or relative path, or s3:// URL, to a '
'JSON file specifying threshold overrides. See '
'docs for expected format.')
p.add_argument('-u', '--show-usage', action='store_true',
default=False,
help='find and print the current usage of all AWS '
'services with known limits')
p.add_argument('--iam-policy', action='store_true',
default=False,
help='output a JSON serialized IAM Policy '
'listing the required permissions for '
'awslimitchecker to run correctly.')
p.add_argument('-W', '--warning-threshold', action='store',
type=int, default=80,
help='default warning threshold (percentage of '
'limit); default: 80')
p.add_argument('-C', '--critical-threshold', action='store',
type=int, default=99,
help='default critical threshold (percentage of '
'limit); default: 99')
p.add_argument('-P', '--profile', action='store', dest='profile_name',
type=str, default=None,
help='Name of profile in the AWS cross-sdk credentials '
'file to use credentials from; similar to the '
'corresponding awscli option')
p.add_argument('-A', '--sts-account-id', action='store',
type=str, default=None,
help='for use with STS, the Account ID of the '
'destination account (account to assume a role in)')
p.add_argument('-R', '--sts-account-role', action='store',
type=str, default=None,
help='for use with STS, the name of the IAM role to '
'assume')
p.add_argument('-E', '--external-id', action='store', type=str,
default=None, help='External ID to use when assuming '
'a role via STS')
p.add_argument('-M', '--mfa-serial-number', action='store', type=str,
default=None, help='MFA Serial Number to use when '
'assuming a role via STS')
p.add_argument('-T', '--mfa-token', action='store', type=str,
default=None, help='MFA Token to use when assuming '
'a role via STS')
p.add_argument('-r', '--region', action='store',
type=str, default=None,
help='AWS region name to connect to; required for STS')
p.add_argument('--role-partition', action='store', type=str,
default='aws',
help='AWS partition name to use for account_role when '
'connecting via STS; see documentation for more '
'information (default: "aws")')
p.add_argument('--ta-api-region', action='store', type=str,
default='us-east-1',
help='Region to use for Trusted Advisor / Support API'
' (default: us-east-1)')
p.add_argument('--skip-ta', action='store_true', default=False,
help='do not attempt to pull *any* information on limits'
' from Trusted Advisor')
p.add_argument('--skip-quotas', action='store_true', default=False,
help='Do not attempt to connect to Service Quotas '
'service or use its data for current limits')
g = p.add_mutually_exclusive_group()
g.add_argument('--ta-refresh-wait', dest='ta_refresh_wait',
action='store_true', default=False,
help='If applicable, refresh all Trusted Advisor '
'limit-related checks, and wait for the refresh to'
' complete before continuing.')
g.add_argument('--ta-refresh-trigger', dest='ta_refresh_trigger',
action='store_true', default=False,
help='If applicable, trigger refreshes for all Trusted '
'Advisor limit-related checks, but do not wait for '
'them to finish refreshing; trigger the refresh '
'and continue on (useful to ensure checks are '
'refreshed before the next scheduled run).')
g.add_argument('--ta-refresh-older', dest='ta_refresh_older',
action='store', type=int, default=None,
help='If applicable, trigger refreshes for all Trusted '
'Advisor limit-related checks with results more '
'than this number of seconds old. Wait for the '
'refresh to complete before continuing.')
p.add_argument('--ta-refresh-timeout', dest='ta_refresh_timeout',
type=int, action='store', default=None,
help='If waiting for TA checks to refresh, wait up to '
'this number of seconds before continuing on '
'anyway.')
p.add_argument('--no-color', action='store_true', default=False,
help='do not colorize output')
p.add_argument('--no-check-version', action='store_false', default=True,
dest='check_version',
help='do not check latest version at startup')
p.add_argument('-v', '--verbose', dest='verbose', action='count',
default=0,
help='verbose output. specify twice for debug-level '
'output.')
p.add_argument('-V', '--version', dest='version', action='store_true',
default=False,
help='print version number and exit.')
p.add_argument('--list-metrics-providers',
dest='list_metrics_providers',
action='store_true', default=False,
help='List available metrics providers and exit')
p.add_argument('--metrics-provider', dest='metrics_provider', type=str,
action='store', default=None,
help='Metrics provider class name, to enable sending '
'metrics')
p.add_argument('--metrics-config', action=StoreKeyValuePair,
dest='metrics_config',
help='Specify key/value parameters for the metrics '
'provider constructor. See documentation for '
'further information.')
p.add_argument('--list-alert-providers',
dest='list_alert_providers',
action='store_true', default=False,
help='List available alert providers and exit')
p.add_argument('--alert-provider', dest='alert_provider', type=str,
action='store', default=None,
help='Alert provider class name, to enable sending '
'notifications')
p.add_argument('--alert-config', action=StoreKeyValuePair,
dest='alert_config',
help='Specify key/value parameters for the alert '
'provider constructor. See documentation for '
'further information.')
args = p.parse_args(argv)
args.ta_refresh_mode = None
if args.ta_refresh_wait:
args.ta_refresh_mode = 'wait'
elif args.ta_refresh_trigger:
args.ta_refresh_mode = 'trigger'
elif args.ta_refresh_older is not None:
args.ta_refresh_mode = args.ta_refresh_older
return args
[docs] def list_services(self):
for x in sorted(self.checker.get_service_names()):
print(x)
[docs] def list_limits(self):
limits = self.checker.get_limits(
use_ta=(not self.skip_ta),
service=self.service_name)
data = {}
for svc in sorted(limits.keys()):
for lim in sorted(limits[svc].keys()):
src_str = ''
if limits[svc][lim].get_limit_source() == SOURCE_API:
src_str = ' (API)'
if limits[svc][lim].get_limit_source() == SOURCE_TA:
src_str = ' (TA)'
if limits[svc][lim].get_limit_source() == SOURCE_QUOTAS:
src_str = ' (Quotas)'
if limits[svc][lim].has_resource_limits():
for usage in limits[svc][lim].get_current_usage():
id = "{s}/{l}/{r}".format(s=svc, l=lim,
r=usage.resource_id)
data[id] = '{v} (API)'.format(v=usage.get_maximum())
else:
data["{s}/{l}".format(s=svc, l=lim)] = '{v}{t}'.format(
v=limits[svc][lim].get_limit(),
t=src_str)
print(dict2cols(data))
[docs] def list_defaults(self):
limits = self.checker.get_limits(service=self.service_name)
data = {}
for svc in sorted(limits.keys()):
for lim in sorted(limits[svc].keys()):
data["{s}/{l}".format(s=svc, l=lim)] = '{v}'.format(
v=limits[svc][lim].default_limit)
print(dict2cols(data))
[docs] def iam_policy(self):
policy = self.checker.get_required_iam_policy()
print(json.dumps(policy, sort_keys=True, indent=2))
[docs] def show_usage(self):
self.checker.find_usage(
service=self.service_name, use_ta=(not self.skip_ta))
limits = self.checker.get_limits(
service=self.service_name, use_ta=(not self.skip_ta))
data = {}
for svc in sorted(limits.keys()):
for lim in sorted(limits[svc].keys()):
data["{s}/{l}".format(s=svc, l=lim)] = '{v}'.format(
v=limits[svc][lim].get_current_usage_str())
print(dict2cols(data))
[docs] def check_thresholds(self, metrics=None):
have_warn = False
have_crit = False
problems = self.checker.check_thresholds(
use_ta=(not self.skip_ta),
service=self.service_name
)
if metrics:
for svc, svc_limits in sorted(self.checker.get_limits().items()):
if self.service_name and svc not in self.service_name:
continue
for _, limit in sorted(svc_limits.items()):
metrics.add_limit(limit)
columns = {}
for svc in sorted(problems.keys()):
for lim_name in sorted(problems[svc].keys()):
check_name = "{svc}/{limit}".format(
svc=svc,
limit=lim_name,
)
if check_name in self.skip_check:
continue
limit = problems[svc][lim_name]
warns = limit.get_warnings()
crits = limit.get_criticals()
if len(crits) > 0:
have_crit = True
if len(warns) > 0:
have_warn = True
k, v = issue_string_tuple(
svc, limit, crits, warns, colorize=self.colorize
)
columns[k] = v
d2c = dict2cols(columns)
print(d2c)
# might as well use the Nagios exit codes,
# even though our output doesn't work for that
if have_crit:
return 2, problems, d2c
if have_warn:
return 1, problems, d2c
return 0, problems, d2c
[docs] def set_limit_overrides(self, overrides):
for key in sorted(overrides.keys()):
if key.count('/') != 1:
raise ValueError("Limit names must be in 'service/limit' "
"format; {k} is invalid.".format(k=key))
svc, limit = key.split('/')
self.checker.set_limit_override(svc, limit, int(overrides[key]))
[docs] def load_json(self, path):
"""Load JSON from either a local file or S3"""
if path.startswith('s3://'):
parsed = urlparse(path)
s3key = parsed.path.lstrip('/')
logger.debug(
'Reading JSON from S3 bucket "%s" key "%s"',
parsed.netloc, s3key
)
client = boto3.client('s3')
resp = client.get_object(Bucket=parsed.netloc, Key=s3key)
data = resp['Body'].read()
else:
logger.debug('Reading JSON from: %s', path)
with open(path, 'r') as fh:
data = fh.read()
if isinstance(data, type(b'')):
data = data.decode()
return json.loads(data)
[docs] def set_limit_overrides_from_json(self, path):
j = self.load_json(path)
logger.debug('Limit overrides: %s', j)
self.checker.set_limit_overrides(j)
logger.debug('Done setting limit overrides from JSON.')
[docs] def set_threshold_overrides_from_json(self, path):
j = self.load_json(path)
logger.debug('Threshold overrides: %s', j)
self.checker.set_threshold_overrides(j)
logger.debug('Done setting threshold overrides from JSON.')
[docs] def console_entry_point(self):
args = self.parse_args(sys.argv[1:])
self.service_name = args.service
if args.verbose == 1:
logger.setLevel(logging.INFO)
elif args.verbose > 1:
# debug-level logging hacks
FORMAT = "%(asctime)s [%(levelname)s %(filename)s:%(lineno)s - " \
"%(name)s.%(funcName)s() ] %(message)s"
debug_formatter = logging.Formatter(fmt=FORMAT)
logger.handlers[0].setFormatter(debug_formatter)
logger.setLevel(logging.DEBUG)
if args.no_color:
self.colorize = False
if args.skip_ta:
self.skip_ta = True
# the rest of these actually use the checker
self.checker = AwsLimitChecker(
warning_threshold=args.warning_threshold,
critical_threshold=args.critical_threshold,
profile_name=args.profile_name,
account_id=args.sts_account_id,
account_role=args.sts_account_role,
region=args.region,
external_id=args.external_id,
mfa_serial_number=args.mfa_serial_number,
mfa_token=args.mfa_token,
ta_refresh_mode=args.ta_refresh_mode,
ta_refresh_timeout=args.ta_refresh_timeout,
check_version=args.check_version,
role_partition=args.role_partition,
ta_api_region=args.ta_api_region,
skip_quotas=args.skip_quotas
)
if args.version:
print('awslimitchecker {v} (see <{s}> for source code)'.format(
s=self.checker.get_project_url(),
v=self.checker.get_version()
))
raise SystemExit(0)
if len(args.skip_service) > 0:
self.checker.remove_services(args.skip_service)
if len(args.skip_check) > 0:
for check in args.skip_check:
self.skip_check.append(check)
if args.limit_override_json is not None:
self.set_limit_overrides_from_json(args.limit_override_json)
if args.threshold_override_json is not None:
self.set_threshold_overrides_from_json(
args.threshold_override_json
)
if len(args.limit) > 0:
self.set_limit_overrides(args.limit)
if args.list_services:
self.list_services()
raise SystemExit(0)
if args.list_defaults:
self.list_defaults()
raise SystemExit(0)
if args.list_limits:
self.list_limits()
raise SystemExit(0)
if args.iam_policy:
self.iam_policy()
raise SystemExit(0)
if args.show_usage:
self.show_usage()
raise SystemExit(0)
if args.list_metrics_providers:
print('Available metrics providers:')
for p in sorted(MetricsProvider.providers_by_name().keys()):
print(p)
raise SystemExit(0)
if args.list_alert_providers:
print('Available alert providers:')
for p in sorted(AlertProvider.providers_by_name().keys()):
print(p)
raise SystemExit(0)
# else check
alerter = None
if args.alert_provider:
alerter = AlertProvider.get_provider_by_name(
args.alert_provider
)(self.checker.region_name, **args.alert_config)
start_time = time.time()
try:
metrics = None
if args.metrics_provider:
metrics = MetricsProvider.get_provider_by_name(
args.metrics_provider
)(self.checker.region_name, **args.metrics_config)
res, problems, problem_str = self.check_thresholds(metrics)
duration = time.time() - start_time
logger.info('Finished checking limits in %s seconds', duration)
if metrics:
metrics.set_run_duration(duration)
metrics.flush()
except Exception as ex:
if alerter:
alerter.on_critical(
None, None, exc=ex, duration=time.time() - start_time
)
raise
if alerter:
if res == 2:
alerter.on_critical(
problems, problem_str, duration=time.time() - start_time
)
elif res == 1:
alerter.on_warning(
problems, problem_str, duration=time.time() - start_time
)
else:
alerter.on_success(duration=time.time() - start_time)
# with alert provider, always exit zero
raise SystemExit(0)
raise SystemExit(res)
[docs]def console_entry_point():
r = Runner()
r.console_entry_point()
if __name__ == "__main__":
console_entry_point()