def total_seconds(td):
    """Return the total seconds in a timedelta.

    Kept for Python 2.6 compatibility where timedelta.total_seconds()
    is unavailable.
    """
    return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6


def init_logging(stdout_enabled=True):
    """Configure root logging: an optional console handler plus a rotating
    main log file and a rotating errors-only log file under config['log_dir'].

    :param stdout_enabled: when False, no console handler is attached.

    Fix: the original attached the stdout StreamHandler unconditionally and
    only set its level when stdout_enabled was True, so a handler left at
    NOTSET still echoed every record to stdout even when console output was
    supposed to be disabled. The handler is now only attached when enabled.
    """
    configured_level = logging.getLevelName(config.get('log_level', 'INFO'))

    root_logger = logging.getLogger()
    root_logger.setLevel(configured_level)

    # quiet the chattiest third-party loggers
    logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.ERROR)
    logging.getLogger('boto').setLevel(logging.ERROR)
    logging.getLogger('urllib3.connectionpool').setLevel(logging.WARN)

    log_formatter = logging.Formatter(
        fmt='%(asctime)s | ' + ECID + ' | %(name)s | %(processName)s | %(levelname)s | %(message)s',
        datefmt='%m/%d/%Y %I:%M:%S %p')

    if stdout_enabled:
        stdout_logger = logging.StreamHandler(sys.stdout)
        stdout_logger.setFormatter(log_formatter)
        stdout_logger.setLevel(configured_level)
        root_logger.addHandler(stdout_logger)

    # main log file; ConcurrentRotatingFileHandler is safe across processes
    log_file_name = os.path.join(config.get('log_dir'),
                                 '%s-%s-%s-migrator.log' % (config.get('org'), config.get('migrate'), ECID))

    rotating_file = ConcurrentRotatingFileHandler(filename=log_file_name,
                                                  mode='a',
                                                  maxBytes=404857600,
                                                  backupCount=0)
    rotating_file.setFormatter(log_formatter)
    rotating_file.setLevel(logging.INFO)

    root_logger.addHandler(rotating_file)

    # separate errors-only log file for quick triage
    error_log_file_name = os.path.join(config.get('log_dir'), '%s-%s-%s-migrator-errors.log' % (
        config.get('org'), config.get('migrate'), ECID))

    error_rotating_file = ConcurrentRotatingFileHandler(filename=error_log_file_name,
                                                       mode='a',
                                                       maxBytes=404857600,
                                                       backupCount=0)
    error_rotating_file.setFormatter(log_formatter)
    error_rotating_file.setLevel(logging.ERROR)

    root_logger.addHandler(error_rotating_file)
"{api_url}/{org}?client_id={client_id}&client_secret={client_secret}" +app_url_template = "{api_url}/{org}/{app}?client_id={client_id}&client_secret={client_secret}" +collection_url_template = "{api_url}/{org}/{app}/{collection}?client_id={client_id}&client_secret={client_secret}" +collection_query_url_template = "{api_url}/{org}/{app}/{collection}?ql={ql}&client_id={client_id}&client_secret={client_secret}&limit={limit}" +collection_graph_url_template = "{api_url}/{org}/{app}/{collection}?client_id={client_id}&client_secret={client_secret}&limit={limit}" +connection_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}?client_id={client_id}&client_secret={client_secret}" +connecting_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/connecting/{verb}?client_id={client_id}&client_secret={client_secret}" +connection_create_by_uuid_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_uuid}?client_id={client_id}&client_secret={client_secret}" +connection_create_by_name_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_type}/{target_name}?client_id={client_id}&client_secret={client_secret}" + +connection_create_by_pairs_url_template = "{api_url}/{org}/{app}/{source_type_id}/{verb}/{target_type_id}?client_id={client_id}&client_secret={client_secret}" + +get_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}&connections=none" +get_entity_url_with_connections_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}" +put_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}" +permissions_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/permissions?client_id={client_id}&client_secret={client_secret}" + +user_credentials_url_template = "{api_url}/{org}/{app}/users/{uuid}/credentials" + +ignore_collections = 
class StatusListener(Process):
    """Aggregates per-collection status updates from CollectionWorkers into an
    org-wide summary, logging it and persisting it to a JSON status file."""

    def __init__(self, status_queue, worker_queue):
        super(StatusListener, self).__init__()
        # status_queue carries (app, collection, status_map) tuples from workers
        self.status_queue = status_queue
        # worker_queue is only consulted here for qsize() depth reporting
        self.worker_queue = worker_queue

    def run(self):
        keep_going = True

        org_results = {
            'name': config.get('org'),
            'apps': {},
        }

        empty_count = 0

        status_file_name = os.path.join(config.get('log_dir'),
                                        '%s-%s-%s-status.json' % (config.get('org'), config.get('migrate'), ECID))

        while keep_going:

            try:
                app, collection, status_map = self.status_queue.get(timeout=60)
                status_logger.info('Received status update for app/collection: [%s / %s]' % (app, collection))
                empty_count = 0
                # org summary is recomputed from scratch on every update;
                # sentinel min values are a far-future epoch-millis timestamp,
                # max values start at -1
                org_results['summary'] = {
                    'max_created': -1,
                    'max_modified': -1,
                    'min_created': 1584946416000,
                    'min_modified': 1584946416000,
                    'count': 0,
                    'bytes': 0
                }

                if app not in org_results['apps']:
                    org_results['apps'][app] = {
                        'collections': {}
                    }

                org_results['apps'][app]['collections'].update(status_map)

                try:
                    # re-aggregate every app/collection (note: this loop
                    # shadows the outer 'app' and 'collection' variables)
                    for app, app_data in org_results['apps'].iteritems():
                        app_data['summary'] = {
                            'max_created': -1,
                            'max_modified': -1,
                            'min_created': 1584946416000,
                            'min_modified': 1584946416000,
                            'count': 0,
                            'bytes': 0
                        }

                        if 'collections' in app_data:
                            for collection, collection_data in app_data['collections'].iteritems():

                                app_data['summary']['count'] += collection_data['count']
                                app_data['summary']['bytes'] += collection_data['bytes']

                                org_results['summary']['count'] += collection_data['count']
                                org_results['summary']['bytes'] += collection_data['bytes']

                                # APP-level min/max created/modified
                                if collection_data.get('max_modified') > app_data['summary']['max_modified']:
                                    app_data['summary']['max_modified'] = collection_data.get('max_modified')

                                if collection_data.get('min_modified') < app_data['summary']['min_modified']:
                                    app_data['summary']['min_modified'] = collection_data.get('min_modified')

                                if collection_data.get('max_created') > app_data['summary']['max_created']:
                                    app_data['summary']['max_created'] = collection_data.get('max_created')

                                if collection_data.get('min_created') < app_data['summary']['min_created']:
                                    app_data['summary']['min_created'] = collection_data.get('min_created')

                                # ORG-level min/max created/modified
                                if collection_data.get('max_modified') > org_results['summary']['max_modified']:
                                    org_results['summary']['max_modified'] = collection_data.get('max_modified')

                                if collection_data.get('min_modified') < org_results['summary']['min_modified']:
                                    org_results['summary']['min_modified'] = collection_data.get('min_modified')

                                if collection_data.get('max_created') > org_results['summary']['max_created']:
                                    org_results['summary']['max_created'] = collection_data.get('max_created')

                                if collection_data.get('min_created') < org_results['summary']['min_created']:
                                    org_results['summary']['min_created'] = collection_data.get('min_created')

                    if QSIZE_OK:
                        status_logger.warn('CURRENT Queue Depth: %s' % self.worker_queue.qsize())

                    status_logger.warn('UPDATED status of org processed: %s' % json.dumps(org_results))

                    try:
                        logger.info('Writing status to file: %s' % status_file_name)

                        with open(status_file_name, 'w') as f:
                            json.dump(org_results, f, indent=2)
                    except:
                        print traceback.format_exc()

                except KeyboardInterrupt, e:
                    raise e

                except:
                    print traceback.format_exc()

            except KeyboardInterrupt, e:
                status_logger.warn('FINAL status of org processed: %s' % json.dumps(org_results))
                raise e

            except Empty:
                if QSIZE_OK:
                    status_logger.warn('CURRENT Queue Depth: %s' % self.worker_queue.qsize())

                status_logger.warn('CURRENT status of org processed: %s' % json.dumps(org_results))

                status_logger.warning('EMPTY! Count=%s' % empty_count)

                empty_count += 1

                # give up after 120 consecutive 60s timeouts (~2 hours idle)
                if empty_count >= 120:
                    keep_going = False

            except:
                print traceback.format_exc()

        logger.warn('FINAL status of org processed: %s' % json.dumps(org_results))

        try:
            logger.info('Writing final status to file: %s' % status_file_name)
            with open(status_file_name, 'w') as f:
                json.dump(org_results, f, indent=2)
        except:
            print traceback.format_exc()
