Joal has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/362396 )
Change subject: Upgrade script dropping druid deep-storage data
......................................................................

Upgrade script dropping druid deep-storage data

Script now checks the dropping-data-task status and fails if not
successful.

Bug: T168614
Change-Id: Ibccbebe484fe28b1faa25b08cfba6d5a73c0e12c
---
M bin/refinery-drop-druid-deep-storage-data
1 file changed, 160 insertions(+), 77 deletions(-)

Approvals:
  Nuria: Verified; Looks good to me, approved


diff --git a/bin/refinery-drop-druid-deep-storage-data b/bin/refinery-drop-druid-deep-storage-data
index 5477fcb..c40e983 100644
--- a/bin/refinery-drop-druid-deep-storage-data
+++ b/bin/refinery-drop-druid-deep-storage-data
@@ -13,9 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# Note: You should make sure to put refinery/python on your PYTHONPATH.
-#   export PYTHONPATH=$PYTHONPATH:/path/to/refinery/python
-
 """
 Automatically deletes the old druid deep-storage data from HDFS.
 
@@ -24,104 +21,190 @@
 Options:
     -h --help                            Show this help message and exit.
     -d --older-than-days=<days>          Drop data older than this number of days. [default: 60]
-    -b --beginning-of-time=<day>         Day from which we drop data. [default: 2001-01-01]
-    -s --druid-host=<host>               Druid host to request [default: druid1001.eqiad.wmnet]
-    -c --druid-coord-port=<port>         Port for druid coordinator [default: 8081]
-    -o --druid-overlord-port=<port>      Port for druid overlord [default: 8090]
     -v --verbose                         Turn on verbose debug logging.
     -n --dry-run                         Don't actually delete any data. Just log the http request
                                          sent to druid
+    --beginning-of-time=<day>            Day from which we drop data. [default: 2001-01-01]
+    --druid-host=<host>                  Druid host to request [default: druid1001.eqiad.wmnet]
+    --druid-coord-port=<port>            Port for druid coordinator [default: 8081]
+    --druid-overlord-port=<port>         Port for druid overlord [default: 8090]
+    --no-datasource-check                Turn off datasource validity check in Druid (useful for
+                                         deleting data of a disabled datasource)
+    --sleep-between-status-checks=<s>    Sleeping time in seconds between task status updates.
+                                         [default: 5]
+
 """
 
 __author__ = 'Joseph Allemandou <j...@wikimedia.org>'
 
 import datetime
+import time
 from docopt import docopt
 import logging
 import requests
 from requests import RequestException
-import re
-import os
 import sys
 
+
+class DruidDataDropper(object):
+
+    _STATUS_SUCCESS = 'SUCCESS'
+    _STATUS_FAILED = 'FAILED'
+    _STATUS_RUNNING = 'RUNNING'
+
+    _datasource_url_pattern = 'http://{0}:{1}/druid/coordinator/v1/metadata/datasources/{2}'
+    _drop_url_pattern = 'http://{0}:{1}/druid/indexer/v1/task'
+    _drop_payload_pattern = '{{"type":"kill","id":"{0}","dataSource":"{1}","interval":"{2}/{3}"}}'
+    _task_status_url = 'http://{0}:{1}/druid/indexer/v1/task/{2}/status'
+    _day_pattern = '%Y-%m-%d'
+
+    def __init__(self, datasource, druid_host, druid_coord_port,
+                 druid_overlord_port, older_than_days, beginning_of_time,
+                 datasource_check, sleep_between_status_checks, dry_run):
+
+        # Save reused parameters
+        self.datasource = datasource
+        self.datasource_check = datasource_check
+        self.sleep_between_status_checks = sleep_between_status_checks
+        self.dry_run = dry_run
+
+        # Reused constants
+        now = datetime.datetime.utcnow()
+        dt_day_postfix = 'T00:00:00.000Z'
+
+        # Drop interval start and end
+        drop_start_day = datetime.datetime.strftime(beginning_of_time, self._day_pattern)
+        itv_start = drop_start_day + dt_day_postfix
+        oldest_accepted_datetime = now - datetime.timedelta(days=older_than_days)
+        oldest_accepted_day = datetime.datetime.strftime(oldest_accepted_datetime, self._day_pattern)
+        itv_end = oldest_accepted_day + dt_day_postfix
+
+        # Prepare task_id, datasource_url, drop_url and payload, status_url
+        self.datasource_url = self._datasource_url_pattern.format(
+            druid_host, druid_coord_port, self.datasource)
+        self.task_id = 'drop_{0}_{1}'.format(self.datasource, now.strftime('%Y-%m-%dT%H:%M:%S'))
+        self.drop_url = self._drop_url_pattern.format(druid_host, druid_overlord_port)
+        self.drop_payload = self._drop_payload_pattern.format(self.task_id, self.datasource, itv_start, itv_end)
+        self.status_url = self._task_status_url.format(druid_host, druid_overlord_port, self.task_id)
+
+    def _check_datasource(self):
+        logging.debug('Checking that datasource {0} is enabled in druid.'.format(self.datasource))
+        try:
+            if self.dry_run:
+                logging.info('Datasource check - get URL: {0}'.format(self.datasource_url))
+            else:
+                check_req = requests.get(self.datasource_url)
+                if check_req.status_code != requests.codes.ok:
+                    logging.error('Couldn\'t get datasource metadata for datasource {0}. Aborting.'.format(self.datasource))
+                    return False
+                else:
+                    logging.debug('Datasource {0} is enabled in druid.'.format(self.datasource))
+            return True
+        except RequestException:
+            logging.exception('Error trying to check druid datasource {0}. Aborting.'.format(self.datasource))
+            return False
+
+    def _launch_task(self):
+        logging.debug('Launching druid kill task {0}.'.format(self.task_id))
+        if self.dry_run:
+            logging.info('Kill task: post URL: {0}'.format(self.drop_url))
+            logging.info('Kill task: post payload: {0}'.format(self.drop_payload))
+            return True
+        else:
+            try:
+                headers = {'Content-type': 'application/json'}
+                drop_req = requests.post(self.drop_url, data=self.drop_payload, headers=headers)
+
+                if (drop_req.status_code == requests.codes.ok
+                        and drop_req.json()['task'] == self.task_id):
+                    logging.debug('Kill task {0} successfully launched'.format(self.task_id))
+                    return True
+                else:
+                    logging.error('Error launching kill task {0}. Aborting.'.format(self.task_id))
+                    return False
+            except RequestException:
+                logging.exception('Error trying to launch kill task {0}. Aborting.'.format(self.task_id))
+                return False
+
+    def execute(self):
+        # Only check datasource if requested
+        if self.datasource_check and not self._check_datasource():
+            return False
+        if not self._launch_task():
+            return False
+        # Don't check task status in dry-run mode
+        if self.dry_run:
+            return True
+        task_status = self._STATUS_RUNNING
+        while task_status == self._STATUS_RUNNING:
+            time.sleep(self.sleep_between_status_checks)
+            try:
+                status_req = requests.get(self.status_url)
+                if status_req.status_code == requests.codes.ok:
+                    task_status = status_req.json()['status']['status'].strip()
+                    logging.debug('Got new task {0} status: {1}'.format(self.task_id, task_status))
+                else:
+                    logging.error('Error getting task {0} status. Aborting.'.format(self.task_id))
+                    return False
+            except RequestException:
+                logging.exception('Error trying to get task {0} status. Aborting.'.format(self.task_id))
+                return False
+        if task_status == self._STATUS_SUCCESS:
+            logging.debug('Kill task {0} succeeded. Gracefully finish.'.format(self.task_id))
+            return True
+        elif task_status == self._STATUS_FAILED:
+            logging.error('Kill task {0} failed. Aborting.'.format(self.task_id))
+        else:
+            logging.error('Unexpected status for task {0}: {1}. Aborting.'.format(self.task_id, task_status))
+        return False
+
+
+def parse_int_arg(arguments, argument_name):
+    try:
+        return int(arguments[argument_name])
+    except ValueError:
+        logging.exception('Incorrect format for {0}. Integer expected. Aborting.'.format(argument_name))
+        return None
+
 
 if __name__ == '__main__':
 
-    check_datasource_url_pattern = 'http://{0}:{1}/druid/coordinator/v1/metadata/datasources/{2}'
-    drop_url_pattern = 'http://{0}:{1}/druid/indexer/v1/task'
-    drop_payload_pattern = '{{"type":"kill","id":"{0}","dataSource":"{1}","interval":"{2}/{3}"}}'
-    day_pattern = '%Y-%m-%d'
+    # parse arguments
+    arguments = docopt(__doc__)
 
-    # Parse arguments
-    arguments = docopt(__doc__)
-    datasource = arguments['<datasource>']
-    host = arguments['--druid-host']
-    coord_port = int(arguments['--druid-coord-port'])
-    overlord_port = int(arguments['--druid-overlord-port'])
-    days = int(arguments['--older-than-days'])
-    drop_start = arguments['--beginning-of-time']
-    verbose = arguments['--verbose']
-    dry_run = arguments['--dry-run']
-
+    # Configure log level
     log_level = logging.INFO
-    if verbose:
+    if arguments['--verbose']:
         log_level = logging.DEBUG
     logging.basicConfig(level=log_level,
                         format='%(asctime)s %(levelname)-6s %(message)s',
                         datefmt='%Y-%m-%dT%H:%M:%S')
 
-    # Check for datasource segments
-    check_datasource_url = check_datasource_url_pattern.format(host, coord_port, datasource)
+    # Check arguments validity
     try:
-        check_req = requests.get(check_datasource_url)
-        if check_req.status_code != requests.codes.ok:
-            logging.error('Couldn\'t get datasource metadata for datasource {0}. Aborting.'.format(datasource))
-            sys.exit(1)
-        else:
-            logging.info('Datasource {0} exists in druid.'.format(datasource))
-    except RequestException:
-        logging.exception('Error trying to check druid datasource. Aborting.')
+        validated_beginning_of_time = datetime.datetime.strptime(arguments['--beginning-of-time'], '%Y-%m-%d')
+    except ValueError:
+        logging.exception('Incorrect format for --beginning-of-time parameter. YYYY-MM-DD expected. Aborting.')
+        validated_beginning_of_time = None
+    validated_druid_coord_port = parse_int_arg(arguments, '--druid-coord-port')
+    validated_druid_overlord_port = parse_int_arg(arguments, '--druid-overlord-port')
+    validated_older_than_days = parse_int_arg(arguments, '--older-than-days')
+    validated_sleep_between_status_checks = parse_int_arg(arguments, '--sleep-between-status-checks')
+
+    # In case of any argument error, abort
+    if not (validated_beginning_of_time
+            and validated_druid_coord_port
+            and validated_druid_overlord_port
+            and validated_older_than_days
+            and validated_sleep_between_status_checks):
         sys.exit(1)
 
-    # reused constants
-    now = datetime.datetime.utcnow()
-    dt_day_postfix = 'T00:00:00.000Z'
+    # Create data dropper and execute
+    dataDropper = DruidDataDropper(
+        arguments['<datasource>'],
+        arguments['--druid-host'],
+        validated_druid_coord_port,
+        validated_druid_overlord_port,
+        validated_older_than_days,
+        validated_beginning_of_time,
+        not arguments['--no-datasource-check'],
+        validated_sleep_between_status_checks,
+        arguments['--dry-run'])
 
-    # Drop task id: drop_{datasource}_{now}
-    task_id = 'drop_{0}_{1}'.format(datasource, now.strftime('%Y-%m-%dT%H:%M:%S'))
-    logging.debug('Computed task_id: {0}'.format(task_id))
-
-    # Enforce valid start day by parsing it, and make itv_start
-    drop_start_datetime = datetime.datetime.strptime(drop_start, day_pattern)
-    drop_start_day = datetime.datetime.strftime(drop_start_datetime, day_pattern)
-    itv_start = drop_start_day + dt_day_postfix
-    logging.debug('Computed itv_start: {0}'.format(itv_start))
-
-    # Compute itc_end from days
-    oldest_accepted_datetime = now - datetime.timedelta(days=days)
-    oldest_accepted_day = datetime.datetime.strftime(oldest_accepted_datetime, day_pattern)
-    itv_end = oldest_accepted_day + dt_day_postfix
-    logging.debug('Computed itv_end: {0}'.format(itv_end))
-
-    # Prepare drop url and payload
-    drop_url = drop_url_pattern.format(host, overlord_port)
-    logging.debug('Computed drop_url: {0}'.format(drop_url))
-    drop_payload = drop_payload_pattern.format(task_id, datasource, itv_start, itv_end)
-    logging.debug('Computed drop_payload: {0}'.format(drop_payload))
-
-    if dry_run:
-        logging.info('URL to request: {0}'.format(drop_url))
-        logging.info('Payload to post: {0}'.format(drop_payload))
-    else:
-        try:
-            headers = {'Content-type': 'application/json'}
-            drop_req = requests.post(drop_url, data=drop_payload, headers=headers)
-            logging.debug('Drop request sent.')
-            if drop_req.status_code == requests.codes.ok:
-                received_task_id = drop_req.json()['task']
-                logging.debug('Drop task successfully sent with id {0}'.format(received_task_id))
-            else:
-                logging.error('Drop task error for id {0}'.format(task_id))
-                sys.exit(1)
-        except RequestException:
-            logging.exception('Error trying to drop druid data. Aborting.')
-            sys.exit(1)
+    return_code = 0 if dataDropper.execute() else 1
+    sys.exit(return_code)

-- 
To view, visit https://gerrit.wikimedia.org/r/362396
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ibccbebe484fe28b1faa25b08cfba6d5a73c0e12c
Gerrit-PatchSet: 2
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <j...@wikimedia.org>
Gerrit-Reviewer: Elukey <ltosc...@wikimedia.org>
Gerrit-Reviewer: Joal <j...@wikimedia.org>
Gerrit-Reviewer: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Nuria <nu...@wikimedia.org>
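
For anyone wanting to exercise the same flow by hand, below is a minimal
standalone sketch of what the upgraded script does: post a Druid 'kill' task
to the overlord, then poll its status until it leaves RUNNING. The endpoints
and the response fields ('task', and 'status'.'status') are the ones the
script itself uses; the host, datasource, id, and interval values are
placeholders, not values taken from this change:

    import time
    import requests

    # Placeholder overlord location -- substitute your own.
    OVERLORD = 'http://druid1001.eqiad.wmnet:8090'

    # A 'kill' task deletes deep-storage segments in the given interval.
    payload = {
        'type': 'kill',
        'id': 'drop_example_datasource_2017-07-05T00:00:00',  # hypothetical id
        'dataSource': 'example_datasource',                   # hypothetical name
        'interval': '2001-01-01T00:00:00.000Z/2017-05-06T00:00:00.000Z',
    }

    # Submit the task (same endpoint the script posts to).
    resp = requests.post(OVERLORD + '/druid/indexer/v1/task', json=payload)
    resp.raise_for_status()
    task_id = resp.json()['task']

    # Poll the status endpoint until the task settles, mirroring the
    # new status-check loop in DruidDataDropper.execute().
    status = 'RUNNING'
    while status == 'RUNNING':
        time.sleep(5)
        status_resp = requests.get(
            OVERLORD + '/druid/indexer/v1/task/{0}/status'.format(task_id))
        status = status_resp.json()['status']['status']

    print('Task {0} finished with status {1}'.format(task_id, status))

A non-trivial sleep between polls matters in practice: kill tasks can run for
minutes on large intervals, which is why the script exposes
--sleep-between-status-checks rather than hammering the overlord.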