Source code for flask_dynamo.manager

"""Main Flask integration."""

from os import environ

from boto3.session import Session
from flask import current_app

from .errors import ConfigurationError


[docs]class DynamoLazyTables(object): """Manages access to Dynamo Tables.""" def __init__(self, connection, table_config): self._table_config = table_config self._connection = connection def __getitem__(self, name): """Get the connection for a table by name.""" return self._connection.Table(name)
[docs] def keys(self): """The table names in our config.""" return [t['TableName'] for t in self._table_config]
[docs] def len(self): """The number of tables we are configured for.""" return len(self.keys())
[docs] def items(self): """The table tuples (name, connection.Table()).""" for table_name in self.keys(): yield (table_name, self[table_name])
def _wait(self, table_name, type_waiter): waiter = self._connection.meta.client.get_waiter(type_waiter) waiter.wait(TableName=table_name)
[docs] def wait_exists(self, table_name): self._wait(table_name, 'table_exists')
[docs] def wait_not_exists(self, table_name): self._wait(table_name, 'table_not_exists')
[docs] def create_all(self, wait=False): tables_name_list = [table.name for table in self._connection.tables.all()] for table in self._table_config: if table['TableName'] not in tables_name_list: self._connection.create_table(**table) if wait: for table in self._table_config: if table['TableName'] not in tables_name_list: self.wait_exists(table['TableName'])
[docs] def destroy_all(self, wait=False): for table in self._table_config: table = self._connection.Table(table['TableName']) table.delete() if wait: for table in self._table_config: self.wait_not_exists(table['TableName'])
[docs]class Dynamo(object): """DynamoDB engine manager.""" DEFAULT_REGION = 'us-east-1' def __init__(self, app=None): """ Initialize this extension. :param obj app: The Flask application (optional). """ self.app = app if app is not None: self.init_app(app)
[docs] def init_app(self, app): """ Initialize this extension. :param obj app: The Flask application. """ self._init_settings(app) self._check_settings(app) app.extensions['dynamo'] = self conn = self._connection(app=app) self.tables = DynamoLazyTables(conn, app.config['DYNAMO_TABLES'])
@staticmethod def _init_settings(app): """Initialize all of the extension settings.""" app.config.setdefault('DYNAMO_SESSION', None) app.config.setdefault('DYNAMO_TABLES', []) app.config.setdefault('DYNAMO_ENABLE_LOCAL', environ.get('DYNAMO_ENABLE_LOCAL', False)) app.config.setdefault('DYNAMO_LOCAL_HOST', environ.get('DYNAMO_LOCAL_HOST', None)) app.config.setdefault('DYNAMO_LOCAL_PORT', environ.get('DYNAMO_LOCAL_PORT', None)) app.config.setdefault('AWS_ACCESS_KEY_ID', environ.get('AWS_ACCESS_KEY_ID', None)) app.config.setdefault('AWS_SECRET_ACCESS_KEY', environ.get('AWS_SECRET_ACCESS_KEY', None)) app.config.setdefault('AWS_SESSION_TOKEN', environ.get('AWS_SESSION_TOKEN', None)) app.config.setdefault('AWS_REGION', environ.get('AWS_REGION', Dynamo.DEFAULT_REGION)) @staticmethod def _check_settings(app): """ Check all user-specified settings to ensure they're correct. We'll raise an error if something isn't configured properly. :raises: ConfigurationError """ if app.config['AWS_ACCESS_KEY_ID'] and not app.config['AWS_SECRET_ACCESS_KEY']: raise ConfigurationError('You must specify AWS_SECRET_ACCESS_KEY if you are specifying AWS_ACCESS_KEY_ID.') if app.config['AWS_SECRET_ACCESS_KEY'] and not app.config['AWS_ACCESS_KEY_ID']: raise ConfigurationError('You must specify AWS_ACCESS_KEY_ID if you are specifying AWS_SECRET_ACCESS_KEY.') if app.config['DYNAMO_ENABLE_LOCAL'] and not (app.config['DYNAMO_LOCAL_HOST'] and app.config['DYNAMO_LOCAL_PORT']): raise ConfigurationError('If you have enabled Dynamo local, you must specify the host and port.') def _get_app(self): """ Helper method that implements the logic to look up an application. pass """ if current_app: return current_app if self.app is not None: return self.app raise RuntimeError( 'application not registered on dynamo instance and no application' 'bound to current context' ) @staticmethod def _get_ctx(app): """ Gets the dyanmo app context state. """ try: return app.extensions['dynamo'] except KeyError: raise RuntimeError( 'flask-dynamo extension not registered on flask app' ) @staticmethod def _init_session(app): session_kwargs = {} # Only apply if manually specified: otherwise, we'll let boto # figure it out (boto will sniff for ec2 instance profile # credentials). if app.config['AWS_ACCESS_KEY_ID']: session_kwargs['aws_access_key_id'] = app.config['AWS_ACCESS_KEY_ID'] if app.config['AWS_SECRET_ACCESS_KEY']: session_kwargs['aws_secret_access_key'] = app.config['AWS_SECRET_ACCESS_KEY'] if app.config['AWS_SESSION_TOKEN']: session_kwargs['aws_session_token'] = app.config['AWS_SESSION_TOKEN'] if app.config['AWS_REGION']: session_kwargs['region_name'] = app.config['AWS_REGION'] return Session(**session_kwargs) def _session(self, app=None): if not app: app = self._get_app() ctx = self._get_ctx(app) try: return ctx._session_instance except AttributeError: ctx._session_instance = app.config['DYNAMO_SESSION'] or self._init_session(app) return ctx._session_instance @property def session(self): """ Our DynamoDB session. This will be lazily created if this is the first time this is being accessed. This session is reused for performance. """ return self._session() def _connection(self, app=None): if not app: app = self._get_app() ctx = self._get_ctx(app) try: return ctx._connection_instance except AttributeError: client_kwargs = {} local = True if app.config['DYNAMO_ENABLE_LOCAL'] else False if local: client_kwargs['endpoint_url'] = 'http://{}:{}'.format( app.config['DYNAMO_LOCAL_HOST'], app.config['DYNAMO_LOCAL_PORT'], ) ctx._connection_instance = self._session(app=app).resource('dynamodb', **client_kwargs) return ctx._connection_instance @property
[docs] def connection(self): """ Our DynamoDB connection. This will be lazily created if this is the first time this is being accessed. This connection is reused for performance. """ return self._connection()
[docs] def get_table(self, table_name): return self.tables[table_name]
[docs] def create_all(self, wait=False): """ Create all user-specified DynamoDB tables. We'll ignore table(s) that already exists. We'll error out if the tables can't be created for some reason. """ self.tables.create_all(wait=wait)
[docs] def destroy_all(self, wait=False): """ Destroy all user-specified DynamoDB tables. We'll error out if the tables can't be destroyed for some reason. """ self.tables.destroy_all(wait=wait)

Related Topics