class EntityWorker(Process):
    """Pulls (app, collection_name, entity) tuples off the shared queue and
    applies the configured handler function (e.g. migrate_graph) to each."""

    def __init__(self, queue, handler_function):
        super(EntityWorker, self).__init__()

        worker_logger.debug('Creating worker!')
        self.queue = queue
        # callable(app, collection_name, entity) -> truthy when work was done
        self.handler_function = handler_function

    def run(self):

        worker_logger.info('starting run()...')
        keep_going = True

        count_processed = 0
        empty_count = 0
        start_time = int(time.time())

        while keep_going:

            try:
                # get an entity with the app and collection name
                app, collection_name, entity = self.queue.get(timeout=120)
                empty_count = 0

                # if entity.get('type') == 'user':
                #     entity = confirm_user_entity(app, entity)

                # the handler operation is the specified operation such as migrate_graph
                if self.handler_function is not None:
                    try:
                        message_start_time = int(time.time())
                        processed = self.handler_function(app, collection_name, entity)
                        message_end_time = int(time.time())

                        if processed:
                            count_processed += 1

                            # integer-second timing; avg is only meaningful once
                            # count_processed >= 1 (guaranteed here)
                            total_time = message_end_time - start_time
                            avg_time_per_message = total_time / count_processed
                            message_time = message_end_time - message_start_time

                            worker_logger.debug('Processed [%sth] entity = %s / %s / %s' % (
                                count_processed, app, collection_name, entity.get('uuid')))

                            # progress line every 1000 entities
                            if count_processed % 1000 == 1:
                                worker_logger.info(
                                    'Processed [%sth] entity = [%s / %s / %s] in [%s]s - avg time/message [%s]' % (
                                        count_processed, app, collection_name, entity.get('uuid'), message_time,
                                        avg_time_per_message))

                    except KeyboardInterrupt, e:
                        raise e

                    except Exception, e:
                        logger.exception('Error in EntityWorker processing message')
                        print traceback.format_exc()

            except KeyboardInterrupt, e:
                raise e

            except Empty:
                worker_logger.warning('EMPTY! Count=%s' % empty_count)

                empty_count += 1

                # two consecutive 120s timeouts -> assume feed is exhausted
                if empty_count >= 2:
                    keep_going = False

            except Exception, e:
                logger.exception('Error in EntityWorker run()')
                print traceback.format_exc()
class CollectionWorker(Process):
    """Consumes (app, collection_name) pairs from work_queue, iterates every
    entity of that collection on the SOURCE endpoint (graph or query based),
    pushes each entity onto entity_queue for the EntityWorkers, and reports
    per-collection stats on response_queue."""

    def __init__(self, work_queue, entity_queue, response_queue):
        super(CollectionWorker, self).__init__()
        collection_worker_logger.debug('Creating worker!')
        self.work_queue = work_queue
        self.response_queue = response_queue
        self.entity_queue = entity_queue

    def run(self):

        collection_worker_logger.info('starting run()...')
        keep_going = True

        counter = 0
        # max_created = 0
        empty_count = 0
        app = 'ERROR'
        collection_name = 'NOT SET'
        status_map = {}
        sleep_time = 10  # NOTE(review): assigned but never used below

        try:

            while keep_going:

                try:
                    app, collection_name = self.work_queue.get(timeout=30)

                    # sentinel min values are a far-future epoch-millis
                    # timestamp; max values start at -1
                    status_map = {
                        collection_name: {
                            'iteration_started': str(datetime.datetime.now()),
                            'max_created': -1,
                            'max_modified': -1,
                            'min_created': 1584946416000,
                            'min_modified': 1584946416000,
                            'count': 0,
                            'bytes': 0
                        }
                    }

                    empty_count = 0

                    # added a flag for using graph vs query/index
                    if config.get('graph', False):
                        source_collection_url = collection_graph_url_template.format(org=config.get('org'),
                                                                                     app=app,
                                                                                     collection=collection_name,
                                                                                     limit=config.get('limit'),
                                                                                     **config.get('source_endpoint'))
                    else:
                        source_collection_url = collection_query_url_template.format(org=config.get('org'),
                                                                                     app=app,
                                                                                     collection=collection_name,
                                                                                     limit=config.get('limit'),
                                                                                     ql="select * %s" % config.get(
                                                                                         'ql'),
                                                                                     **config.get('source_endpoint'))

                    logger.info('Iterating URL: %s' % source_collection_url)

                    # use the UsergridQuery from the Python SDK to iterate the collection
                    q = UsergridQueryIterator(source_collection_url,
                                              page_delay=config.get('page_sleep_time'),
                                              sleep_time=config.get('error_retry_sleep'))

                    for entity in q:

                        # begin entity loop

                        self.entity_queue.put((app, collection_name, entity))
                        counter += 1

                        if 'created' in entity:

                            try:
                                entity_created = long(entity.get('created'))

                                if entity_created > status_map[collection_name]['max_created']:
                                    status_map[collection_name]['max_created'] = entity_created
                                    status_map[collection_name]['max_created_str'] = str(
                                        datetime.datetime.fromtimestamp(entity_created / 1000))

                                if entity_created < status_map[collection_name]['min_created']:
                                    status_map[collection_name]['min_created'] = entity_created
                                    status_map[collection_name]['min_created_str'] = str(
                                        datetime.datetime.fromtimestamp(entity_created / 1000))

                            except ValueError:
                                pass

                        if 'modified' in entity:

                            try:
                                entity_modified = long(entity.get('modified'))

                                if entity_modified > status_map[collection_name]['max_modified']:
                                    status_map[collection_name]['max_modified'] = entity_modified
                                    status_map[collection_name]['max_modified_str'] = str(
                                        datetime.datetime.fromtimestamp(entity_modified / 1000))

                                if entity_modified < status_map[collection_name]['min_modified']:
                                    status_map[collection_name]['min_modified'] = entity_modified
                                    status_map[collection_name]['min_modified_str'] = str(
                                        datetime.datetime.fromtimestamp(entity_modified / 1000))

                            except ValueError:
                                pass

                        status_map[collection_name]['bytes'] += count_bytes(entity)
                        status_map[collection_name]['count'] += 1

                        # interim stats every 1000 entities; failures here are
                        # non-fatal (best effort)
                        if counter % 1000 == 1:
                            try:
                                collection_worker_logger.warning(
                                    'Sending stats for app/collection [%s / %s]: %s' % (
                                        app, collection_name, status_map))

                                self.response_queue.put((app, collection_name, status_map))

                                if QSIZE_OK:
                                    collection_worker_logger.info(
                                        'Counter=%s, collection queue depth=%s' % (
                                            counter, self.work_queue.qsize()))
                            except:
                                pass

                            collection_worker_logger.warn(
                                'Current status of collections processed: %s' % json.dumps(status_map))

                        # optional per-entity throttle
                        if config.get('entity_sleep_time') > 0:
                            collection_worker_logger.debug(
                                'sleeping for [%s]s per entity...' % (config.get('entity_sleep_time')))
                            time.sleep(config.get('entity_sleep_time'))
                            collection_worker_logger.debug(
                                'STOPPED sleeping for [%s]s per entity...' % (config.get('entity_sleep_time')))

                    # end entity loop

                    status_map[collection_name]['iteration_finished'] = str(datetime.datetime.now())

                    collection_worker_logger.warning(
                        'Collection [%s / %s / %s] loop complete! Max Created entity %s' % (
                            config.get('org'), app, collection_name, status_map[collection_name]['max_created']))

                    collection_worker_logger.warning(
                        'Sending FINAL stats for app/collection [%s / %s]: %s' % (app, collection_name, status_map))

                    self.response_queue.put((app, collection_name, status_map))

                    collection_worker_logger.info('Done! Finished app/collection: %s / %s' % (app, collection_name))

                except KeyboardInterrupt, e:
                    raise e

                except Empty:
                    collection_worker_logger.warning('EMPTY! Count=%s' % empty_count)

                    empty_count += 1

                    # two consecutive 30s timeouts -> no more collections to do
                    if empty_count >= 2:
                        keep_going = False

                except Exception, e:
                    logger.exception('Error in CollectionWorker processing collection [%s]' % collection_name)
                    print traceback.format_exc()

        finally:
            # always push a last status snapshot so StatusListener sees this
            # worker's final state
            self.response_queue.put((app, collection_name, status_map))
            collection_worker_logger.info('FINISHED!')
def use_name_for_collection(collection_name):
    """True when entities of this collection are addressed by name, not UUID."""
    return collection_name in config.get('use_name_for_collection', [])


def include_edge(collection_name, edge_name):
    """Decide whether the edge [edge_name] on [collection_name] should be migrated.

    Honors the configured include/exclude edge lists, then applies the
    hard-coded skips for edges that are handled elsewhere or not retrievable.
    """
    included = config.get('include_edge', []) or []
    excluded = config.get('exclude_edge', []) or []

    if included and edge_name not in included:
        logger.debug(
            'Skipping edge [%s] since it is not in INCLUDED list: %s' % (edge_name, included))
        return False

    if edge_name in excluded:
        logger.debug(
            'Skipping edge [%s] since it is in EXCLUDED list: %s' % (edge_name, excluded))
        return False

    # feed and activities are not retrievable...
    # roles and groups will be more efficiently handled from the role/group -> user
    # followers will be handled by 'following'
    # do only this from user -> device
    skip_for_user = collection_name in ('users', 'user') and edge_name in ('followers', 'feed', 'activities')
    skip_for_receipt = collection_name in ('receipts', 'receipt') and edge_name in ('device', 'devices')

    return not (skip_for_user or skip_for_receipt)
def exclude_edge(collection_name, edge_name):
    """Return True when edge [edge_name] on [collection_name] must NOT be migrated.

    Mirrors the exclusion half of include_edge(): configured exclude list plus
    the hard-coded skips for edges handled elsewhere.
    """
    exclude_edges = config.get('exclude_edge', [])

    if exclude_edges is None:
        exclude_edges = []

    if edge_name in exclude_edges:
        logger.debug('Skipping edge [%s] since it is in EXCLUDED list: %s' % (edge_name, exclude_edges))
        return True

    if (collection_name in ['users', 'user'] and edge_name in ['followers', 'feed', 'activities']) \
            or (collection_name in ['receipts', 'receipt'] and edge_name in ['device', 'devices']):
        # feed and activities are not retrievable...
        # roles and groups will be more efficiently handled from the role/group -> user
        # followers will be handled by 'following'
        # do only this from user -> device
        return True

    return False


