Milimetric has submitted this change and it was merged.

Change subject: Adding import logic for dumps pagecounts
......................................................................


Adding import logic for dumps pagecounts

Factoring out useful parts of hive-partitioner

Change-Id: If84fcee97403f91218dc5fade5a86dbdb7daab69
Card: analytics 1195.1
---
M .gitignore
M kraken-etl/hive-partitioner
A kraken-etl/pagecounts/import-one-hour.sh
A kraken-etl/pagecounts/import.py
A kraken-etl/test.py
A kraken-etl/util.py
6 files changed, 356 insertions(+), 154 deletions(-)



diff --git a/.gitignore b/.gitignore
index 4404079..fe03682 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,5 @@
 tmp/
 *.swp
 .deploy
+
+*.pyc
diff --git a/kraken-etl/hive-partitioner b/kraken-etl/hive-partitioner
index 0e6a346..3581fc7 100755
--- a/kraken-etl/hive-partitioner
+++ b/kraken-etl/hive-partitioner
@@ -17,158 +17,12 @@
 import logging
 import os
 import subprocess
-
 import pprint
+
+from util import HiveUtils, CamusUtils, sh, diffDatewise, interval_hierarchies
 
 pp     = pprint.pprint
 logger = logging.getLogger('hive-partitioner')
