Joal has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/362396 )

Change subject: Upgrade script dropping druid deep-storage data
......................................................................


Upgrade script dropping druid deep-storage data

Script now checks the status of the data-dropping task and
fails if the task is not successful.

Bug: T168614
Change-Id: Ibccbebe484fe28b1faa25b08cfba6d5a73c0e12c
---
M bin/refinery-drop-druid-deep-storage-data
1 file changed, 164 insertions(+), 77 deletions(-)

Approvals:
  Nuria: Verified; Looks good to me, approved



diff --git a/bin/refinery-drop-druid-deep-storage-data b/bin/refinery-drop-druid-deep-storage-data
index 5477fcb..c40e983 100644
--- a/bin/refinery-drop-druid-deep-storage-data
+++ b/bin/refinery-drop-druid-deep-storage-data
@@ -13,9 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# Note: You should make sure to put refinery/python on your PYTHONPATH.
-#   export PYTHONPATH=$PYTHONPATH:/path/to/refinery/python
-
 """
 Automatically deletes the old druid deep-storage data from HDFS.
 
@@ -24,104 +21,194 @@
 Options:
     -h --help                           Show this help message and exit.
     -d --older-than-days=<days>         Drop data older than this number of days.  [default: 60]
-    -b --beginning-of-time=<day>        Day from which we drop data. [default: 2001-01-01]
-    -s --druid-host=<host>              Druid host to request [default: druid1001.eqiad.wmnet]
-    -c --druid-coord-port=<port>        Port for druid coordinator [default: 8081]
-    -o --druid-overlord-port=<port>     Port for druid overlord [default: 8090]
     -v --verbose                        Turn on verbose debug logging.
     -n --dry-run                        Don't actually delete any data. Just log the http request
                                         sent to druid
+    --beginning-of-time=<day>           Day from which we drop data. [default: 2001-01-01]
+    --druid-host=<host>                 Druid host to request [default: druid1001.eqiad.wmnet]
+    --druid-coord-port=<port>           Port for druid coordinator [default: 8081]
+    --druid-overlord-port=<port>        Port for druid overlord [default: 8090]
+    --no-datasource-check               Turn off datasource validity check in Druid (useful for
+                                        deleting data of a disabled datasource)
+    --sleep-between-status-checks=<s>   Sleeping time in seconds between task status updates.
+                                        [default: 5]
+
 """
 __author__ = 'Joseph Allemandou <j...@wikimedia.org>'
 
 import datetime
+import time
 from   docopt   import docopt
 import logging
 import requests
 from requests import RequestException
-import re
-import os
 import sys
+
+class DruidDataDropper(object):
+
+    _STATUS_SUCCESS = 'SUCCESS'
+    _STATUS_FAILED = 'FAILED'
+    _STATUS_RUNNING = 'RUNNING'
+
+    _datasource_url_pattern = 'http://{0}:{1}/druid/coordinator/v1/metadata/datasources/{2}'
+    _drop_url_pattern = 'http://{0}:{1}/druid/indexer/v1/task'
+    _drop_payload_pattern = '{{"type":"kill","id":"{0}","dataSource":"{1}","interval":"{2}/{3}"}}'
+    _task_status_url = 'http://{0}:{1}/druid/indexer/v1/task/{2}/status'
+    _day_pattern = '%Y-%m-%d'
+
+    def __init__(self, datasource, druid_host, druid_coord_port,
+                 druid_overlord_port, older_than_days, beginning_of_time,
+                 datasource_check, sleep_between_status_checks, dry_run):
+
+        # Save reused parameters
+        self.datasource = datasource
+        self.datasource_check = datasource_check
+        self.sleep_between_status_checks = sleep_between_status_checks
+        self.dry_run = dry_run
+
+        # reused constants
+        now = datetime.datetime.utcnow()
+        dt_day_postfix = 'T00:00:00.000Z'
+
+        # Drop interval start and end
+        drop_start_day = datetime.datetime.strftime(beginning_of_time, self._day_pattern)
+        itv_start = drop_start_day + dt_day_postfix
+        oldest_accepted_datetime = now - datetime.timedelta(days=older_than_days)
+        oldest_accepted_day = datetime.datetime.strftime(oldest_accepted_datetime, self._day_pattern)
+        itv_end = oldest_accepted_day + dt_day_postfix
+
+        # Prepare task_id, datasource_url, drop_url, drop_payload and status_url
+        self.datasource_url = self._datasource_url_pattern.format(
+            druid_host, druid_coord_port, self.datasource)
+        self.task_id = 'drop_{0}_{1}'.format(self.datasource, now.strftime('%Y-%m-%dT%H:%M:%S'))
+        self.drop_url = self._drop_url_pattern.format(druid_host, druid_overlord_port)
+        self.drop_payload = self._drop_payload_pattern.format(self.task_id, self.datasource, itv_start, itv_end)
+        self.status_url = self._task_status_url.format(druid_host, druid_overlord_port, self.task_id)
+
+    def _check_datasource(self):
+        logging.debug('Checking that datasource {0} is enabled in druid.'.format(self.datasource))
+        try:
+            if self.dry_run:
+                logging.info('Datasource check - get URL: {0}'.format(self.datasource_url))
+            else:
+                check_req = requests.get(self.datasource_url)
+                if check_req.status_code != requests.codes.ok:
+                    logging.error('Couldn\'t get datasource metadata for datasource {0}.  Aborting.'.format(self.datasource))
+                    return False
+                else:
+                    logging.debug('Datasource {0} is enabled in druid.'.format(self.datasource))
+            return True
+        except RequestException:
+            logging.exception('Error trying to check druid datasource {0}. Aborting.'.format(self.datasource))
+            return False
+
+    def _launch_task(self):
+        logging.debug('Launching druid kill task {0}.'.format(self.task_id))
+        if self.dry_run:
+            logging.info('Kill task: post URL: {0}'.format(self.drop_url))
+            logging.info('Kill task: post payload: {0}'.format(self.drop_payload))
+            return True
+        else:
+            try:
+                headers = {'Content-type': 'application/json'}
+                drop_req = requests.post(self.drop_url, data=self.drop_payload, headers=headers)
+
+                if (drop_req.status_code == requests.codes.ok
+                    and drop_req.json()['task'] == self.task_id):
+                    logging.debug('Kill task {0} successfully launched'.format(self.task_id))
+                    return True
+                else:
+                    logging.error('Error launching kill task {0}. Aborting.'.format(self.task_id))
+                    return False
+            except RequestException:
+                logging.exception('Error trying to launch kill task {0}. Aborting.'.format(self.task_id))
+                return False
+
+    def execute(self):
+        # Only check datasource if requested
+        if self.datasource_check and not self._check_datasource():
+            return False
+        if not self._launch_task():
+            return False
+        # Don't check task status in dry-run mode
+        if self.dry_run:
+            return True
+        task_status = self._STATUS_RUNNING
+        while task_status == self._STATUS_RUNNING:
+            time.sleep(self.sleep_between_status_checks)
+            try:
+                status_req = requests.get(self.status_url)
+                if status_req.status_code == requests.codes.ok:
+                    task_status = status_req.json()['status']['status'].strip()
+                    logging.debug('Got new task {0} status: {1}'.format(self.task_id, task_status))
+                else:
+                    logging.error('Error getting task {0} status. Aborting.'.format(self.task_id))
+                    return False
+            except RequestException:
+                logging.exception('Error trying to get task {0} status. Aborting.'.format(self.task_id))
+                return False
+        if task_status == self._STATUS_SUCCESS:
+            logging.debug('Kill task {0} succeeded. Finishing gracefully.'.format(self.task_id))
+            return True
+        elif task_status == self._STATUS_FAILED:
+            logging.error('Kill task {0} failed. Aborting.'.format(self.task_id))
+        else:
+            logging.error('Unexpected status for task {0}: {1}. Aborting.'.format(self.task_id, task_status))
+        return False
+
+
+def parse_int_arg(arguments, argument_name):
+    try:
+        return int(arguments[argument_name])
+    except ValueError:
+        logging.exception('Incorrect format for {0}. Integer expected. Aborting.'.format(argument_name))
+        return None
+
 
 if __name__ == '__main__':
 
-    check_datasource_url_pattern = 'http://{0}:{1}/druid/coordinator/v1/metadata/datasources/{2}'
-    drop_url_pattern = 'http://{0}:{1}/druid/indexer/v1/task'
-    drop_payload_pattern = '{{"type":"kill","id":"{0}","dataSource":"{1}","interval":"{2}/{3}"}}'
-    day_pattern = '%Y-%m-%d'
+    # Parse arguments
+    arguments = docopt(__doc__)
 
-    # Parse arguments
-    arguments       = docopt(__doc__)
-    datasource      = arguments['<datasource>']
-    host            = arguments['--druid-host']
-    coord_port      = int(arguments['--druid-coord-port'])
-    overlord_port   = int(arguments['--druid-overlord-port'])
-    days            = int(arguments['--older-than-days'])
-    drop_start      = arguments['--beginning-of-time']
-    verbose         = arguments['--verbose']
-    dry_run         = arguments['--dry-run']
-
+    # Configure log level
     log_level = logging.INFO
-    if verbose:
+    if arguments['--verbose']:
         log_level = logging.DEBUG
 
     logging.basicConfig(level=log_level,
                         format='%(asctime)s %(levelname)-6s %(message)s',
                         datefmt='%Y-%m-%dT%H:%M:%S')
 
-    # Check for datasource segments
-    check_datasource_url = check_datasource_url_pattern.format(host, coord_port, datasource)
+    # Check arguments validity
     try:
-        check_req = requests.get(check_datasource_url)
-        if check_req.status_code != requests.codes.ok:
-          logging.error('Couldn\'t get datasource metadata for datasource {0}.  Aborting.'.format(datasource))
-          sys.exit(1)
-        else:
-            logging.info('Datasource {0} exists in druid.'.format(datasource))
-    except RequestException:
-        logging.exception('Error trying to check druid datasource. Aborting.')
+        validated_beginning_of_time = datetime.datetime.strptime(arguments['--beginning-of-time'], '%Y-%m-%d')
+    except ValueError:
+        logging.exception('Incorrect format for --beginning-of-time parameter. YYYY-MM-DD expected. Aborting.')
+        validated_beginning_of_time = None
+    validated_druid_coord_port = parse_int_arg(arguments, '--druid-coord-port')
+    validated_druid_overlord_port = parse_int_arg(arguments, '--druid-overlord-port')
+    validated_older_than_days = parse_int_arg(arguments, '--older-than-days')
+    validated_sleep_between_status_checks = parse_int_arg(arguments, '--sleep-between-status-checks')
+
+    # In case of any argument error, abort
+    if (validated_beginning_of_time is None
+        or validated_druid_coord_port is None
+        or validated_druid_overlord_port is None
+        or validated_older_than_days is None
+        or validated_sleep_between_status_checks is None):
         sys.exit(1)
 
-    # reused constants
-    now = datetime.datetime.utcnow()
-    dt_day_postfix = 'T00:00:00.000Z'
+    # Create data dropper and execute
+    data_dropper = DruidDataDropper(
+        arguments['<datasource>'],
+        arguments['--druid-host'],
+        validated_druid_coord_port,
+        validated_druid_overlord_port,
+        validated_older_than_days,
+        validated_beginning_of_time,
+        not arguments['--no-datasource-check'],
+        validated_sleep_between_status_checks,
+        arguments['--dry-run'])
 
-    # Drop task id: drop_{datasource}_{now}
-    task_id = 'drop_{0}_{1}'.format(datasource, now.strftime('%Y-%m-%dT%H:%M:%S'))
-    logging.debug('Computed task_id: {0}'.format(task_id))
-
-    # Enforce valid start day by parsing it, and make itv_start
-    drop_start_datetime = datetime.datetime.strptime(drop_start, day_pattern)
-    drop_start_day = datetime.datetime.strftime(drop_start_datetime, day_pattern)
-    itv_start = drop_start_day + dt_day_postfix
-    logging.debug('Computed itv_start: {0}'.format(itv_start))
-
-    # Compute itc_end from days
-    oldest_accepted_datetime = now - datetime.timedelta(days=days)
-    oldest_accepted_day = datetime.datetime.strftime(oldest_accepted_datetime, day_pattern)
-    itv_end = oldest_accepted_day + dt_day_postfix
-    logging.debug('Computed itv_end: {0}'.format(itv_end))
-
-    # Prepare drop url and payload
-    drop_url = drop_url_pattern.format(host, overlord_port)
-    logging.debug('Computed drop_url: {0}'.format(drop_url))
-    drop_payload = drop_payload_pattern.format(task_id, datasource, itv_start, itv_end)
-    logging.debug('Computed drop_payload: {0}'.format(drop_payload))
-
-    if dry_run:
-        logging.info('URL to request: {0}'.format(drop_url))
-        logging.info('Payload to post: {0}'.format(drop_payload))
-    else:
-        try:
-            headers = {'Content-type': 'application/json'}
-            drop_req = requests.post(drop_url, data=drop_payload, headers=headers)
-            logging.debug('Drop request sent.')
-            if drop_req.status_code == requests.codes.ok:
-                received_task_id = drop_req.json()['task']
-                logging.debug('Drop task successfully sent with id {0}'.format(received_task_id))
-            else:
-                logging.error('Drop task error for id {0}'.format(task_id))
-                sys.exit(1)
-        except RequestException:
-            logging.exception('Error trying to drop druid data. Aborting.')
-            sys.exit(1)
+    return_code = 0 if data_dropper.execute() else 1
+    sys.exit(return_code)
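
For reference, a minimal dry-run sketch of the new class follows. It assumes
DruidDataDropper is in scope (e.g. the script body pasted into a REPL); the
'webrequest' datasource name is illustrative, and every other value is simply
the default documented in the docopt help above.

    import datetime
    import logging

    logging.basicConfig(level=logging.INFO)

    # Argument order mirrors the constructor defined in the patch above.
    dropper = DruidDataDropper(
        'webrequest',                   # <datasource> (illustrative name)
        'druid1001.eqiad.wmnet',        # --druid-host default
        8081,                           # --druid-coord-port default
        8090,                           # --druid-overlord-port default
        60,                             # --older-than-days default
        datetime.datetime(2001, 1, 1),  # --beginning-of-time default, pre-parsed
        True,                           # datasource check on (no --no-datasource-check)
        5,                              # --sleep-between-status-checks default
        True)                           # --dry-run: only log the HTTP requests
    dropper.execute()

In dry-run mode the object only logs the coordinator GET and the overlord
kill-task POST it would issue, without contacting Druid.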

-- 
To view, visit https://gerrit.wikimedia.org/r/362396

Gerrit-MessageType: merged
Gerrit-Change-Id: Ibccbebe484fe28b1faa25b08cfba6d5a73c0e12c
Gerrit-PatchSet: 2
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <j...@wikimedia.org>
Gerrit-Reviewer: Elukey <ltosc...@wikimedia.org>
Gerrit-Reviewer: Joal <j...@wikimedia.org>
Gerrit-Reviewer: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Nuria <nu...@wikimedia.org>