def confirm_user_entity(app, source_entity, attempts=0):
    """Re-fetch a user by username from the SOURCE endpoint, substituting the
    retrieved entity when its UUID differs from the queued copy.

    Retries (recursively) up to 5 times on unexpected errors, falling back to
    the original source_entity.

    Fix: use the shared session_source for the GET (connection pooling),
    consistent with every other source-side request in this module, instead
    of a one-off requests.get.
    """
    attempts += 1

    source_entity_url = get_entity_url_template.format(org=config.get('org'),
                                                       app=app,
                                                       collection='users',
                                                       uuid=source_entity.get('username'),
                                                       **config.get('source_endpoint'))

    if attempts >= 5:
        logger.warning('Punting after [%s] attempts to confirm user at URL [%s], will use the source entity...' % (
            attempts, source_entity_url))

        return source_entity

    r = session_source.get(url=source_entity_url)

    if r.status_code == 200:
        retrieved_entity = r.json().get('entities')[0]

        if retrieved_entity.get('uuid') != source_entity.get('uuid'):
            logger.info(
                'UUID of Source Entity [%s] differs from uuid [%s] of retrieved entity at URL=[%s] and will be substituted' % (
                    source_entity.get('uuid'), retrieved_entity.get('uuid'), source_entity_url))

        return retrieved_entity

    elif 'service_resource_not_found' in r.text:

        # user genuinely missing at source: best effort with the queued copy
        logger.warn('Unable to retrieve user at URL [%s], and will use source entity. status=[%s] response: %s...' % (
            source_entity_url, r.status_code, r.text))

        return source_entity

    else:
        logger.error('After [%s] attempts to confirm user at URL [%s], received status [%s] message: %s...' % (
            attempts, source_entity_url, r.status_code, r.text))

        time.sleep(DEFAULT_RETRY_SLEEP)

        return confirm_user_entity(app, source_entity, attempts)
def create_connection(app, collection_name, source_entity, edge_name, target_entity):
    """Create the edge [source]--edge_name-->[target] on the TARGET endpoint.

    Returns True on success (or when the edge was already created per the
    cache); False after exhausting retries or on a non-repairable failure.
    """
    target_app, target_collection, target_org = get_target_mapping(app, collection_name)

    source_identifier = get_source_identifier(source_entity)
    target_identifier = get_source_identifier(target_entity)

    source_type_id = '%s/%s' % (source_entity.get('type'), source_identifier)
    target_type_id = '%s/%s' % (target_entity.get('type'), target_identifier)

    # users are always addressed by username on the source side
    if source_entity.get('type') == 'user':
        source_type_id = '%s/%s' % ('users', source_entity.get('username'))

    # when the edge name matches the default collection edge, the target is
    # addressed by bare UUID; otherwise by collection/uuid
    if target_entity.get('type') == 'user':
        if edge_name == 'users':
            target_type_id = target_entity.get('uuid')
        else:
            target_type_id = '%s/%s' % ('users', target_entity.get('uuid'))

    if target_entity.get('type') == 'device':
        if edge_name == 'devices':
            target_type_id = target_entity.get('uuid')
        else:
            target_type_id = '%s/%s' % ('devices', target_entity.get('uuid'))

    if target_entity.get('type') == 'receipt':
        if edge_name == 'receipts':
            target_type_id = target_entity.get('uuid')
        else:
            target_type_id = '%s/%s' % ('receipts', target_entity.get('uuid'))

    create_connection_url = connection_create_by_pairs_url_template.format(
        org=target_org,
        app=target_app,
        source_type_id=source_type_id,
        verb=edge_name,
        target_type_id=target_type_id,
        **config.get('target_endpoint'))

    # skip edges already created in a previous run (cache keyed by URL)
    if not config.get('skip_cache_read', False):
        processed = cache.get(create_connection_url)

        if processed not in [None, 'None']:
            logger.debug('Skipping visited Edge: [%s / %s / %s] --[%s]--> [%s / %s / %s]: %s ' % (
                app, collection_name, source_identifier, edge_name, target_app, target_entity.get('type'),
                target_entity.get('name'), create_connection_url))

            return True

    logger.info('Connecting entity [%s / %s / %s] --[%s]--> [%s / %s / %s]: %s ' % (
        app, collection_name, source_identifier, edge_name, target_app, target_entity.get('type'),
        target_entity.get('name', target_entity.get('uuid')), create_connection_url))

    attempts = 0

    while attempts < 5:
        attempts += 1

        r_create = session_target.post(create_connection_url)

        if r_create.status_code == 200:

            if not config.get('skip_cache_write', False):
                cache.set(create_connection_url, 1)

            return True
        else:
            if r_create.status_code >= 500:

                # server-side error: retry with a delay, up to 5 attempts
                if attempts < 5:
                    logger.warning('FAILED [%s] (will retry) to create connection at URL=[%s]: %s' % (
                        r_create.status_code, create_connection_url, r_create.text))
                    time.sleep(DEFAULT_RETRY_SLEEP)
                else:
                    logger.critical(
                        'FAILED [%s] (WILL NOT RETRY - max attempts) to create connection at URL=[%s]: %s' % (
                            r_create.status_code, create_connection_url, r_create.text))
                    return False

            elif r_create.status_code in [401, 404]:

                # endpoints likely missing on target: optionally re-migrate
                # both endpoint entities, then retry the edge on the next pass
                if config.get('repair_data', False):
                    logger.warning('FAILED [%s] (WILL attempt repair) to create connection at URL=[%s]: %s' % (
                        r_create.status_code, create_connection_url, r_create.text))
                    migrate_data(app, source_entity.get('type'), source_entity, force=True)
                    migrate_data(app, target_entity.get('type'), target_entity, force=True)

                else:
                    logger.critical('FAILED [%s] (WILL NOT attempt repair) to create connection at URL=[%s]: %s' % (
                        r_create.status_code, create_connection_url, r_create.text))

            else:
                # NOTE(review): this branch retries immediately without a
                # sleep, unlike the 5xx branch — confirm this is intentional
                logger.warning('FAILED [%s] (will retry) to create connection at URL=[%s]: %s' % (
                    r_create.status_code, create_connection_url, r_create.text))

    return False
def process_edges(app, collection_name, source_entity, edge_name, connection_stack):
    """Drain connection_stack, creating one [edge_name] edge from source_entity
    to each target entity popped off the stack.

    Targets whose collection (or the source's collection) is excluded by
    configuration are skipped with a debug log.
    """
    source_identifier = get_source_identifier(source_entity)

    while connection_stack:
        target = connection_stack.pop()

        excluded = exclude_collection(collection_name) or exclude_collection(target.get('type'))

        if excluded:
            logger.debug('EXCLUDING Edge (collection): [%s / %s / %s] --[%s]--> ?' % (
                app, collection_name, source_identifier, edge_name))
            continue

        create_connection(app, collection_name, source_entity, edge_name, target)
def migrate_out_graph_edge_type(app, collection_name, source_entity, edge_name, depth=0):
    """Migrate one OUTBOUND edge type of source_entity: recursively migrate each
    target entity, then create the edges on the target endpoint.

    Uses the redis cache to avoid revisiting the same (uuid, edge_name) within
    the configured visit_cache_ttl.
    """
    if not include_edge(collection_name, edge_name):
        return True

    source_uuid = source_entity.get('uuid')

    key = '%s:edge:out:%s:%s' % (key_version, source_uuid, edge_name)

    if not config.get('skip_cache_read', False):
        date_visited = cache.get(key)

        if date_visited not in [None, 'None']:
            logger.info('Skipping EDGE [%s / %s --%s-->] - visited at %s' % (
                collection_name, source_uuid, edge_name, date_visited))
            return True
        else:
            cache.delete(key)

    # mark visited BEFORE processing so concurrent workers skip this edge set
    if not config.get('skip_cache_write', False):
        cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))

    logger.debug('Visiting EDGE [%s / %s (%s) --%s-->] at %s' % (
        collection_name, source_uuid, get_uuid_time(source_uuid), edge_name, str(datetime.datetime.utcnow())))

    # NOTE(review): response is never updated below — a failed target
    # migration is logged at CRITICAL but this function still returns True;
    # confirm whether target_ok should be folded into response
    response = True

    source_identifier = get_source_identifier(source_entity)

    count_edges = 0

    logger.debug(
        'Processing edge type=[%s] of entity [%s / %s / %s]' % (edge_name, app, collection_name, source_identifier))

    target_app, target_collection, target_org = get_target_mapping(app, collection_name)

    connection_query_url = connection_query_url_template.format(
        org=config.get('org'),
        app=app,
        verb=edge_name,
        collection=collection_name,
        uuid=source_identifier,
        limit=config.get('limit'),
        **config.get('source_endpoint'))

    connection_query = UsergridQueryIterator(connection_query_url, sleep_time=config.get('error_retry_sleep'))

    connection_stack = []

    for target_entity in connection_query:
        target_connection_collection = config.get('collection_mapping', {}).get(target_entity.get('type'),
                                                                                target_entity.get('type'))

        # migrate the target node first so the edge has both endpoints
        target_ok = migrate_graph(app, target_entity.get('type'), source_entity=target_entity, depth=depth)

        if not target_ok:
            logger.critical(
                'Error migrating TARGET entity data for connection [%s / %s / %s] --[%s]--> [%s / %s / %s]' % (
                    app, collection_name, source_identifier, edge_name, app, target_connection_collection,
                    target_entity.get('name', target_entity.get('uuid'))))

        count_edges += 1
        connection_stack.append(target_entity)

    process_edges(app, collection_name, source_entity, edge_name, connection_stack)

    return response