-
-interval_hierarchies = {
-    'hourly':  { 'depth': 4, 'directory_format': '%Y/%m/%d/%H', 
'hive_partition_format': 'year=%Y/month=%m/day=%d/hour=%H' },
-    'daily':   { 'depth': 3, 'directory_format': '%Y/%m/%d',    
'hive_partition_format': 'year=%Y/month=%m/day=%d' },
-    'monthly': { 'depth': 2, 'directory_format': '%Y/%m',       
'hive_partition_format': 'year=%Y/month=%m' },
-    'yearly':  { 'depth': 1, 'directory_format': '%Y',          
'hive_partition_format': 'year=%Y' },
-}
-
-def topics(path):
-    """Reads topic names out of camus_destination_path"""
-    return sh('hadoop fs -ls %s/ | grep -v "Found .* items" | awk -F "/" 
\'{print $NF}\'' % (path)).split('\n')
-
-# TODO configure interval partition paths automatically instead of hardcoding
-def camus_partitions(topic, camus_destination_path, interval='hourly'):
-    """
-    Returns a list of time bucketd 'partitions' created by Camus import.
-    These are inferred from directories in hdfs.
-    """
-    
-    basedir = camus_destination_path + '/' + topic + '/' + interval
-
-
-    depth = interval_hierarchies[interval]['depth']
-    # number of  wildcard time bucket directories, e.g. depth_stars == 4 -> 
'/*/*/*/*'
-    depth_stars = '/*' * depth
-    directories = sh('hadoop fs -ls -d %s%s | grep -v \'Found .* items\' | awk 
\'{print $NF}\'' % (basedir, depth_stars)).split('\n')
-    # return list of time bucket directories, e.g. ['2013/10/09/15', 
'2013/10/09/16', ... ]
-    return [directory.replace(basedir + '/', '') for directory in directories]
-
-    # return sh('hadoop fs -ls -R {0}/{1}/{2} | sed 
"s@.*{0}/{1}/{2}/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/.*@year=\1, 
month=\2, day=\3, hour=\4@" | grep "year.*" | sort | 
uniq'.format(camus_destination_path, topic, interval)).split()
-    # return a list of lists of time buckets, e.g. [['2013']]
-    # return [directory.split('/')[-depth:] for directory in directories]    
-    # return [(int(d) for d in directory.split('/')[-depth:]) for directory in 
directories]
-
-def call(command, check_return_code=True):
-    logger.debug('Running: {0}'.format(' '.join(command)))
-    p = subprocess.Popen(command, stdout=subprocess.PIPE, 
stderr=subprocess.PIPE)
-    stdout, stderr = p.communicate()
-    if check_return_code and p.returncode != 0:
-        raise RuntimeError("Command: {0} failed with error code: {1}".format(" 
".join(command), p.returncode),
-                               stdout, stderr)
-    return stdout.strip()
-
-def sh(command):
-    logger.debug('Running: %s' % (command))
-    return os.popen(command).read().strip()
-
-
-def diff_datewise(left, right, left_format=None, right_format=None):
-    """
-    Parameters
-        left            : a list of datetime strings or objects
-        right           : a list of datetime strings or objects
-        left_format     : None if left contains datetimes, or strptime format
-        right_format    : None if right contains datetimes, or strptime format
- 
-    Returns
-        A tuple of two sets:
-        [0] : the datetime objects in left but not right
-        [1] : the datetime objects in right but not left
-    """
-    
-    if left_format:
-        left_set = set([datetime.strptime(l, left_format) for l in left])
-    else:
-        left_set = set(left)
-    
-    if right_format:
-        right_set = set([datetime.strptime(r, right_format) for r in right])
-    else:
-        right_set = set(right)
-    
-    return (left_set - right_set, right_set - left_set)
-
-
-class HiveUtils(object):
-    def __init__(self, database='default', options=''):
-        self.database   = database
-        self.options    = options.split()
-        self.hivecmd    = None
-        # list of partitions for table, keyed by table
-        # self.table_partitions = {}
-        self.hivecmd = ['hive'] + self.options + ['cli', '--database', 
self.database]
-        # initialize tables dict with table names
-        self.tables  = {}
-        # cache results for later
-        for t in self.tables_get():
-            self.tables[t] = {}
-
-    def partitions(self, table):
-        """Returns a list of partitions for the given Hive table."""
-
-        # cache results for later
-        # if we don't know the partitions yet, get them now
-        if not 'partitions' in self.tables[table].keys():
-            stdout     = self.query("SHOW PARTITIONS %s;" % table)
-            partitions = stdout.split('\n')
-            partitions.remove('partition')
-            self.tables[table]['partitions'] = partitions
-
-        return self.tables[table]['partitions']
-
-    def table_exists(self, table): # ,force=False
-        return table in self.tables.keys()
-
-    def tables_get(self):
-        t = self.query('SHOW TABLES').split('\n')
-        t.remove('tab_name')
-        return t
-
-    def partition_interval(self, table):
-        intervals = {
-            4: 'hourly',
-            3: 'daily',
-            2: 'monhtly',
-            1: 'yearly',
-        }
-        
-        # cache results for later
-        if not 'interval' in self.tables[table].keys():
-            # counts the number of partition keys this table has
-            # and returns a string time interval based in this number        
-            cmd = ' '.join(self.hivecmd) + ' -e \'SHOW CREATE TABLE ' + table 
+ ';\' | sed -n \'/PARTITIONED BY (/,/)/p\' | grep -v \'PARTITIONED BY\' | wc 
-l'
-            logger.debug('Running: %s' % cmd)
-            # using check_output directly here so that we can pass shell=True 
and use pipes.
-            partition_depth = int(subprocess.check_output(cmd, 
stderr=subprocess.PIPE, shell=True).strip())
-            self.tables[table]['depth']    = partition_depth
-            self.tables[table]['interval'] = intervals[partition_depth]
-
-        return self.tables[table]['interval']
-
-    def query(self, query, check_return_code=True):
-        """Runs the given hive query and returns stdout"""
-        return self.command(['-e', query], check_return_code)
-
-    def script(self, script, check_return_code=True):
-        """Runs the contents of the given script in hive and returns stdout"""
-        if not os.path.isfile(script):
-            raise RuntimeError("Hive script: {0} does not 
exist.".format(script))
-        return self.command( ['-f', script], check_return_code)
-
-    def command(self, args, check_return_code=True):
-        """Runs the `hive` from the command line, passing in the given args, 
and
-           returning stdout.
-        """
-        cmd = self.hivecmd + args
-        return call(cmd, check_return_code)
 
 
 if __name__ == '__main__':
@@ -182,23 +36,23 @@
     hive_options           = arguments['--hive-options']
 
     hive = HiveUtils(database, hive_options)
+    camus = CamusUtils(camus_destination_path)
 
-    topics = topics(camus_destination_path)
+    topics = camus.topics()
     print("Topics: " + ','.join(topics))
     for topic in topics:
         if hive.table_exists(topic):
             interval         = hive.partition_interval(topic)
             hive_partitions  = hive.partitions(topic)
-            camus_partitions = camus_partitions(topic, camus_destination_path)
+            camus_partitions = camus.partitions(topic, camus_destination_path)
 
-            logger.debug(("Hive Partitions for %s:\n" % topic) + 
'\n.join(hive_partitions))
+            logger.debug(("Hive Partitions for %s:\n" % topic) + 
'\n'.join(hive_partitions))
             logger.debug(("Camus Partitions for %s:\n " % topic) + 
'\n'.join(camus_partitions))
             logger.debug("%s import interval is %s" % (topic, interval))
-            
+
             # diff the camus and hive partitions
-            missing_hive, missing_camus = diff_datewise(camus_partitions, 
hive_partitions,
+            missing_hive, missing_camus = diffDatewise(camus_partitions, 
hive_partitions,
                 interval_hierarchies[interval]['directory_format'],
                 interval_hierarchies[interval]['hive_partition_format'])
             print("Need to create partition for:")
             pp(missing_hive)
-
diff --git a/kraken-etl/pagecounts/import-one-hour.sh 
b/kraken-etl/pagecounts/import-one-hour.sh
new file mode 100644
index 0000000..ff8fd1d
--- /dev/null
+++ b/kraken-etl/pagecounts/import-one-hour.sh
@@ -0,0 +1,43 @@
+#!/bin/bash
+#
+# This script does the following:
+#       0. reads four arguments from the CLI, in order, as YEAR, MONTH, DAY, 
HOUR
+#       1. downloads the specified hour worth of data from 
http://dumps.wikimedia.org/other/pagecounts-raw/
+#       2. extracts the data into hdfs
+#       3. creates a partition on a hive table pointing to this data
+#
+
+print_help() {
+        cat <<EOF
+        
+USAGE: import_one_hour.sh YEAR MONTH DAY HOUR
+        
+  Imports one hour worth of data from dumps.wikimedia.org/other/pagecounts-raw 
into a Hive table
+
+EOF
+}
+
+if [ $# -ne 4 ]
+then
+        print_help
+        exit
+fi
+
+YEAR=$1
+MONTH=$2
+DAY=$3
+HOUR=$4
+
+# Someone intelligent should set these
+LOCAL_FILE=pagecounts-$YEAR$MONTH$DAY-${HOUR}0000.gz
+HDFS_DIR=/user/milimetric/pagecounts/$YEAR.$MONTH.${DAY}_${HOUR}.00.00
+HDFS_FILE=pagecounts-$YEAR$MONTH$DAY-${HOUR}0000
+TABLE=milimetric_pagecounts
+
+rm $LOCAL_FILE
+wget 
http://dumps.wikimedia.org/other/pagecounts-raw/$YEAR/$YEAR-$MONTH/pagecounts-$YEAR$MONTH$DAY-${HOUR}0000.gz
+hdfs dfs -rm -r $HDFS_DIR
+gunzip -c $LOCAL_FILE | hdfs dfs -put - $HDFS_DIR/$HDFS_FILE
+rm $LOCAL_FILE
+hive -e "ALTER TABLE ${TABLE} DROP PARTITION 
(year='$YEAR',month='$MONTH',day='$DAY',hour='$HOUR');"
+hive -e "ALTER TABLE ${TABLE} ADD PARTITION 
(year='$YEAR',month='$MONTH',day='$DAY',hour='$HOUR') location '$HDFS_DIR';"
diff --git a/kraken-etl/pagecounts/import.py b/kraken-etl/pagecounts/import.py
new file mode 100644
index 0000000..6c249a9
--- /dev/null
+++ b/kraken-etl/pagecounts/import.py
@@ -0,0 +1,38 @@
+from datetime import datetime, timedelta
+from util import (
+    HdfsFileCollection, diffDatewise, timestampsToNow, shell
+)
+
+target = CamusUtils('hdfs://wmf/')
+topic = 'pagecounts'
+hdfsFormat = target.basedir(topic) + '%Y/%m/%d/%H'
+
+imported = target.partitions(topic) or [datetime.today().strftime(hdfsFormat)]
+firstHour = datetime.strptime(min(imported), hdfsFormat)
+available = timestampsToNow(firstHour, timedelta(hours=1))
+
+hoursMissing = diffDatewise(
+    available,
+    imported,
+    rightParse=hdfsFormat,
+)
+
+for missing in hoursMissing:
+    print('*************************')
+    print('Importing {0} '.format(missing))
+    
+    try:
+        result, err = shell([
+            'import-one-hour',
+            missing.year,
+            missing.month,
+            missing.day,
+            missing.hour,
+        ])
+        if err:
+            print(err)
+    except Exception as e:
+        print(e)
+    
+    print('Done Importing {0} '.format(missing))
+    print('*************************')
diff --git a/kraken-etl/test.py b/kraken-etl/test.py
new file mode 100644
index 0000000..69a7bf1
--- /dev/null
+++ b/kraken-etl/test.py
@@ -0,0 +1,46 @@
+from datetime import datetime, timedelta
+from util import diffDatewise, timestampsToNow
+from unittest import TestCase
+
+
+class TestUtil(TestCase):
+    def testDiffDatewise(self):
+        l = []
+        lJustDates = []
+        r = []
+        lp = 'blah%Y...%m...%d...%Hblahblah'
+        rp = 'neenee%Y%m%d%Hneenee'
+        
+        expect0 = set([datetime(2012, 6, 14, 13), datetime(2012, 11, 9, 3)])
+        expect1 = set([datetime(2012, 6, 14, 14), datetime(2013, 11, 10, 22)])
+        
+        for y in range(2012, 2014):
+            for m in range(1, 13):
+                # we're just diffing so we don't care about getting all days
+                for d in range(1, 28):
+                    for h in range(0, 24):
+                        x = datetime(y, m, d, h)
+                        if not x in expect1:
+                            l.append(datetime.strftime(x, lp))
+                            lJustDates.append(x)
+                        if not x in expect0:
+                            r.append(datetime.strftime(x, rp))
+        
+        result = diffDatewise(l, r, leftParse=lp, rightParse=rp)
+        self.assertEqual(result[0], expect0)
+        self.assertEqual(result[1], expect1)
+        
+        result = diffDatewise(lJustDates, r, rightParse=rp)
+        self.assertEqual(result[0], expect0)
+        self.assertEqual(result[1], expect1)
+    
+    def testTimestampsToNow(self):
+        now = datetime.now()
+        start = now - timedelta(hours=2)
+        expect = [
+            start,
+            start + timedelta(hours=1),
+            start + timedelta(hours=2),
+        ]
+        timestamps = timestampsToNow(start, timedelta(hours=1))
+        self.assertEqual(expect, list(timestamps))
diff --git a/kraken-etl/util.py b/kraken-etl/util.py
new file mode 100644
index 0000000..586de9d
--- /dev/null
+++ b/kraken-etl/util.py
@@ -0,0 +1,219 @@
+import logger
+from datetime import datetime
+
+
+logger = logging.getLogger('kraken-etl-util')
+interval_hierarchies = {
+    'hourly':  {
+        'depth': 4,
+        'directory_format': '%Y/%m/%d/%H',
+        'hive_partition_format': 'year=%Y/month=%m/day=%d/hour=%H'
+    },
+    'daily':   {
+        'depth': 3,
+        'directory_format': '%Y/%m/%d',
+        'hive_partition_format': 'year=%Y/month=%m/day=%d'
+    },
+    'monthly': {
+        'depth': 2,
+        'directory_format': '%Y/%m'
+        'hive_partition_format': 'year=%Y/month=%m'
+    },
+    'yearly':  {
+        'depth': 1,
+        'directory_format': '%Y'
+        'hive_partition_format': 'year=%Y'
+    },
+}
+
+
+def diffDatewise(left, right, leftParse=None, rightParse=None):
+    """
+    Parameters
+        left        : a list of datetime strings or objects
+        right       : a list of datetime strings or objects
+        leftParse   : None if left contains datetimes, or strptime format
+        rightParse  : None if right contains datetimes, or strptime format
+
+    Returns
+        A tuple of two sets:
+        [0] : the datetime objects in left but not right
+        [1] : the datetime objects in right but not left
+    """
+    
+    if leftParse:
+        leftSet = set([
+            datetime.strptime(l.strip(), leftParse)
+            for l in left if len(l.strip())
+        ])
+    else:
+        leftSet = set(left)
+    
+    if rightParse:
+        rightSet = set([
+            datetime.strptime(r.strip(), rightParse)
+            for r in right if len(r.strip())
+        ])
+    else:
+        rightSet = set(right)
+    
+    return (leftSet - rightSet, rightSet - leftSet)
+
+
+def timestampsToNow(start, increment):
+    """
+    Generates timestamps from @start to datetime.now(), by @increment
+    
+    Parameters
+        start       : the first generated timestamp
+        increment   : the timedelta between the generated timestamps
+    
+    Returns
+        A generator that goes from @start to datetime.now() - x,
+        where x <= @increment
+    """
+    now = datetime.now()
+    while start < now:
+        yield start
+        start += increment
+
+
+def sh(command):
+    """
+    Execute a shell command and return the result
+    """
+    logger.debug('Running: {0}'.format(command))
+    return os.popen(command).read().strip()
+
+
+def call(command, check_return_code=True):
+    """
+    Execute a shell command and return the result
+    """
+    logger.debug('Running: {0}'.format(' '.join(command)))
+    p = subprocess.Popen(command, stdout=subprocess.PIPE, 
stderr=subprocess.PIPE)
+    stdout, stderr = p.communicate()
+    if check_return_code and p.returncode != 0:
+        raise RuntimeError("Command: {0} failed with error code: {1}".format(" 
".join(command), p.returncode),
+                               stdout, stderr)
+    return stdout.strip()
+
+
+class HiveUtils(object):
+    def __init__(self, database='default', options=''):
+        self.database   = database
+        self.options    = options.split()
+        self.hivecmd    = None
+        # list of partitions for table, keyed by table
+        # self.table_partitions = {}
+        self.hivecmd = ['hive'] + self.options + ['cli', '--database', 
self.database]
+        # initialize tables dict with table names
+        self.tables  = {}
+        # cache results for later
+        for t in self.tables_get():
+            self.tables[t] = {}
+
+    def partitions(self, table):
+        """Returns a list of partitions for the given Hive table."""
+
+        # cache results for later
+        # if we don't know the partitions yet, get them now
+        if not 'partitions' in self.tables[table].keys():
+            stdout     = self.query("SHOW PARTITIONS %s;" % table)
+            partitions = stdout.split('\n')
+            partitions.remove('partition')
+            self.tables[table]['partitions'] = partitions
+
+        return self.tables[table]['partitions']
+
+    def table_exists(self, table): # ,force=False
+        return table in self.tables.keys()
+
+    def tables_get(self):
+        t = self.query('SHOW TABLES').split('\n')
+        t.remove('tab_name')
+        return t
+
+    def partition_interval(self, table):
+        intervals = {
+            4: 'hourly',
+            3: 'daily',
+            2: 'monhtly',
+            1: 'yearly',
+        }
+
+        # cache results for later
+        if not 'interval' in self.tables[table].keys():
+            # counts the number of partition keys this table has
+            # and returns a string time interval based in this number        
+            cmd = ' '.join(self.hivecmd) + ' -e \'SHOW CREATE TABLE ' + table 
+ ';\' | sed -n \'/PARTITIONED BY (/,/)/p\' | grep -v \'PARTITIONED BY\' | wc 
-l'
+            logger.debug('Running: %s' % cmd)
+            # using check_output directly here so that we can pass shell=True 
and use pipes.
+            partition_depth = int(subprocess.check_output(cmd, 
stderr=subprocess.PIPE, shell=True).strip())
+            self.tables[table]['depth']    = partition_depth
+            self.tables[table]['interval'] = intervals[partition_depth]
+
+        return self.tables[table]['interval']
+
+    def query(self, query, check_return_code=True):
+        """Runs the given hive query and returns stdout"""
+        return self.command(['-e', query], check_return_code)
+
+    def script(self, script, check_return_code=True):
+        """Runs the contents of the given script in hive and returns stdout"""
+        if not os.path.isfile(script):
+            raise RuntimeError("Hive script: {0} does not 
exist.".format(script))
+        return self.command( ['-f', script], check_return_code)
+
+    def command(self, args, check_return_code=True):
+        """Runs the `hive` from the command line, passing in the given args, 
and
+           returning stdout.
+        """
+        cmd = self.hivecmd + args
+        return call(cmd, check_return_code)
+
+
+class CamusUtils(object):
+    """
+    Deal with topics and partitions created by Camus
+    """
+    def __init__(self, destination_path):
+        self.destination_path = destination_path
+
+    def topics(self):
+        """Reads topic names out of destination_path"""
+        return sh(
+            'hdfs dfs -ls {0}/ | '\
+            'grep -v "Found .* items" | '\
+            'awk -F "/" \'{{print $NF}}\''.format(
+                self.destination_path
+            )
+        ).split('\n')
+
+    def basedir(self, topic, interval='hourly'):
+        return self.destination_path + '/' + topic + '/' + interval
+
+    # TODO configure interval partition paths automatically instead of 
hardcoding
+    def partitions(self, topic, interval='hourly'):
+        """
+        Returns a list of time bucketd 'partitions' created by Camus import.
+        These are inferred from directories in hdfs.
+        """
+
+        basedir = self.basedir(topic, interval)
+
+        depth = interval_hierarchies[interval]['depth']
+        # number of  wildcard time bucket directories, e.g. depth_stars == 4 
-> '/*/*/*/*'
+        depth_stars = '/*' * depth
+        directories = sh(
+            'hdfs dfs -ls -d {0}{1} | '\
+            'grep -v \'Found .* items\' | '\
+            'awk \'{{print $NF}}\''.format(basedir, depth_stars)
+        ).split('\n')
+        # return list of time bucket directories, e.g. ['2013/10/09/15', 
'2013/10/09/16', ... ]
+        return [directory.replace(basedir + '/', '') for directory in 
directories]
+
+        # return sh('hdfs dfs -ls -R {0}/{1}/{2} | sed 
"s@.*{0}/{1}/{2}/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/.*@year=\1, 
month=\2, day=\3, hour=\4@" | grep "year.*" | sort | 
uniq'.format(self.destination_path, topic, interval)).split()
+        # return a list of lists of time buckets, e.g. [['2013']]
+        # return [directory.split('/')[-depth:] for directory in directories]  
  
+        # return [(int(d) for d in directory.split('/')[-depth:]) for 
directory in directories]

-- 
To view, visit https://gerrit.wikimedia.org/r/89327
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: If84fcee97403f91218dc5fade5a86dbdb7daab69
Gerrit-PatchSet: 1
Gerrit-Project: analytics/kraken
Gerrit-Branch: master
Gerrit-Owner: Milimetric <dandree...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to