QChris has submitted this change and it was merged.

Change subject: Add code to auto-drop old hive partitions and remove partition directories

Add code to auto-drop old hive partitions and remove partition directories

Change-Id: I457941316bfdf40a73881359f276520d5f11ee4a
A .gitignore
A bin/hive-drop-webrequest-partitions
A python/README.md
A python/refinery/__init__.py
A python/refinery/util.py
A python/tests/test_refinery/test_util.py
6 files changed, 693 insertions(+), 0 deletions(-)

  QChris: Verified; Looks good to me, approved

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0d20b64
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
diff --git a/bin/hive-drop-webrequest-partitions 
new file mode 100755
index 0000000..d7657c0
--- /dev/null
+++ b/bin/hive-drop-webrequest-partitions
@@ -0,0 +1,140 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Note: You should make sure to put refinery/python on your PYTHONPATH.
+#   export PYTHONPATH=$PYTHONPATH:/path/to/refinery/python
+Automatically drops old Hive partitions from the webrequest table
+and deletes the hourly time bucketed directory from HDFS.
+Usage: hive-drop-webrequest-partitions [options]
+    -h --help                           Show this help message and exit.
+    -d --older-than-days=<days>         Drop data older than this number of 
days.  [default: 60]
+    -D --database=<dbname>              Hive database name.  [default: default]
+    -t --table=<table>                  Name of webrequest table.  [default: 
+    -o --hive-options=<options>         Any valid Hive CLI options you want to 
pass to Hive commands.
+                                        Example: '--auxpath 
+                                        [default: --auxpath 
+    -v --verbose                        Turn on verbose debug logging.
+    -n --dry-run                        Don't actually drop any partitions, 
just output the Hive queries to drop partitions.
+__author__ = 'Andrew Otto <o...@wikimedia.org>'
+import datetime
+from   docopt   import docopt
+import logging
+import re
+import os
+from refinery.util import HiveUtils, HdfsUtils
+# from pprint import pprint as pp
+if __name__ == '__main__':
+    # parse arguments
+    arguments = docopt(__doc__)
+    # pp(arguments)
+    days          = int(arguments['--older-than-days'])
+    database      = arguments['--database']
+    table         = arguments['--table']
+    hive_options  = arguments['--hive-options']
+    verbose       = arguments['--verbose']
+    dry_run       = arguments['--dry-run']
+    log_level = logging.INFO
+    if verbose:
+        log_level = logging.DEBUG
+    logging.basicConfig(level=log_level,
+                        format='%(asctime)s %(levelname)-6s %(message)s',
+                        datefmt='%Y-%m-%dT%H:%M:%S')
+    # This regex names its group matches by the actual
+    # partition column name.  This is required so that
+    # HiveUtils partition_spec_from_path can parse a path
+    # and return a partition spec for use in a Hive DDL statement.
+    webrequest_path_regex = 
+    # This regex tells HiveUtils partition_datetime_from_path
+    # how to extract just the date portion from a partition path.
+    # The match group will be passed to dateutil.parser.parse to
+    # return a datetime object.
+    webrequest_date_regex = re.compile(r'.*/hourly/(.+)$')
+    # Allows easy extraction of partition fields from the partition spec.
+    # This regex is used with HiveUtils partition_datetime_from_spec.
+    partition_spec_regex   = 
+    # Instantiate HiveUtils.
+    hive = HiveUtils(database, hive_options)
+    # The base location of this webrequest table in HDFS.
+    table_location = hive.table_location(table)
+    # This glob will be used to list out all partition paths in HDFS.
+    partition_glob = os.path.join(table_location, '*', 'hourly', '*', '*', 
'*', '*')
+    # Delete partitions older than this.
+    old_partition_datetime_threshold = datetime.datetime.now() - 
+    partition_specs_to_drop   = []
+    partition_paths_to_delete = []
+    # Loop through all partitions for this table and drop anything that is too 
+    for partition_spec in hive.partitions(table):
+        partition_datetime = hive.partition_datetime_from_spec(
+            partition_spec,
+            partition_spec_regex
+        )
+        if partition_datetime < old_partition_datetime_threshold:
+            partition_specs_to_drop.append(partition_spec)
+    # Loop through all the partition directory paths for this table
+    # and check if any of them are old enough for deletion.
+    for partition_path in HdfsUtils.ls(partition_glob, include_children=False):
+        partition_datetime = hive.partition_datetime_from_path(
+            partition_path,
+            webrequest_date_regex
+        )
+        if partition_datetime < old_partition_datetime_threshold:
+            partition_paths_to_delete.append(partition_path)
+    # Drop any old Hive partitions
+    if partition_specs_to_drop:
+        if dry_run:
+            print(hive.drop_partitions_ddl(table, partition_specs_to_drop))
+        else:
+            logging.info('Dropping {0} partitions from table {1}.{2}'
+                .format(len(partition_specs_to_drop), database, table)
+            )
+            hive.drop_partitions(table, partition_specs_to_drop)
+    else:
+        logging.info('No partitions need dropped for table 
{0}.{1}'.format(database, table))
+    # Delete any old HDFS data
+    if partition_paths_to_delete:
+        if dry_run:
+            print('hdfs dfs -rm -R ' + ' '.join(partition_paths_to_delete))
+        else:
+            logging.info('Removing {0} partition directories for table {1}.{2} 
from {3}.'
+                .format(len(partition_paths_to_delete), database, table, 
+            )
+            HdfsUtils.rm(' '.join(partition_paths_to_delete))
+    else:
+        logging.info('No partition directories need removed for table 
{0}.{1}'.format(database, table))
diff --git a/python/README.md b/python/README.md
new file mode 100644
index 0000000..113d04f
--- /dev/null
+++ b/python/README.md
@@ -0,0 +1,14 @@
+# Analytics Refinery Python Module
+## Usage:
+The ```refinery/python``` directory should be in your ```PYTHONPATH```
+in order to properly import this module.
+export PYTHONPATH=/path/to/analytics/refinery/python
+This env variable should be set for shell accounts on nodes in
+the Wikimedia Analytics Cluster automatically.
+You should then be able to ```import refinery``` in your python code.
diff --git a/python/refinery/__init__.py b/python/refinery/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/python/refinery/__init__.py
diff --git a/python/refinery/util.py b/python/refinery/util.py
new file mode 100755
index 0000000..e513cb0
--- /dev/null
+++ b/python/refinery/util.py
@@ -0,0 +1,445 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Wikimedia Analytics Refinery Python utilities.
+import datetime
+from dateutil.parser import parse as dateutil_parse
+import logging
+import os
+import subprocess
+import re
+import tempfile
+from urlparse import urlparse
+logger = logging.getLogger('refinery-util')
+def sh(command, check_return_code=True, strip_output=True, 
+    """
+    Executes a shell command and return the stdout.
+    Parameters
+        command             : The command to run.  Either an array or a string.
+                              If it is a string, shell=True will be passed to
+                              subprocess.Popen.
+        check_return_code   : If the command does not exit with 0, a 
+                              will be raised.
+        strip_output        : If True, output.strip() will be called before 
+                              Default: True
+        return_stderr       : If True, (stdout, stderr) will be returned as a 
+                              Default: False
+    Returns
+        The stdout output of the shell command.
+    Raises
+        RuntimeError         : check_return_code == True and the command exited
+                               with a non zero exit code.
+    """
+    # command_string is just for log messages.
+    if isinstance(command, list):
+        shell = False
+        command_string = ' '.join(command)
+    else:
+        shell = True
+        command_string = command
+    logger.debug('Running: {0}'.format(command_string))
+    p = subprocess.Popen(command, stdout=subprocess.PIPE, 
stderr=subprocess.PIPE, shell=shell)
+    stdout, stderr = p.communicate()
+    if check_return_code and p.returncode != 0:
+        raise RuntimeError("Command: {0} failed with error code: {1}"
+            .format(command_string, p.returncode), stdout, stderr)
+    if strip_output:
+        stdout = stdout.strip()
+        stderr = stderr.strip()
+    if return_stderr:
+        return (stdout, stderr)
+    else:
+        return stdout
+class HiveUtils(object):
+    """
+    A convenience object for running hive queries via the Hive CLI.
+    Most of the methods here work with table and partition DDL.
+    Parameters:
+        database    : The Hive database name to use
+        options     : Other options to be passed directly to the Hive CLI.
+    """
+    partition_desc_separator = '/'
+    partition_spec_separator = ','
+    def __init__(self, database='default', options=''):
+        self.database   = database
+        if options:
+            self.options    = options.split()
+        else:
+            self.options = []
+        self.hivecmd = ['hive'] + self.options + ['--service', 'cli', 
'--database', self.database]
+        self.tables  = {}
+    def _tables_get(self):
+        """Returns a list of tables in the current database"""
+        return self.query('SET hive.cli.print.header=false; SHOW 
+    def _tables_init(self, force=False):
+        """
+        Initializes the self.tables dict.
+        If self.tables already has entries, self.tables will just be returned.
+        If force is True, then self.tables will be cleared and reinitialized.
+        """
+        if self.tables and not force:
+            return self.tables
+        if force:
+            self.reset()
+        for table in self._tables_get():
+            self.tables[table] = {}
+        return self.tables
+    def reset(self):
+        """
+        Destroys this object's table info cache.
+        Run this if you think your table state has changed,
+        and you want subsequent commands to reload data by
+        running the appropriate Hive queries again.
+        """
+        self.tables = {}
+    def table_exists(self, table): # ,force=False
+        """Returns true if the table exists in the current database."""
+        self._tables_init()
+        return table in self.tables.keys()
+    def table_schema(self, table):
+        """Returns the table's CREATE schema."""
+        self._tables_init()
+        if 'schema' not in self.tables[table].keys():
+            q = 'SET hive.cli.print.header=false; SHOW CREATE TABLE 
+            self.tables[table]['schema'] = self.query(q)
+        return self.tables[table]['schema']
+    def table_metadata(self, table):
+        """
+        Parses the output of DESCRIBE FORMATTED and stores the
+        metadata as a dict in self.tables[table]['metadata'].
+        """
+        self._tables_init()
+        if 'metadata' not in self.tables[table].keys():
+            self.tables[table]['metadata'] = {}
+            q = 'SET hive.cli.print.header=false; DESCRIBE FORMATTED 
+            for line in self.query(q).splitlines():
+                  try:
+                      key, value = line.split(':', 1)
+                      if value:
+                          self.tables[table]['metadata'][key.strip()] = 
+                  except ValueError:
+                      # not at least two elements in line
+                      pass
+        return self.tables[table]['metadata']
+    def table_location(self, table, strip_nameservice=False):
+        """Returns the table's base location by looking at the table's CREATE 
+        self._tables_init()
+        table_location = self.table_metadata(table)['Location']
+        if strip_nameservice and table_location.startswith('hdfs://'):
+            table_location = urlparse(table_location)[2]
+        return table_location
+    def partitions(self, table):
+        """
+        Returns a list of partitions for the given Hive table in partition 
spec format.
+        Returns:
+            A list of partition spec strings.
+        """
+        self._tables_init()
+        # Cache results for later.
+        # If we don't know the partitions yet, get them now.
+        if not 'partitions' in self.tables[table].keys():
+            partition_descs = self.query('SET hive.cli.print.header=false; 
SHOW PARTITIONS {0};'.format(table)).splitlines()
+            # Convert the desc format to spec format and return that
+            self.tables[table]['partitions'] = [
+                self.partition_spec_from_partition_desc(p)
+                for p in partition_descs
+            ]
+        return self.tables[table]['partitions']
+    def drop_partitions(self, table, partition_specs):
+        """
+        Runs ALTER TABLE table DROP PARTITION ... for each of the 
+        """
+        if partition_specs:
+            q = self.drop_partitions_ddl(table, partition_specs)
+            # This query could be large if there are many partitions to drop.
+            # Use a tempfile when dropping partitions.
+            return self.query(q, use_tempfile=True)
+        else:
+            logger.info("Not dropping any partitions for table {0}.  No 
partition datetimes were given.".format(table))
+    def drop_partitions_ddl(self, table, partition_specs):
+        """
+        Returns a complete hive statement to drop partitions from
+        table for the given partition_specs
+        """
+        partition_specs.sort()
+        return '\n'.join(['ALTER TABLE {0} DROP IF EXISTS PARTITION 
({1});'.format(table, spec) for spec in partition_specs])
+    @staticmethod
+    def partition_spec_from_partition_desc(desc):
+        """
+        Returns a partition spec from a partition description
+        output from a 'SHOW PARTITIONS' Hive statement.
+        """
+        # Loop through each partition, adding quotes around strings.
+        spec_parts = []
+        for p in desc.split(HiveUtils.partition_desc_separator):
+            (key,value) = p.split('=')
+            if not value.isdigit():
+                value = '\'{0}\''.format(value)
+            spec_parts.append('{0}={1}'.format(key, value))
+        # Replace partition_desc_separators with partition_spec_separators.
+        return HiveUtils.partition_spec_separator.join(spec_parts)
+    @staticmethod
+    def partition_spec_from_path(path, regex):
+        """
+        Given an HDFS path and a regex with matching groups named
+        after partition keys, this method returns a partition spec
+        suitable for use in Hive partition DDL statements.
+        Parameters:
+            path     : Path to a partition
+            regex    : Regular expression that can extract match groups
+                       by partition key names from the path string.
+                       regex may be a string or a compiled re.
+        Returns:
+            A partition spec string.
+        Example:
+            partition_spec_from_path(
+            )
+            returns: 
+        """
+        if isinstance(regex, basestring):
+            regex = re.compile(regex)
+        group_matches_in_order = [
+            g[1] for g in sorted(
+                [(index, group) for group, index in regex.groupindex.items()]
+            )
+        ]
+        match = regex.search(path)
+        spec_parts = []
+        for key in group_matches_in_order:
+            # if the match is a number, no need for quotes
+            if match.group(key).isdigit():
+                value = match.group(key)
+            # otherwise, quote it!
+            else:
+                value = '\'{0}\''.format(match.group(key))
+            spec_parts.append('{0}={1}'.format(key, value))
+        return HiveUtils.partition_spec_separator.join(spec_parts)
+    @staticmethod
+    def partition_datetime_from_spec(spec, regex):
+        """
+        Given a partition spec string, and a regex that names
+        match groups by their date names, this returns
+        a datetime object representing this partition.
+        Parameters:
+            spec     : Partition spec string
+            regex    : Regular expression that can extract match groups
+                       by partition key names from the spec string.
+                       regex may be a string or a compiled re.
+        Returns:
+            datetime object matching this spec's date.
+        Example:
+            partition_datetime_from_spec(
+            )
+            returns: datetime.datetime(2014, 5, 14, 23, 0)
+        """
+        if isinstance(regex, basestring):
+            regex = re.compile(regex)
+        match = regex.search(spec)
+        return datetime.datetime(
+            int(match.groupdict().get('year')),
+            int(match.groupdict().get('month', 1)),
+            int(match.groupdict().get('day',   1)),
+            int(match.groupdict().get('hour',  0))
+        )
+    @staticmethod
+    def partition_datetime_from_path(path, regex):
+        """
+        Given an HDFS path and a regex with the first matching
+        group a date string suitable for passing to dateutil.parser.parse,
+        this returns a datetime object representing this partition path.
+        Parameters:
+            path     : Path to a partition
+            regex    : Regular expression that can extract date string
+                       as match.group(1) that can be parsed with
+                       dateutil.parser.parse.
+                       regex may be a string or a compiled re.
+        Returns:
+            datetime object matching this spec's date.
+        Example:
+            partition_datetime_from_path(
+                regex=r'.*/hourly/(.+)$'
+            )
+            returns: datetime.datetime(2014, 5, 14, 23, 0)
+        """
+        if isinstance(regex, basestring):
+            regex = re.compile(regex)
+        return dateutil_parse(regex.search(path).group(1))
+    def query(self, query, check_return_code=True, use_tempfile=False):
+        """
+        Runs the given Hive query and returns stdout.
+        Parameters:
+            query             : The Hive query to run
+            check_return_code : Passed to refinery.util.sh()
+            use_tempfile      : If use_tempfile is True, the query will be 
written to
+                                a temporary file and run as a Hive script.
+        Returns:
+            stdout output from Hive query
+        """
+        if use_tempfile:
+            with tempfile.NamedTemporaryFile(prefix='tmp-hive-query-', 
suffix='.hiveql') as f:
+                logger.debug('Writing Hive query to tempfile 
+                f.write(query)
+                f.flush()
+                out = self.script(f.name, check_return_code)
+                # NamedTemporaryFile will be deleted on close().
+            return out
+        else:
+            return self._command(['-e', query], check_return_code)
+    def script(self, script, check_return_code=True):
+        """Runs the contents of the given script in hive and returns stdout."""
+        if not os.path.isfile(script):
+            raise RuntimeError("Hive script: {0} does not 
+        return self._command( ['-f', script], check_return_code)
+    def _command(self, args, check_return_code=True):
+        """Runs the `hive` from the command line, passing in the given args, 
+           returning stdout.
+        """
+        cmd = self.hivecmd + args
+        return sh(cmd, check_return_code)
+class HdfsUtils(object):
+    # TODO:  Use snakebite instead of shelling out to 'hdfs dfs'.
+    @staticmethod
+    def ls(paths, include_children=True):
+        """
+        Runs hdfs dfs -ls on paths.
+        Parameters:
+            paths            : List or string paths to files to ls.  Can 
include shell globs.
+            include_children : If include_children is False, the -d flag will
+                               be given to hdfs dfs -ls.
+        Returns:
+            Array of paths matching the ls-ed path.
+        """
+        if isinstance(paths, str):
+            paths = paths.split()
+        options = []
+        if not include_children:
+            options.append('-d')
+        return [
+            line.split()[-1] for line in sh(
+                ['hdfs', 'dfs', '-ls'] + options + paths,
+                # Not checking return code here so we don't
+                # fail if paths do not exist.
+                check_return_code=False
+            ).splitlines() if not line.startswith('Found ')
+        ]
+    @staticmethod
+    def rm(paths):
+        """
+        Runs hdfs dfs -rm -R on paths.
+        """
+        if isinstance(paths, str):
+            paths = paths.split()
+        return sh(['hdfs', 'dfs', '-rm', '-R'] + paths)
diff --git a/python/tests/test_refinery/test_util.py 
new file mode 100644
index 0000000..43209d7
--- /dev/null
+++ b/python/tests/test_refinery/test_util.py
@@ -0,0 +1,93 @@
+from unittest import TestCase
+from datetime import datetime, timedelta
+from refinery.util import HiveUtils, HdfsUtils, sh
+import os
+class TestReinferyUtil(TestCase):
+    def test_sh(self):
+        command = ['/bin/echo', 'test-list']
+        output = sh(command)
+        self.assertEqual(output, 'test-list')
+        command = '/bin/echo test-string'
+        output = sh(command)
+        self.assertEqual(output, 'test-string')
+    def test_sh_pipe(self):
+        command = '/bin/echo hi_there | /usr/bin/env sed -e \'s@_there@_you@\''
+        output = sh(command)
+        self.assertEqual(output, 'hi_you')
+class TestHiveUtil(TestCase):
+    def setUp(self):
+        self.hive = HiveUtils()
+        self.hive.tables = {
+            'table1': {
+                'metadata': {
+                    'Location':      
+                }
+            },
+        }
+        self.table_info = {
+            'table1': {
+                'location':             '/path/to/table1',
+                'partitions_desc':      
+                'partitions_spec':      
+                'partitions_datetime':  [datetime(2013,10,01,01), 
+                'partitions_path':      
+            },
+        }
+    def test_reset(self):
+        self.hive.reset()
+        self.assertEqual(self.hive.tables, {})
+    def test_table_exists(self):
+        self.assertTrue(self.hive.table_exists('table1'))
+        self.assertFalse(self.hive.table_exists('nonya'))
+    def test_table_location(self):
+        self.assertEquals(self.hive.table_location('table1'), 
+        self.assertEquals(self.hive.table_location('table1', 
strip_nameservice=True), '/path/to/table1')
+    def test_partition_spec_from_partition_desc(self):
+        expect = self.table_info['table1']['partitions_spec'][0]
+        spec   = 
+        self.assertEqual(spec, expect)
+    def test_partition_spec_from_path(self):
+        expect = self.table_info['table1']['partitions_spec'][0]
+        path   = self.table_info['table1']['partitions_path'][0]
+        regex  = 
+        spec = HiveUtils.partition_spec_from_path(path, regex)
+        self.assertEqual(spec, expect)
+    def test_partition_datetime_from_spec(self):
+        expect = self.table_info['table1']['partitions_datetime'][0]
+        spec   = self.table_info['table1']['partitions_spec'][0]
+        regex  = 
+        dt     = HiveUtils.partition_datetime_from_spec(spec, regex)
+        self.assertEqual(dt, expect)
+    def test_partition_datetime_from_path(self):
+        expect = self.table_info['table1']['partitions_datetime'][0]
+        path   = self.table_info['table1']['partitions_path'][0]
+        regex  = r'.*/hourly/(.+)$'
+        dt     = HiveUtils.partition_datetime_from_path(path, regex)
+        self.assertEqual(dt, expect)
+    def test_drop_partitions_ddl(self):
+        partition_ddls = ['PARTITION ({0})'.format(spec)
+            for spec in self.table_info['table1']['partitions_spec']
+        ]
+        expect = '\n'.join(['ALTER TABLE {0} DROP IF EXISTS 
{1};'.format('table1', partition_ddl) for partition_ddl in partition_ddls])
+        statement = self.hive.drop_partitions_ddl('table1', 
+        self.assertEqual(statement, expect)

To view, visit https://gerrit.wikimedia.org/r/135128
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I457941316bfdf40a73881359f276520d5f11ee4a
Gerrit-PatchSet: 5
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Ottomata <o...@wikimedia.org>
Gerrit-Reviewer: Milimetric <dandree...@wikimedia.org>
Gerrit-Reviewer: Nuria <nu...@wikimedia.org>
Gerrit-Reviewer: Ottomata <o...@wikimedia.org>
Gerrit-Reviewer: QChris <christ...@quelltextlich.at>

MediaWiki-commits mailing list

Reply via email to