def get_source_identifier(source_entity):
    """Return the identifier used to address this entity: its name/username
    when the collection is configured in 'use_name_for_collection', falling
    back to UUID when no name is present; UUID otherwise."""
    entity_type = source_entity.get('type')

    source_identifier = source_entity.get('uuid')

    if use_name_for_collection(entity_type):

        if entity_type in ['user']:
            source_identifier = source_entity.get('username')
        else:
            source_identifier = source_entity.get('name')

        if source_identifier is None:
            source_identifier = source_entity.get('uuid')
            logger.warn('Using UUID for entity [%s / %s]' % (entity_type, source_identifier))

    return source_identifier


def include_collection(collection_name):
    """True when this collection should be migrated: never 'events', must be in
    the configured include list (when non-empty) and not in the exclude list."""
    if collection_name in ['events']:
        return False

    include = config.get('collection', [])

    if include is not None and len(include) > 0 and collection_name not in include:
        return False

    exclude = config.get('exclude_collection', [])

    if exclude is not None and collection_name in exclude:
        return False

    return True


def exclude_collection(collection_name):
    """True when the collection is in the configured exclude_collection list."""
    exclude = config.get('exclude_collection', [])

    if exclude is not None and collection_name in exclude:
        return True

    return False


def migrate_in_graph_edge_type(app, collection_name, source_entity, edge_name, depth=0):
    """Migrate one INBOUND edge type by triggering a (forward) migrate_graph on
    every entity that connects IN to source_entity over [edge_name].

    Uses the redis cache to avoid revisiting the same (uuid, edge_name) within
    the configured visit_cache_ttl. Returns False when any triggered migration
    reports failure.
    """
    source_uuid = source_entity.get('uuid')
    key = '%s:edges:in:%s:%s' % (key_version, source_uuid, edge_name)

    if not config.get('skip_cache_read', False):
        date_visited = cache.get(key)

        if date_visited not in [None, 'None']:
            logger.info('Skipping EDGE [--%s--> %s / %s] - visited at %s' % (
                collection_name, source_uuid, edge_name, date_visited))
            return True
        else:
            cache.delete(key)

    # mark visited BEFORE processing so concurrent workers skip this edge set
    if not config.get('skip_cache_write', False):
        cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))

    logger.debug('Visiting EDGE [--%s--> %s / %s (%s)] at %s' % (
        edge_name, collection_name, source_uuid, get_uuid_time(source_uuid), str(datetime.datetime.utcnow())))

    source_identifier = get_source_identifier(source_entity)

    if exclude_collection(collection_name):
        logger.debug('Excluding (Collection) entity [%s / %s / %s]' % (app, collection_name, source_uuid))
        return True

    if not include_edge(collection_name, edge_name):
        return True

    logger.debug(
        'Processing edge type=[%s] of entity [%s / %s / %s]' % (edge_name, app, collection_name, source_identifier))

    logger.debug('Processing IN edges type=[%s] of entity [ %s / %s / %s]' % (
        edge_name, app, collection_name, source_uuid))

    connecting_query_url = connecting_query_url_template.format(
        org=config.get('org'),
        app=app,
        collection=collection_name,
        uuid=source_uuid,
        verb=edge_name,
        limit=config.get('limit'),
        **config.get('source_endpoint'))

    connection_query = UsergridQueryIterator(connecting_query_url, sleep_time=config.get('error_retry_sleep'))

    response = True

    for e_connection in connection_query:
        logger.debug('Triggering IN->OUT edge migration on entity [%s / %s / %s] ' % (
            app, e_connection.get('type'), e_connection.get('uuid')))

        response = migrate_graph(app, e_connection.get('type'), e_connection, depth) and response

    return response
def migrate_graph(app, collection_name, source_entity, depth=0):
    """Migrate source_entity and (recursively, up to config['graph_depth'])
    the entities reachable over its outbound and inbound edges.

    Returns True when the entity and all processed edges migrated cleanly
    (or were skipped via cache/filters); False when any step failed.
    """
    depth += 1
    source_uuid = source_entity.get('uuid')

    # short circuit if the graph depth exceeds what was specified
    if depth > config.get('graph_depth', 1):
        logger.debug(
            'Reached Max Graph Depth, stopping after [%s] on [%s / %s]' % (depth, collection_name, source_uuid))
        return True
    else:
        logger.debug('Processing @ Graph Depth [%s]' % depth)

    if exclude_collection(collection_name):
        logger.warn('Ignoring entity in filtered collection [%s]' % collection_name)
        return True

    key = '%s:graph:%s' % (key_version, source_uuid)
    entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid))

    # skip nodes visited within the cache TTL (shared across workers)
    if not config.get('skip_cache_read', False):
        date_visited = cache.get(key)

        if date_visited not in [None, 'None']:
            logger.debug('Skipping GRAPH %s at %s' % (entity_tag, date_visited))
            return True
        else:
            cache.delete(key)

    logger.info('Visiting GRAPH %s at %s' % (entity_tag, str(datetime.datetime.utcnow())))

    if not config.get('skip_cache_write', False):
        cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))

    # first, migrate data for current node
    response = migrate_data(app, collection_name, source_entity)

    # gather the outbound edge names
    out_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('collections', [])]
    out_edge_names += [edge_name for edge_name in source_entity.get('metadata', {}).get('connections', [])]

    logger.debug('Entity %s has [%s] OUT edges' % (entity_tag, len(out_edge_names)))

    # migrate each outbound edge type
    for edge_name in out_edge_names:

        if not exclude_edge(collection_name, edge_name):
            response = migrate_out_graph_edge_type(app, collection_name, source_entity, edge_name, depth) and response

        # optionally delete target-side edges that no longer exist at source
        if config.get('prune', False):
            prune_edge_by_name(edge_name, app, collection_name, source_entity)

    # gather the inbound edge names
    in_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('connecting', [])]

    logger.debug('Entity %s has [%s] IN edges' % (entity_tag, len(in_edge_names)))

    # migrate each inbound edge type
    for edge_name in in_edge_names:

        if not exclude_edge(collection_name, edge_name):
            response = migrate_in_graph_edge_type(app, collection_name, source_entity, edge_name,
                                                  depth) and response

    return response
def collect_entities(q):
    """Materialize a query iterator into a dict keyed by entity UUID."""
    response = {}

    for e in q:
        response[e.get('uuid')] = e

    return response


def prune_edge_by_name(edge_name, app, collection_name, source_entity):
    """Delete edges of type [edge_name] that exist on the TARGET endpoint but
    no longer exist on the SOURCE (target-minus-source set difference).

    Always returns True; individual delete failures are retried up to 5 times
    and logged.
    """
    if not include_edge(collection_name, edge_name):
        return True

    source_identifier = get_source_identifier(source_entity)
    source_uuid = source_entity.get('uuid')

    entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid))

    target_app, target_collection, target_org = get_target_mapping(app, collection_name)

    target_connection_query_url = connection_query_url_template.format(
        org=target_org,
        app=target_app,
        verb=edge_name,
        collection=target_collection,
        uuid=source_identifier,
        limit=config.get('limit'),
        **config.get('target_endpoint'))

    source_connection_query_url = connection_query_url_template.format(
        org=config.get('org'),
        app=app,
        verb=edge_name,
        collection=collection_name,
        uuid=source_identifier,
        limit=config.get('limit'),
        **config.get('source_endpoint'))

    source_connections = collect_entities(
        UsergridQueryIterator(source_connection_query_url, sleep_time=config.get('error_retry_sleep')))

    target_connections = collect_entities(
        UsergridQueryIterator(target_connection_query_url, sleep_time=config.get('error_retry_sleep')))

    # edges present at target but gone from source are stale and get deleted
    delete_uuids = Set(target_connections.keys()) - Set(source_connections.keys())

    if len(delete_uuids) > 0:
        logger.info('Found [%s] edges to delete for entity %s' % (len(delete_uuids), entity_tag))

    for delete_uuid in delete_uuids:
        delete_connection_url = connection_create_by_uuid_url_template.format(
            org=target_org,
            app=target_app,
            verb=edge_name,
            collection=target_collection,
            uuid=source_identifier,
            target_uuid=delete_uuid,
            **config.get('target_endpoint'))

        attempts = 0

        while attempts < 5:
            attempts += 1

            r = session_target.delete(delete_connection_url)

            # drop any cached "edge created" marker regardless of outcome
            if not config.get('skip_cache_write'):
                cache.delete(delete_connection_url)

            if r.status_code == 200:
                logger.info('Pruned edge on attempt [%s] URL=[%s]' % (attempts, delete_connection_url))
                break
            else:
                logger.error('Error [%s] on attempt [%s] deleting connection at URL=[%s]: %s' % (
                    r.status_code, attempts, delete_connection_url, r.text))
                time.sleep(DEFAULT_RETRY_SLEEP)

    return True
collection=target_collection, + uuid=source_identifier, + target_uuid=delete_uuid, + **config.get('target_endpoint')) + + attempts = 0 + + while attempts < 5: + attempts += 1 + + r = session_target.delete(delete_connection_url) + + if not config.get('skip_cache_write'): + cache.delete(delete_connection_url) + + if r.status_code == 200: + logger.info('Pruned edge on attempt [%s] URL=[%s]' % (attempts, delete_connection_url)) + break + else: + logger.error('Error [%s] on attempt [%s] deleting connection at URL=[%s]: %s' % ( + r.status_code, attempts, delete_connection_url, r.text)) + time.sleep(DEFAULT_RETRY_SLEEP) + + return True + + +def prune_graph(app, collection_name, source_entity): + source_uuid = source_entity.get('uuid') + key = '%s:prune_graph:%s' % (key_version, source_uuid) + entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid)) + + if not config.get('skip_cache_read', False): + date_visited = cache.get(key) + + if date_visited not in [None, 'None']: + logger.debug('Skipping PRUNE %s at %s' % (entity_tag, date_visited)) + return True + else: + cache.delete(key) + + logger.debug('pruning GRAPH %s at %s' % (entity_tag, str(datetime.datetime.utcnow()))) + if not config.get('skip_cache_write', False): + cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2)) + + if collection_name in config.get('exclude_collection', []): + logger.debug('Excluding (Collection) entity %s' % entity_tag) + return True + + out_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('collections', [])] + out_edge_names += [edge_name for edge_name in source_entity.get('metadata', {}).get('connections', [])] + + for edge_name in out_edge_names: + prune_edge_by_name(edge_name, app, collection_name, source_entity) + + +def reput(app, collection_name, source_entity, attempts=0): + source_identifier = source_entity.get('uuid') + target_app, target_collection, target_org = 
def get_uuid_time(the_uuid_string):
    """Extract the embedded timestamp of a (type-1) UUID string as a datetime."""
    return time_uuid.TimeUUID(the_uuid_string).get_datetime()


def migrate_permissions(app, collection_name, source_entity, attempts=0):
    """Copy the permission list of a role/group entity from source to target.

    No-op (returns True) for any other collection. Returns False only when the
    source permissions cannot be read; individual POST failures are logged.
    """
    if collection_name not in ['roles', 'role', 'group', 'groups']:
        return True

    target_app, target_collection, target_org = get_target_mapping(app, collection_name)

    source_identifier = get_source_identifier(source_entity)

    source_permissions_url = permissions_url_template.format(org=config.get('org'),
                                                             app=app,
                                                             collection=collection_name,
                                                             uuid=source_identifier,
                                                             **config.get('source_endpoint'))

    r = session_source.get(source_permissions_url)

    if r.status_code != 200:
        logger.error('Unable to get permissions at URL [%s]: %s' % (source_permissions_url, r.text))
        return False

    perm_response = r.json()

    perms = perm_response.get('data', [])

    logger.info('Migrating [%s / %s] with permissions %s' % (collection_name, source_identifier, perms))

    if len(perms) > 0:
        target_permissions_url = permissions_url_template.format(org=target_org,
                                                                 app=target_app,
                                                                 collection=target_collection,
                                                                 uuid=source_identifier,
                                                                 **config.get('target_endpoint'))

        # permissions must be posted one at a time
        for permission in perms:
            data = {'permission': permission}

            logger.info('Posting permission %s to %s' % (json.dumps(data), target_permissions_url))

            r = session_target.post(target_permissions_url, json.dumps(data))

            if r.status_code != 200:
                logger.error(
                    'ERROR posting permission %s to URL=[%s]: %s' % (
                        json.dumps(data), target_permissions_url, r.text))

    return True


def migrate_data(app, collection_name, source_entity, attempts=0, force=False):
    """PUT one source entity into the target org/app, with caching and retries.

    Skips entities whose cached 'modified' timestamp indicates no change, then
    PUTs the entity (minus 'metadata') to the target. On success, also migrates
    permissions (roles/groups) and credentials (users). Retries by recursion with
    attempts+1, short-circuiting on duplicates, 403s, or attempts >= 5.

    :return: True on success/skip, False on permanent failure
    """
    if config.get('skip_data') and not force:
        return True

    # check the cache to see if this entity has changed
    if not config.get('skip_cache_read', False) and not force:
        try:
            str_modified = cache.get(source_entity.get('uuid'))

            if str_modified not in [None, 'None']:

                modified = long(str_modified)

                logger.debug('FOUND CACHE: %s = %s ' % (source_entity.get('uuid'), modified))

                if modified <= source_entity.get('modified'):

                    modified_date = datetime.datetime.utcfromtimestamp(modified / 1000)
                    e_uuid = source_entity.get('uuid')

                    uuid_datetime = time_uuid.TimeUUID(e_uuid).get_datetime()

                    logger.debug('Skipping ENTITY: %s / %s / %s / %s (%s) / %s (%s)' % (
                        config.get('org'), app, collection_name, e_uuid, uuid_datetime, modified, modified_date))
                    return True
                else:
                    logger.debug('DELETING CACHE: %s ' % (source_entity.get('uuid')))
                    cache.delete(source_entity.get('uuid'))
        except Exception:
            # FIX: narrowed from a bare 'except:' so KeyboardInterrupt/SystemExit propagate
            logger.error('Error on checking cache for uuid=[%s]' % source_entity.get('uuid'))
            logger.error(traceback.format_exc())

    if exclude_collection(collection_name):
        logger.warn('Excluding entity in filtered collection [%s]' % collection_name)
        return True

    # handle duplicate user case
    if collection_name in ['users', 'user']:
        source_entity = confirm_user_entity(app, source_entity)

    source_identifier = get_source_identifier(source_entity)

    logger.info('Visiting ENTITY data [%s / %s (%s) ] at %s' % (
        collection_name, source_identifier, get_uuid_time(source_entity.get('uuid')), str(datetime.datetime.utcnow())))

    # 'metadata' is server-generated on read and must not be PUT back
    entity_copy = source_entity.copy()

    if 'metadata' in entity_copy:
        entity_copy.pop('metadata')

    target_app, target_collection, target_org = get_target_mapping(app, collection_name)

    try:
        target_entity_url_by_name = put_entity_url_template.format(org=target_org,
                                                                   app=target_app,
                                                                   collection=target_collection,
                                                                   uuid=source_identifier,
                                                                   **config.get('target_endpoint'))

        r = session_target.put(url=target_entity_url_by_name, data=json.dumps(entity_copy))

        if attempts > 1:
            logger.warn('Attempt [%s] to migrate entity [%s / %s] at URL [%s]' % (
                attempts, collection_name, source_identifier, target_entity_url_by_name))
        else:
            logger.debug('Attempt [%s] to migrate entity [%s / %s] at URL [%s]' % (
                attempts, collection_name, source_identifier, target_entity_url_by_name))

        if r.status_code == 200:
            # Worked => WE ARE DONE
            logger.info(
                'migrate_data | success=[%s] | attempts=[%s] | entity=[%s / %s / %s] | created=[%s] | modified=[%s]' % (
                    True, attempts, config.get('org'), app, source_identifier, source_entity.get('created'),
                    source_entity.get('modified'),))

            if not config.get('skip_cache_write', False):
                logger.debug('SETTING CACHE | uuid=[%s] | modified=[%s]' % (
                    source_entity.get('uuid'), str(source_entity.get('modified'))))

                cache.set(source_entity.get('uuid'), str(source_entity.get('modified')))

            if collection_name in ['role', 'group', 'roles', 'groups']:
                migrate_permissions(app, collection_name, source_entity, attempts=0)

            if collection_name in ['users', 'user']:
                migrate_user_credentials(app, collection_name, source_entity, attempts=0)

            return True

        else:
            logger.error('Failure [%s] on attempt [%s] to PUT url=[%s], entity=[%s] response=[%s]' % (
                r.status_code, attempts, target_entity_url_by_name, json.dumps(source_entity), r.text))

            if attempts >= 5:
                # FIX: this is the abort path, so log success=False (was True)
                logger.critical(
                    'ABORT migrate_data | success=[%s] | attempts=[%s] | created=[%s] | modified=[%s] %s / %s / %s' % (
                        False, attempts, source_entity.get('created'), source_entity.get('modified'), app,
                        collection_name, source_identifier))

                return False

            if r.status_code == 400:

                if target_collection in ['roles', 'role']:
                    return repair_user_role(app, collection_name, source_entity)

                elif target_collection in ['users', 'user']:
                    return handle_user_migration_conflict(app, collection_name, source_entity)

                elif 'duplicate_unique_property_exists' in r.text:
                    logger.error(
                        'WILL NOT RETRY (duplicate) [%s] attempts to PUT url=[%s], entity=[%s] response=[%s]' % (
                            attempts, target_entity_url_by_name, json.dumps(source_entity), r.text))

                    return False

            elif r.status_code == 403:
                logger.critical(
                    'ABORT migrate_data | success=[%s] | attempts=[%s] | created=[%s] | modified=[%s] %s / %s / %s' % (
                        False, attempts, source_entity.get('created'), source_entity.get('modified'), app,
                        collection_name, source_identifier))
                return False

    except Exception:
        # FIX: narrowed from a bare 'except:'; still falls through to the retry below
        logger.error(traceback.format_exc())
        logger.error('error in migrate_data on entity: %s' % json.dumps(source_entity))

    # FIX: this is the retry path, so log success=False (was True)
    # NOTE(review): the attempts>=5 guard above is only reached on an HTTP error
    # response; a repeated network exception would recurse past 5 attempts — confirm
    logger.warn(
        'UNSUCCESSFUL migrate_data | success=[%s] | attempts=[%s] | entity=[%s / %s / %s] | created=[%s] | modified=[%s]' % (
            False, attempts, config.get('org'), app, source_identifier, source_entity.get('created'),
            source_entity.get('modified'),))

    return migrate_data(app, collection_name, source_entity, attempts=attempts + 1)
def handle_user_migration_conflict(app, collection_name, source_entity, attempts=0, depth=0):
    """Resolve a 400 conflict when migrating a user entity.

    Fetches the target user by username; if the source entity is older than the
    target (lower 'created'), the target is considered a collision and is repaired
    via repair_user_role(). 5xx responses are retried up to 5 times.

    :return: True if the conflict was repaired, False otherwise
    """
    # FIX: guard was inverted ('in' instead of 'not in') — this handler exists only
    # for the users collection and is only invoked for it by migrate_data(), so the
    # original code always returned False without doing anything
    if collection_name not in ['users', 'user']:
        return False

    username = source_entity.get('username')
    target_app, target_collection, target_org = get_target_mapping(app, collection_name)

    target_entity_url = get_entity_url_template.format(org=target_org,
                                                       app=target_app,
                                                       collection=target_collection,
                                                       uuid=username,
                                                       **config.get('target_endpoint'))

    # There is retry built in, here is the short circuit
    if attempts >= 5:
        logger.critical(
            'Aborting after [%s] attempts to audit user [%s] at URL [%s]' % (attempts, username, target_entity_url))

        return False

    r = session_target.get(url=target_entity_url)

    if r.status_code == 200:
        target_entity = r.json().get('entities')[0]

        if source_entity.get('created') < target_entity.get('created'):
            return repair_user_role(app, collection_name, source_entity)

        # target is not newer than the source: nothing to repair
        return False

    elif r.status_code // 100 == 5:
        audit_logger.warning(
            'CONFLICT: handle_user_migration_conflict failed attempt [%s] GET [%s] on TARGET URL=[%s] - : %s' % (
                attempts, r.status_code, target_entity_url, r.text))

        time.sleep(DEFAULT_RETRY_SLEEP)

        # FIX: increment attempts on retry — previously passed 'attempts' unchanged,
        # so the attempts >= 5 short circuit above could never trigger (infinite retry)
        return handle_user_migration_conflict(app, collection_name, source_entity, attempts + 1)

    else:
        audit_logger.error(
            'CONFLICT: Failed handle_user_migration_conflict attempt [%s] GET [%s] on TARGET URL=[%s] - : %s' % (
                attempts, r.status_code, target_entity_url, r.text))

        return False


def get_best_source_entity(app, collection_name, source_entity, depth=0):
    """Pick the authoritative source entity for a name that collided on the target.

    Looks the entity up at the source by its natural key (username for users,
    name for roles, uuid otherwise). If the direct GET 404s, falls back to a QL
    query and returns the entity with the earliest 'created' timestamp; on any
    other failure returns the entity that was passed in.
    """
    target_app, target_collection, target_org = get_target_mapping(app, collection_name)

    target_pk = 'uuid'

    if target_collection in ['users', 'user']:
        target_pk = 'username'
    elif target_collection in ['roles', 'role']:
        target_pk = 'name'

    target_name = source_entity.get(target_pk)

    # there should be no target entity now, we just need to decide which one from the source to use
    source_entity_url_by_name = get_entity_url_template.format(org=config.get('org'),
                                                               app=app,
                                                               collection=collection_name,
                                                               uuid=target_name,
                                                               **config.get('source_endpoint'))

    r_get_source_entity = session_source.get(source_entity_url_by_name)

    # if we are able to get at the source by PK...
    if r_get_source_entity.status_code == 200:

        # extract the entity from the response
        entity_from_get = r_get_source_entity.json().get('entities')[0]

        return entity_from_get

    elif r_get_source_entity.status_code // 100 == 4:
        # wasn't found, get by QL and sort
        source_entity_query_url = collection_query_url_template.format(org=config.get('org'),
                                                                       app=app,
                                                                       collection=collection_name,
                                                                       ql='select * where %s=\'%s\' order by created asc' % (
                                                                           target_pk, target_name),
                                                                       limit=config.get('limit'),
                                                                       **config.get('source_endpoint'))

        logger.info('Attempting to determine best entity from query on URL %s' % source_entity_query_url)

        q = UsergridQueryIterator(source_entity_query_url, sleep_time=config.get('error_retry_sleep'))

        desired_entity = None

        entity_counter = 0

        # keep the entity with the earliest 'created' timestamp
        for e in q:
            entity_counter += 1

            if desired_entity is None:
                desired_entity = e

            elif e.get('created') < desired_entity.get('created'):
                desired_entity = e

        if desired_entity is None:
            logger.warn('Unable to determine best of [%s] entities from query on URL %s' % (
                entity_counter, source_entity_query_url))

            return source_entity

        else:
            return desired_entity

    else:
        return source_entity
def repair_user_role(app, collection_name, source_entity, attempts=0, depth=0):
    """Repair a name/UUID collision on the target by delete-then-recreate.

    Deletes the target entity addressed by its natural key (username/name/uuid),
    then re-PUTs the 'best' source entity (per get_best_source_entity) by UUID.

    :return: True if the target entity was deleted and re-created, False otherwise
    """
    target_app, target_collection, target_org = get_target_mapping(app, collection_name)

    # For the users collection, there seemed to be cases where a USERNAME was created/existing with the a
    # different UUID which caused a 'collision' - so the point is to delete the entity with the differing
    # UUID by UUID and then do a recursive call to migrate the data - now that the collision has been cleared

    # natural key used to address the entity: username for users, name for roles
    target_pk = 'uuid'

    if target_collection in ['users', 'user']:
        target_pk = 'username'
    elif target_collection in ['roles', 'role']:
        target_pk = 'name'

    target_name = source_entity.get(target_pk)

    target_entity_url_by_name = get_entity_url_template.format(org=target_org,
                                                               app=target_app,
                                                               collection=target_collection,
                                                               uuid=target_name,
                                                               **config.get('target_endpoint'))

    logger.warning('Repairing: Deleting name=[%s] entity at URL=[%s]' % (target_name, target_entity_url_by_name))

    r = session_target.delete(target_entity_url_by_name)

    # a 404/401 with 'service_resource_not_found' counts as already-deleted
    if r.status_code == 200 or (r.status_code in [404, 401] and 'service_resource_not_found' in r.text):
        logger.info('Deletion of entity at URL=[%s] was [%s]' % (target_entity_url_by_name, r.status_code))

        best_source_entity = get_best_source_entity(app, collection_name, source_entity)

        # re-create by UUID so the target ends up with the source's UUID
        target_entity_url_by_uuid = get_entity_url_template.format(org=target_org,
                                                                   app=target_app,
                                                                   collection=target_collection,
                                                                   uuid=best_source_entity.get('uuid'),
                                                                   **config.get('target_endpoint'))

        r = session_target.put(target_entity_url_by_uuid, data=json.dumps(best_source_entity))

        if r.status_code == 200:
            logger.info('Successfully repaired user at URL=[%s]' % target_entity_url_by_uuid)
            return True

        else:
            logger.critical('Failed to PUT [%s] the desired entity at URL=[%s]: %s' % (
                r.status_code, target_entity_url_by_name, r.text))
            return False

    else:
        # log an error and keep going if we cannot delete the entity at the specified URL. Unlikely, but if so
        # then this entity is borked
        logger.critical(
            'Deletion of entity at URL=[%s] FAILED [%s]: %s' % (target_entity_url_by_name, r.status_code, r.text))
        return False
def get_target_mapping(app, collection_name):
    """Resolve the target (app, collection, org) names, applying any configured mappings."""
    target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org'))
    target_app = config.get('app_mapping', {}).get(app, app)
    target_collection = config.get('collection_mapping', {}).get(collection_name, collection_name)
    return target_app, target_collection, target_org


def parse_args():
    """Define and parse the command-line arguments; returns them as a dict."""
    parser = argparse.ArgumentParser(description='Usergrid Org/App Migrator')

    parser.add_argument('--log_dir',
                        help='path to the place where logs will be written',
                        default='./',
                        type=str,
                        required=False)

    parser.add_argument('--log_level',
                        help='log level - DEBUG, INFO, WARN, ERROR, CRITICAL',
                        default='INFO',
                        type=str,
                        required=False)

    parser.add_argument('-o', '--org',
                        help='Name of the org to migrate',
                        type=str,
                        required=True)

    parser.add_argument('-a', '--app',
                        help='Name of one or more apps to include, specify none to include all apps',
                        required=False,
                        action='append')

    parser.add_argument('-e', '--include_edge',
                        help='Name of one or more edges/connection types to INCLUDE, specify none to include all edges',
                        required=False,
                        action='append')

    parser.add_argument('--exclude_edge',
                        help='Name of one or more edges/connection types to EXCLUDE, specify none to include all edges',
                        required=False,
                        action='append')

    parser.add_argument('--exclude_collection',
                        help='Name of one or more collections to EXCLUDE, specify none to include all collections',
                        required=False,
                        action='append')

    parser.add_argument('-c', '--collection',
                        help='Name of one or more collections to include, specify none to include all collections',
                        default=[],
                        action='append')

    parser.add_argument('--force_app',
                        help='Necessary for using 2.0 as a source at times due to API issues. Forces the specified app(s) to be processed, even if they are not returned from the list of apps in the API call',
                        default=[],
                        action='append')

    parser.add_argument('--use_name_for_collection',
                        help='Name of one or more collections to use [name] instead of [uuid] for creating entities and edges',
                        default=[],
                        action='append')

    parser.add_argument('-m', '--migrate',
                        help='Specifies what to migrate: data, connections, credentials, audit or none (just iterate '
                             'the apps/collections)',
                        type=str,
                        choices=[
                            'data',
                            'prune',
                            'none',
                            'reput',
                            'credentials',
                            'graph',
                            'permissions'
                        ],
                        default='data')

    parser.add_argument('-s', '--source_config',
                        help='The path to the source endpoint/org configuration file',
                        type=str,
                        default='source.json')

    parser.add_argument('-d', '--target_config',
                        help='The path to the target endpoint/org configuration file',
                        type=str,
                        default='destination.json')

    parser.add_argument('--redis_socket',
                        help='The path to the socket for redis to use',
                        type=str)

    parser.add_argument('--limit',
                        help='The number of entities to return per query request',
                        type=int,
                        default=100)

    parser.add_argument('-w', '--entity_workers',
                        help='The number of worker processes to do the migration',
                        type=int,
                        default=16)

    parser.add_argument('--visit_cache_ttl',
                        help='The TTL of the cache of visiting nodes in the graph for connections',
                        type=int,
                        default=3600 * 2)

    parser.add_argument('--error_retry_sleep',
                        help='The number of seconds to wait between retrieving after an error',
                        type=float,
                        default=30)

    parser.add_argument('--page_sleep_time',
                        help='The number of seconds to wait between retrieving pages from the UsergridQueryIterator',
                        type=float,
                        default=0)

    # NOTE(review): help text appears copy-pasted from --page_sleep_time; this is
    # presumably a per-entity delay — confirm intended wording
    parser.add_argument('--entity_sleep_time',
                        help='The number of seconds to wait between retrieving pages from the UsergridQueryIterator',
                        type=float,
                        default=0)

    parser.add_argument('--collection_workers',
                        help='The number of worker processes to do the migration',
                        type=int,
                        default=2)

    parser.add_argument('--queue_size_max',
                        help='The max size of entities to allow in the queue',
                        type=int,
                        default=100000)

    parser.add_argument('--graph_depth',
                        help='The graph depth to traverse to copy',
                        type=int,
                        default=3)

    parser.add_argument('--queue_watermark_high',
                        help='The point at which publishing to the queue will PAUSE until it is at or below low watermark',
                        type=int,
                        default=25000)

    parser.add_argument('--min_modified',
                        help='Break when encountering a modified date before this, per collection',
                        type=int,
                        default=0)

    # type=long is Python 2 only (matches the rest of this module)
    parser.add_argument('--max_modified',
                        help='Break when encountering a modified date after this, per collection',
                        type=long,
                        default=3793805526000)

    parser.add_argument('--queue_watermark_low',
                        help='The point at which publishing to the queue will RESUME after it has reached the high watermark',
                        type=int,
                        default=5000)

    parser.add_argument('--ql',
                        help='The QL to use in the filter for reading data from collections',
                        type=str,
                        default='select * order by created asc')
    # default='select * order by created asc')

    parser.add_argument('--repair_data',
                        help='Repair data when iterating/migrating graph but skipping data',
                        action='store_true')

    parser.add_argument('--prune',
                        help='Prune the graph while processing (instead of the prune operation)',
                        action='store_true')

    parser.add_argument('--skip_data',
                        help='Skip migrating data (useful for connections only)',
                        action='store_true')

    parser.add_argument('--skip_credentials',
                        help='Skip migrating credentials',
                        action='store_true')

    parser.add_argument('--skip_cache_read',
                        help='Skip reading the cache (modified timestamps and graph edges)',
                        dest='skip_cache_read',
                        action='store_true')

    parser.add_argument('--skip_cache_write',
                        help='Skip updating the cache with modified timestamps of entities and graph edges',
                        dest='skip_cache_write',
                        action='store_true')

    parser.add_argument('--create_apps',
                        help='Create apps at the target if they do not exist',
                        dest='create_apps',
                        action='store_true')

    parser.add_argument('--nohup',
                        help='specifies not to use stdout for logging',
                        action='store_true')

    parser.add_argument('--graph',
                        help='Use GRAPH instead of Query',
                        dest='graph',
                        action='store_true')

    parser.add_argument('--su_username',
                        help='Superuser username',
                        required=False,
                        type=str)

    parser.add_argument('--su_password',
                        help='Superuser Password',
                        required=False,
                        type=str)

    parser.add_argument('--inbound_connections',
                        help='Name of the org to migrate',
                        action='store_true')

    parser.add_argument('--map_app',
                        help="Multiple allowed: A colon-separated string such as 'apples:oranges' which indicates to"
                             " put data from the app named 'apples' from the source endpoint into app named 'oranges' "
                             "in the target endpoint",
                        default=[],
                        action='append')

    parser.add_argument('--map_collection',
                        help="One or more colon-separated string such as 'cats:dogs' which indicates to put data from "
                             "collections named 'cats' from the source endpoint into a collection named 'dogs' in the "
                             "target endpoint, applicable globally to all apps",
                        default=[],
                        action='append')

    parser.add_argument('--map_org',
                        help="One or more colon-separated strings such as 'red:blue' which indicates to put data from "
                             "org named 'red' from the source endpoint into a collection named 'blue' in the target "
                             "endpoint",
                        default=[],
                        action='append')

    my_args = parser.parse_args(sys.argv[1:])

    return vars(my_args)


def init():
    """Validate args and build the runtime config: mappings, endpoint configs, credentials."""
    global config

    # credentials migration requires superuser access on both endpoints
    if config.get('migrate') == 'credentials':

        if config.get('su_password') is None or config.get('su_username') is None:
            message = 'ABORT: In order to migrate credentials, Superuser parameters (su_password, su_username) are required'
            print message
            logger.critical(message)
            exit()

    config['collection_mapping'] = {}
    config['app_mapping'] = {}
    config['org_mapping'] = {}

    # each mapping argument is a 'source:target' pair
    for mapping in config.get('map_collection', []):
        parts = mapping.split(':')

        if len(parts) == 2:
            config['collection_mapping'][parts[0]] = parts[1]
        else:
            logger.warning('Skipping Collection mapping: [%s]' % mapping)

    for mapping in config.get('map_app', []):
        parts = mapping.split(':')

        if len(parts) == 2:
            config['app_mapping'][parts[0]] = parts[1]
        else:
            logger.warning('Skipping App mapping: [%s]' % mapping)

    for mapping in config.get('map_org', []):
        parts = mapping.split(':')

        if len(parts) == 2:
            config['org_mapping'][parts[0]] = parts[1]
            logger.info('Mapping Org [%s] to [%s] from mapping [%s]' % (parts[0], parts[1], mapping))
        else:
            logger.warning('Skipping Org mapping: [%s]' % mapping)

    # replace the config-file *paths* with their parsed JSON contents
    with open(config.get('source_config'), 'r') as f:
        config['source_config'] = json.load(f)

    with open(config.get('target_config'), 'r') as f:
        config['target_config'] = json.load(f)

    if config['exclude_collection'] is None:
        config['exclude_collection'] = []

    # endpoint dict = base endpoint settings + per-org credentials
    config['source_endpoint'] = config['source_config'].get('endpoint').copy()
    config['source_endpoint'].update(config['source_config']['credentials'][config['org']])

    target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org'))

    config['target_endpoint'] = config['target_config'].get('endpoint').copy()
    config['target_endpoint'].update(config['target_config']['credentials'][target_org])
def wait_for(threads, label, sleep_time=60):
    """Block until none of the given workers is alive, polling every sleep_time seconds.

    :param threads: process/thread objects exposing is_alive() and .name
    :param label: name used in the completion log message
    """
    wait = True

    logger.info('Starting to wait for [%s] threads with sleep time=[%s]' % (len(threads), sleep_time))

    while wait:
        wait = False
        alive_count = 0

        for t in threads:

            if t.is_alive():
                alive_count += 1
                logger.info('Thread [%s] is still alive' % t.name)

        if alive_count > 0:
            wait = True
            logger.info('Continuing to wait for [%s] threads with sleep time=[%s]' % (alive_count, sleep_time))
            time.sleep(sleep_time)

    logger.warn('All workers [%s] done!' % label)


def count_bytes(entity):
    """Return the JSON-serialized size (in bytes) of an entity, excluding its 'metadata'."""
    entity_copy = entity.copy()

    if 'metadata' in entity_copy:
        del entity_copy['metadata']

    entity_str = json.dumps(entity_copy)

    return len(entity_str)


def migrate_user_credentials(app, collection_name, source_entity, attempts=0):
    """Copy a user's password credentials from source to target via the superuser endpoint.

    Requires su_username/su_password in config. Returns False for non-user
    collections, when skipped via config, or on any HTTP failure.
    """
    # this only applies to users
    if collection_name not in ['users', 'user'] \
            or config.get('skip_credentials', False):
        return False

    source_identifier = get_source_identifier(source_entity)

    target_app, target_collection, target_org = get_target_mapping(app, collection_name)

    # get the URLs for the source and target users

    source_url = user_credentials_url_template.format(org=config.get('org'),
                                                      app=app,
                                                      uuid=source_identifier,
                                                      **config.get('source_endpoint'))

    target_url = user_credentials_url_template.format(org=target_org,
                                                      app=target_app,
                                                      uuid=source_identifier,
                                                      **config.get('target_endpoint'))

    # this endpoint for some reason uses basic auth...
    r = requests.get(source_url, auth=HTTPBasicAuth(config.get('su_username'), config.get('su_password')))

    if r.status_code != 200:
        logger.error('Unable to migrate credentials due to HTTP [%s] on GET URL [%s]: %s' % (
            r.status_code, source_url, r.text))

        return False

    source_credentials = r.json()

    logger.info('Putting credentials to [%s]...' % target_url)

    r = requests.put(target_url,
                     data=json.dumps(source_credentials),
                     auth=HTTPBasicAuth(config.get('su_username'), config.get('su_password')))

    if r.status_code != 200:
        logger.error(
            'Unable to migrate credentials due to HTTP [%s] on PUT URL [%s]: %s' % (
                r.status_code, target_url, r.text))
        return False

    logger.info('migrate_user_credentials | success=[%s] | app/collection/name = %s/%s/%s' % (
        True, app, collection_name, source_entity.get('uuid')))

    return True


def check_response_status(r, url, exit_on_error=True):
    """Log (and optionally exit the process) when a response is not HTTP 200."""
    if r.status_code != 200:
        logger.critical('HTTP [%s] on URL=[%s]' % (r.status_code, url))
        logger.critical('Response: %s' % r.text)

        if exit_on_error:
            exit()


def do_operation(apps_and_collections, operation):
    """Fan the migration out over worker processes.

    Publishes (app, collection) pairs to a collection queue; CollectionWorkers
    iterate each collection and publish entities to an entity queue; EntityWorkers
    apply 'operation' to each entity. A StatusListener consumes worker status.
    On Ctrl-C, queues are closed and workers are signalled/terminated.
    """
    status_map = {}

    logger.info('Creating queues...')

    # Mac, for example, does not support the max_size for a queue in Python
    if _platform == "linux" or _platform == "linux2":
        entity_queue = Queue(maxsize=config.get('queue_size_max'))
        collection_queue = Queue(maxsize=config.get('queue_size_max'))
        collection_response_queue = Queue(maxsize=config.get('queue_size_max'))
    else:
        entity_queue = Queue()
        collection_queue = Queue()
        collection_response_queue = Queue()

    logger.info('Starting entity_workers...')

    collection_count = 0
    # create the entity workers, but only start them (later) if there is work to do
    entity_workers = [EntityWorker(entity_queue, operation) for x in xrange(config.get('entity_workers'))]

    # create the collection workers, but only start them (later) if there is work to do
    collection_workers = [CollectionWorker(collection_queue, entity_queue, collection_response_queue) for x in
                          xrange(config.get('collection_workers'))]

    status_listener = StatusListener(collection_response_queue, entity_queue)

    try:
        # for each app, publish the (app_name, collection_name) to the queue.
        # this is received by a collection worker who iterates the collection and publishes
        # entities into a queue. These are received by an individual entity worker which
        # executes the specified operation on the entity

        for app, app_data in apps_and_collections.get('apps', {}).iteritems():
            logger.info('Processing app=[%s]' % app)

            status_map[app] = {
                'iteration_started': str(datetime.datetime.now()),
                'max_created': -1,
                'max_modified': -1,
                'min_created': 1584946416000,
                'min_modified': 1584946416000,
                'count': 0,
                'bytes': 0,
                'collections': {}
            }

            # iterate the collections which are returned.
            for collection_name in app_data.get('collections'):
                logger.info('Publishing app / collection: %s / %s' % (app, collection_name))

                collection_count += 1
                collection_queue.put((app, collection_name))

            logger.info('Finished publishing [%s] collections for app [%s] !' % (collection_count, app))

        # only start the threads if there is work to do
        if collection_count > 0:
            status_listener.start()

            # start the worker processes which will iterate the collections
            [w.start() for w in collection_workers]

            # start the worker processes which will do the work of migrating
            [w.start() for w in entity_workers]

            # allow collection workers to finish
            wait_for(collection_workers, label='collection_workers', sleep_time=60)

            # allow entity workers to finish
            wait_for(entity_workers, label='entity_workers', sleep_time=60)

            status_listener.terminate()

    except KeyboardInterrupt:
        logger.warning('Keyboard Interrupt, aborting...')
        entity_queue.close()
        collection_queue.close()
        collection_response_queue.close()

        # NOTE(review): uses super(...).pid to reach the underlying Process pid of
        # the worker subclasses (defined elsewhere in this file) — confirm intent
        [os.kill(super(EntityWorker, p).pid, signal.SIGINT) for p in entity_workers]
        [os.kill(super(CollectionWorker, p).pid, signal.SIGINT) for p in collection_workers]
        os.kill(super(StatusListener, status_listener).pid, signal.SIGINT)

        [w.terminate() for w in entity_workers]
        [w.terminate() for w in collection_workers]
        status_listener.terminate()

    logger.info('entity_workers DONE!')
def filter_apps_and_collections(org_apps):
    """Build the {apps: {app: {collections: [...]}}} work list from the source org's apps.

    Applies the --app include list and the collection include/exclude filters
    (via include_collection). Collection lookups are retried up to 5 times.

    :param org_apps: dict whose keys are 'org/app' identifiers from the source org
    :return: dict of apps with their filtered collection lists
    """
    # FIX: renamed local from the typo 'app_collecitons'
    app_collections = {
        'apps': {

        }
    }

    try:
        selected_apps = config.get('app')

        # iterate the apps retrieved from the org
        for org_app in sorted(org_apps.keys()):
            logger.info('Found SOURCE App: %s' % org_app)

        # brief pause so the operator can review the app list before work starts
        time.sleep(3)

        for org_app in sorted(org_apps.keys()):
            parts = org_app.split('/')
            app = parts[1]

            # if apps are specified and the current app is not in the list, skip it
            if selected_apps and len(selected_apps) > 0 and app not in selected_apps:
                logger.warning('Skipping app [%s] not included in process list [%s]' % (app, selected_apps))
                continue

            app_collections['apps'][app] = {
                'collections': []
            }

            # get the list of collections from the source org/app
            source_app_url = app_url_template.format(org=config.get('org'),
                                                     app=app,
                                                     **config.get('source_endpoint'))
            logger.info('GET %s' % source_app_url)

            r_collections = session_source.get(source_app_url)

            collection_attempts = 0

            # sometimes this call was not working so I put it in a loop to force it...
            while r_collections.status_code != 200 and collection_attempts < 5:
                collection_attempts += 1
                logger.warning('FAILED: GET (%s) [%s] URL: %s' % (r_collections.elapsed, r_collections.status_code,
                                                                  source_app_url))
                time.sleep(DEFAULT_RETRY_SLEEP)
                r_collections = session_source.get(source_app_url)

            if collection_attempts >= 5:
                logger.critical('Unable to get collections at URL %s, skipping app' % source_app_url)
                continue

            app_response = r_collections.json()

            logger.info('App Response: ' + json.dumps(app_response))

            app_entities = app_response.get('entities', [])

            if len(app_entities) > 0:
                app_entity = app_entities[0]
                collections = app_entity.get('metadata', {}).get('collections', {})
                logger.info('App=[%s] starting Collections=[%s]' % (app, collections))

                filtered_collections = [c for c in collections if include_collection(c)]
                app_collections['apps'][app]['collections'] = filtered_collections

                # FIX: log the *filtered* collection list — previously this logged the
                # unfiltered 'collections', making the two log lines always identical
                logger.info('App=[%s] filtered Collections=[%s]' % (app, filtered_collections))

    except Exception:
        # FIX: narrowed from a bare 'except:' and routed through the logger
        # (was 'print traceback.format_exc()'), consistent with the rest of the file
        logger.error(traceback.format_exc())

    return app_collections


def confirm_target_org_apps(apps_and_collections):
    """Verify each target app exists, creating it when --create_apps is set.

    Exits the process if app creation was requested and fails; otherwise missing
    apps are logged critically and skipped.
    """
    for app in apps_and_collections.get('apps'):

        # it is possible to map source orgs and apps to differently named targets.  This gets the
        # target names for each
        target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org'))
        target_app = config.get('app_mapping', {}).get(app, app)

        # Check that the target Org/App exists.  If not, move on to the next
        target_app_url = app_url_template.format(org=target_org,
                                                 app=target_app,
                                                 **config.get('target_endpoint'))
        logger.info('GET %s' % target_app_url)
        r_target_apps = session_target.get(target_app_url)

        if r_target_apps.status_code != 200:

            if config.get('create_apps', DEFAULT_CREATE_APPS):
                create_app_url = org_management_app_url_template.format(org=target_org,
                                                                        app=target_app,
                                                                        **config.get('target_endpoint'))
                app_request = {'name': target_app}
                r = session_target.post(create_app_url, data=json.dumps(app_request))

                if r.status_code != 200:
                    logger.critical('--create_apps specified and unable to create app [%s] at URL=[%s]: %s' % (
                        target_app, create_app_url, r.text))
                    logger.critical('Process will now exit')
                    exit()
                else:
                    logger.warning('Created app=[%s] at URL=[%s]: %s' % (target_app, create_app_url, r.text))
            else:
                logger.critical('Target application DOES NOT EXIST at [%s] URL=%s' % (
                    r_target_apps.status_code, target_app_url))
                continue
If not, move on to the next + target_app_url = app_url_template.format(org=target_org, + app=target_app, + **config.get('target_endpoint')) + logger.info('GET %s' % target_app_url) + r_target_apps = session_target.get(target_app_url) + + if r_target_apps.status_code != 200: + + if config.get('create_apps', DEFAULT_CREATE_APPS): + create_app_url = org_management_app_url_template.format(org=target_org, + app=target_app, + **config.get('target_endpoint')) + app_request = {'name': target_app} + r = session_target.post(create_app_url, data=json.dumps(app_request)) + + if r.status_code != 200: + logger.critical('--create_apps specified and unable to create app [%s] at URL=[%s]: %s' % ( + target_app, create_app_url, r.text)) + logger.critical('Process will now exit') + exit() + else: + logger.warning('Created app=[%s] at URL=[%s]: %s' % (target_app, create_app_url, r.text)) + else: + logger.critical('Target application DOES NOT EXIST at [%s] URL=%s' % ( + r_target_apps.status_code, target_app_url)) + continue + + +def main(): + global config, cache + + config = parse_args() + init() + init_logging() + + logger.warn('Script starting') + + try: + if config.get('redis_socket') is not None: + cache = redis.Redis(unix_socket_path=config.get('redis_socket')) + + else: + # this does not try to connect to redis + cache = redis.StrictRedis(host='localhost', port=6379, db=0) + + # this is necessary to test the connection to redis + cache.get('usergrid') + + except: + logger.error( + 'Error connecting to Redis cache, consider using Redis to be able to optimize the migration process...') + logger.error( + 'Error connecting to Redis cache, consider using Redis to be able to optimize the migration process...') + + time.sleep(3) + + config['use_cache'] = False + config['skip_cache_read'] = True + config['skip_cache_write'] = True + + org_apps = { + } + + force_apps = config.get('force_app', []) + + if force_apps is not None and len(force_apps) > 0: + logger.warn('Forcing only the 
following apps to be processed: %s' % force_apps) + + for app in force_apps: + ke
<TRUNCATED>