Milimetric has submitted this change and it was merged.

Change subject: Adding import logic for dumps pagecounts
......................................................................
Adding import logic for dumps pagecounts

Factoring out useful parts of hive-partitioner

Change-Id: If84fcee97403f91218dc5fade5a86dbdb7daab69
Card: analytics 1195.1
---
M .gitignore
M kraken-etl/hive-partitioner
A kraken-etl/pagecounts/import-one-hour.sh
A kraken-etl/pagecounts/import.py
A kraken-etl/test.py
A kraken-etl/util.py
6 files changed, 356 insertions(+), 154 deletions(-)


diff --git a/.gitignore b/.gitignore
index 4404079..fe03682 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,5 @@
 tmp/
 *.swp
 .deploy
+
+*.pyc
diff --git a/kraken-etl/hive-partitioner b/kraken-etl/hive-partitioner
index 0e6a346..3581fc7 100755
--- a/kraken-etl/hive-partitioner
+++ b/kraken-etl/hive-partitioner
@@ -17,158 +17,12 @@
 import logging
 import os
 import subprocess
-
 import pprint
+
+from util import HiveUtils, CamusUtils, sh, diffDatewise, interval_hierarchies
 
 pp = pprint.pprint
 logger = logging.getLogger('hive-partitioner')
-
-interval_hierarchies = {
-    'hourly':  { 'depth': 4, 'directory_format': '%Y/%m/%d/%H', 'hive_partition_format': 'year=%Y/month=%m/day=%d/hour=%H' },
-    'daily':   { 'depth': 3, 'directory_format': '%Y/%m/%d', 'hive_partition_format': 'year=%Y/month=%m/day=%d' },
-    'monthly': { 'depth': 2, 'directory_format': '%Y/%m', 'hive_partition_format': 'year=%Y/month=%m' },
-    'yearly':  { 'depth': 1, 'directory_format': '%Y', 'hive_partition_format': 'year=%Y' },
-}
-
-def topics(path):
-    """Reads topic names out of camus_destination_path"""
-    return sh('hadoop fs -ls %s/ | grep -v "Found .* items" | awk -F "/" \'{print $NF}\'' % (path)).split('\n')
-
-# TODO configure interval partition paths automatically instead of hardcoding
-def camus_partitions(topic, camus_destination_path, interval='hourly'):
-    """
-    Returns a list of time bucketd 'partitions' created by Camus import.
-    These are inferred from directories in hdfs.
-    """
-
-    basedir = camus_destination_path + '/' + topic + '/' + interval
-
-
-    depth = interval_hierarchies[interval]['depth']
-    # number of wildcard time bucket directories, e.g. depth_stars == 4 -> '/*/*/*/*'
-    depth_stars = '/*' * depth
-    directories = sh('hadoop fs -ls -d %s%s | grep -v \'Found .* items\' | awk \'{print $NF}\'' % (basedir, depth_stars)).split('\n')
-    # return list of time bucket directories, e.g. ['2013/10/09/15', '2013/10/09/16', ... ]
-    return [directory.replace(basedir + '/', '') for directory in directories]
-
-    # return sh('hadoop fs -ls -R {0}/{1}/{2} | sed "s@.*{0}/{1}/{2}/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/.*@year=\1, month=\2, day=\3, hour=\4@" | grep "year.*" | sort | uniq'.format(camus_destination_path, topic, interval)).split()
-    # return a list of lists of time buckets, e.g. [['2013']]
-    # return [directory.split('/')[-depth:] for directory in directories]
-    # return [(int(d) for d in directory.split('/')[-depth:]) for directory in directories]
-
-def call(command, check_return_code=True):
-    logger.debug('Running: {0}'.format(' '.join(command)))
-    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    stdout, stderr = p.communicate()
-    if check_return_code and p.returncode != 0:
-        raise RuntimeError("Command: {0} failed with error code: {1}".format(" ".join(command), p.returncode),
-            stdout, stderr)
-    return stdout.strip()
-
-def sh(command):
-    logger.debug('Running: %s' % (command))
-    return os.popen(command).read().strip()
-
-
-def diff_datewise(left, right, left_format=None, right_format=None):
-    """
-    Parameters
-        left          : a list of datetime strings or objects
-        right         : a list of datetime strings or objects
-        left_format   : None if left contains datetimes, or strptime format
-        right_format  : None if right contains datetimes, or strptime format
-
-    Returns
-        A tuple of two sets:
-        [0] : the datetime objects in left but not right
-        [1] : the datetime objects in right but not left
-    """
-
-    if left_format:
-        left_set = set([datetime.strptime(l, left_format) for l in left])
-    else:
-        left_set = set(left)
-
-    if right_format:
-        right_set = set([datetime.strptime(r, right_format) for r in right])
-    else:
-        right_set = set(right)
-
-    return (left_set - right_set, right_set - left_set)
-
-
-class HiveUtils(object):
-    def __init__(self, database='default', options=''):
-        self.database = database
-        self.options = options.split()
-        self.hivecmd = None
-        # list of partitions for table, keyed by table
-        # self.table_partitions = {}
-        self.hivecmd = ['hive'] + self.options + ['cli', '--database', self.database]
-        # initialize tables dict with table names
-        self.tables = {}
-        # cache results for later
-        for t in self.tables_get():
-            self.tables[t] = {}
-
-    def partitions(self, table):
-        """Returns a list of partitions for the given Hive table."""
-
-        # cache results for later
-        # if we don't know the partitions yet, get them now
-        if not 'partitions' in self.tables[table].keys():
-            stdout = self.query("SHOW PARTITIONS %s;" % table)
-            partitions = stdout.split('\n')
-            partitions.remove('partition')
-            self.tables[table]['partitions'] = partitions
-
-        return self.tables[table]['partitions']
-
-    def table_exists(self, table):  # ,force=False
-        return table in self.tables.keys()
-
-    def tables_get(self):
-        t = self.query('SHOW TABLES').split('\n')
-        t.remove('tab_name')
-        return t
-
-    def partition_interval(self, table):
-        intervals = {
-            4: 'hourly',
-            3: 'daily',
-            2: 'monhtly',
-            1: 'yearly',
-        }
-
-        # cache results for later
-        if not 'interval' in self.tables[table].keys():
-            # counts the number of partition keys this table has
-            # and returns a string time interval based in this number
-            cmd = ' '.join(self.hivecmd) + ' -e \'SHOW CREATE TABLE ' + table + ';\' | sed -n \'/PARTITIONED BY (/,/)/p\' | grep -v \'PARTITIONED BY\' | wc -l'
-            logger.debug('Running: %s' % cmd)
-            # using check_output directly here so that we can pass shell=True and use pipes.
-            partition_depth = int(subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True).strip())
-            self.tables[table]['depth'] = partition_depth
-            self.tables[table]['interval'] = intervals[partition_depth]
-
-        return self.tables[table]['interval']
-
-    def query(self, query, check_return_code=True):
-        """Runs the given hive query and returns stdout"""
-        return self.command(['-e', query], check_return_code)
-
-    def script(self, script, check_return_code=True):
-        """Runs the contents of the given script in hive and returns stdout"""
-        if not os.path.isfile(script):
-            raise RuntimeError("Hive script: {0} does not exist.".format(script))
-        return self.command( ['-f', script], check_return_code)
-
-    def command(self, args, check_return_code=True):
-        """Runs the `hive` from the command line, passing in the given args, and
-        returning stdout.
-        """
-        cmd = self.hivecmd + args
-        return call(cmd, check_return_code)
 
 
 if __name__ == '__main__':
@@ -182,23 +36,23 @@
     hive_options = arguments['--hive-options']
 
     hive = HiveUtils(database, hive_options)
+    camus = CamusUtils(camus_destination_path)
 
-    topics = topics(camus_destination_path)
+    topics = camus.topics()
     print("Topics: " + ','.join(topics))
     for topic in topics:
         if hive.table_exists(topic):
            interval = hive.partition_interval(topic)
            hive_partitions = hive.partitions(topic)
-           camus_partitions = camus_partitions(topic, camus_destination_path)
+           camus_partitions = camus.partitions(topic, interval)
 
-           logger.debug(("Hive Partitions for %s:\n" % topic) + '\n.join(hive_partitions))
+           logger.debug(("Hive Partitions for %s:\n" % topic) + '\n'.join(hive_partitions))
            logger.debug(("Camus Partitions for %s:\n " % topic) + '\n'.join(camus_partitions))
            logger.debug("%s import interval is %s" % (topic, interval))
-
+
            # diff the camus and hive partitions
-           missing_hive, missing_camus = diff_datewise(camus_partitions, hive_partitions,
+           missing_hive, missing_camus = diffDatewise(camus_partitions, hive_partitions,
                interval_hierarchies[interval]['directory_format'],
                interval_hierarchies[interval]['hive_partition_format'])
 
            print("Need to create partition for:")
            pp(missing_hive)
-
diff --git a/kraken-etl/pagecounts/import-one-hour.sh b/kraken-etl/pagecounts/import-one-hour.sh
new file mode 100644
index 0000000..ff8fd1d
--- /dev/null
+++ b/kraken-etl/pagecounts/import-one-hour.sh
@@ -0,0 +1,43 @@
+#!/bin/bash
+#
+# This script does the following:
+#   0. reads four arguments from the CLI, in order, as YEAR, MONTH, DAY, HOUR
+#   1. downloads the specified hour worth of data from http://dumps.wikimedia.org/other/pagecounts-raw/
+#   2. extracts the data into hdfs
+#   3. creates a partition on a hive table pointing to this data
+#
+
+print_help() {
+    cat <<EOF
+
+USAGE: import-one-hour.sh YEAR MONTH DAY HOUR
+
+    Imports one hour worth of data from dumps.wikimedia.org/other/pagecounts-raw into a Hive table
+
+EOF
+}
+
+if [ $# -ne 4 ]
+then
+    print_help
+    exit 1
+fi
+
+YEAR=$1
+MONTH=$2
+DAY=$3
+HOUR=$4
+
+# Someone intelligent should set these
+LOCAL_FILE=pagecounts-$YEAR$MONTH$DAY-${HOUR}0000.gz
+HDFS_DIR=/user/milimetric/pagecounts/$YEAR.$MONTH.${DAY}_${HOUR}.00.00
+HDFS_FILE=pagecounts-$YEAR$MONTH$DAY-${HOUR}0000
+TABLE=milimetric_pagecounts
+
+rm -f $LOCAL_FILE
+wget http://dumps.wikimedia.org/other/pagecounts-raw/$YEAR/$YEAR-$MONTH/pagecounts-$YEAR$MONTH$DAY-${HOUR}0000.gz
+hdfs dfs -rm -r $HDFS_DIR
+gunzip -c $LOCAL_FILE | hdfs dfs -put - $HDFS_DIR/$HDFS_FILE
+rm $LOCAL_FILE
+hive -e "ALTER TABLE ${TABLE} DROP PARTITION (year='$YEAR',month='$MONTH',day='$DAY',hour='$HOUR');"
+hive -e "ALTER TABLE ${TABLE} ADD PARTITION (year='$YEAR',month='$MONTH',day='$DAY',hour='$HOUR') location '$HDFS_DIR';"
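
For example, backfilling the 14:00 UTC hour of 2013-10-09 would be invoked like
this (note the zero-padded month, day and hour, which the dump filenames require;
the HDFS path and table name are still hard-coded in the script above):

    ./import-one-hour.sh 2013 10 09 14
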
diff --git a/kraken-etl/pagecounts/import.py b/kraken-etl/pagecounts/import.py
new file mode 100644
index 0000000..6c249a9
--- /dev/null
+++ b/kraken-etl/pagecounts/import.py
@@ -0,0 +1,38 @@
+from datetime import datetime, timedelta
+from util import (
+    CamusUtils, diffDatewise, timestampsToNow, call
+)
+
+target = CamusUtils('hdfs://wmf/')
+topic = 'pagecounts'
+# CamusUtils.partitions() returns hourly directories relative to the topic basedir
+hdfsFormat = '%Y/%m/%d/%H'
+
+imported = target.partitions(topic) or [datetime.today().strftime(hdfsFormat)]
+firstHour = datetime.strptime(min(imported), hdfsFormat)
+available = timestampsToNow(firstHour, timedelta(hours=1))
+
+# diffDatewise returns (left only, right only); we want the hours not yet imported
+hoursMissing, _ = diffDatewise(
+    available,
+    imported,
+    rightParse=hdfsFormat,
+)
+
+for missing in hoursMissing:
+    print('*************************')
+    print('Importing {0} '.format(missing))
+
+    try:
+        result = call([
+            './import-one-hour.sh',
+            missing.strftime('%Y'),
+            missing.strftime('%m'),
+            missing.strftime('%d'),
+            missing.strftime('%H'),
+        ])
+        if result:
+            print(result)
+    except Exception as e:
+        print(e)
+
+    print('Done Importing {0} '.format(missing))
+    print('*************************')
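
import.py leans on diffDatewise() to decide which hours still need importing; the
set arithmetic it relies on looks roughly like this (values are illustrative):

    from datetime import datetime
    from util import diffDatewise

    available = [datetime(2013, 10, 9, h) for h in (13, 14, 15)]  # hours upstream
    imported  = ['2013/10/09/13', '2013/10/09/15']                # dirs already in HDFS

    missing, extra = diffDatewise(available, imported, rightParse='%Y/%m/%d/%H')
    # missing == {datetime(2013, 10, 9, 14)} -> hand each one to import-one-hour.sh
    # extra   == set()                       -> in HDFS but no longer upstream
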
diff --git a/kraken-etl/test.py b/kraken-etl/test.py
new file mode 100644
index 0000000..69a7bf1
--- /dev/null
+++ b/kraken-etl/test.py
@@ -0,0 +1,46 @@
+from datetime import datetime, timedelta
+from util import diffDatewise, timestampsToNow
+from unittest import TestCase
+
+
+class TestUtil(TestCase):
+    def testDiffDatewise(self):
+        l = []
+        lJustDates = []
+        r = []
+        lp = 'blah%Y...%m...%d...%Hblahblah'
+        rp = 'neenee%Y%m%d%Hneenee'
+
+        expect0 = set([datetime(2012, 6, 14, 13), datetime(2012, 11, 9, 3)])
+        expect1 = set([datetime(2012, 6, 14, 14), datetime(2013, 11, 10, 22)])
+
+        for y in range(2012, 2014):
+            for m in range(1, 13):
+                # we're just diffing so we don't care about getting all days
+                for d in range(1, 28):
+                    for h in range(0, 24):
+                        x = datetime(y, m, d, h)
+                        if not x in expect1:
+                            l.append(datetime.strftime(x, lp))
+                            lJustDates.append(x)
+                        if not x in expect0:
+                            r.append(datetime.strftime(x, rp))
+
+        result = diffDatewise(l, r, leftParse=lp, rightParse=rp)
+        self.assertEqual(result[0], expect0)
+        self.assertEqual(result[1], expect1)
+
+        result = diffDatewise(lJustDates, r, rightParse=rp)
+        self.assertEqual(result[0], expect0)
+        self.assertEqual(result[1], expect1)
+
+    def testTimestampsToNow(self):
+        now = datetime.now()
+        start = now - timedelta(hours=2)
+        expect = [
+            start,
+            start + timedelta(hours=1),
+            start + timedelta(hours=2),
+        ]
+        timestamps = timestampsToNow(start, timedelta(hours=1))
+        self.assertEqual(expect, list(timestamps))
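
Assuming the tests are run from kraken-etl/ (so that util.py is importable), the
standard unittest runner should pick them up:

    cd kraken-etl
    python -m unittest test
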
diff --git a/kraken-etl/util.py b/kraken-etl/util.py
new file mode 100644
index 0000000..586de9d
--- /dev/null
+++ b/kraken-etl/util.py
@@ -0,0 +1,219 @@
+import logging
+import os
+import subprocess
+from datetime import datetime
+
+
+logger = logging.getLogger('kraken-etl-util')
+interval_hierarchies = {
+    'hourly': {
+        'depth': 4,
+        'directory_format': '%Y/%m/%d/%H',
+        'hive_partition_format': 'year=%Y/month=%m/day=%d/hour=%H'
+    },
+    'daily': {
+        'depth': 3,
+        'directory_format': '%Y/%m/%d',
+        'hive_partition_format': 'year=%Y/month=%m/day=%d'
+    },
+    'monthly': {
+        'depth': 2,
+        'directory_format': '%Y/%m',
+        'hive_partition_format': 'year=%Y/month=%m'
+    },
+    'yearly': {
+        'depth': 1,
+        'directory_format': '%Y',
+        'hive_partition_format': 'year=%Y'
+    },
+}
+
+
+def diffDatewise(left, right, leftParse=None, rightParse=None):
+    """
+    Parameters
+        left        : a list of datetime strings or objects
+        right       : a list of datetime strings or objects
+        leftParse   : None if left contains datetimes, or strptime format
+        rightParse  : None if right contains datetimes, or strptime format
+
+    Returns
+        A tuple of two sets:
+        [0] : the datetime objects in left but not right
+        [1] : the datetime objects in right but not left
+    """
+
+    if leftParse:
+        leftSet = set([
+            datetime.strptime(l.strip(), leftParse)
+            for l in left if len(l.strip())
+        ])
+    else:
+        leftSet = set(left)
+
+    if rightParse:
+        rightSet = set([
+            datetime.strptime(r.strip(), rightParse)
+            for r in right if len(r.strip())
+        ])
+    else:
+        rightSet = set(right)
+
+    return (leftSet - rightSet, rightSet - leftSet)
+
+
+def timestampsToNow(start, increment):
+    """
+    Generates timestamps from @start to datetime.now(), by @increment
+
+    Parameters
+        start     : the first generated timestamp
+        increment : the timedelta between the generated timestamps
+
+    Returns
+        A generator that goes from @start to datetime.now() - x,
+        where x <= @increment
+    """
+    now = datetime.now()
+    while start < now:
+        yield start
+        start += increment
+
+
+def sh(command):
+    """
+    Execute a shell command and return the result
+    """
+    logger.debug('Running: {0}'.format(command))
+    return os.popen(command).read().strip()
+
+
+def call(command, check_return_code=True):
+    """
+    Execute a command (a list of args) and return its stdout
+    """
+    logger.debug('Running: {0}'.format(' '.join(command)))
+    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    stdout, stderr = p.communicate()
+    if check_return_code and p.returncode != 0:
+        raise RuntimeError("Command: {0} failed with error code: {1}".format(" ".join(command), p.returncode),
+            stdout, stderr)
+    return stdout.strip()
+
+
+class HiveUtils(object):
+    def __init__(self, database='default', options=''):
+        self.database = database
+        self.options = options.split()
+        self.hivecmd = None
+        # list of partitions for table, keyed by table
+        # self.table_partitions = {}
+        self.hivecmd = ['hive'] + self.options + ['cli', '--database', self.database]
+        # initialize tables dict with table names
+        self.tables = {}
+        # cache results for later
+        for t in self.tables_get():
+            self.tables[t] = {}
+
+    def partitions(self, table):
+        """Returns a list of partitions for the given Hive table."""
+
+        # cache results for later
+        # if we don't know the partitions yet, get them now
+        if not 'partitions' in self.tables[table].keys():
+            stdout = self.query("SHOW PARTITIONS %s;" % table)
+            partitions = stdout.split('\n')
+            partitions.remove('partition')
+            self.tables[table]['partitions'] = partitions
+
+        return self.tables[table]['partitions']
+
+    def table_exists(self, table):  # ,force=False
+        return table in self.tables.keys()
+
+    def tables_get(self):
+        t = self.query('SHOW TABLES').split('\n')
+        t.remove('tab_name')
+        return t
+
+    def partition_interval(self, table):
+        intervals = {
+            4: 'hourly',
+            3: 'daily',
+            2: 'monthly',
+            1: 'yearly',
+        }
+
+        # cache results for later
+        if not 'interval' in self.tables[table].keys():
+            # counts the number of partition keys this table has
+            # and returns a string time interval based on this number
+            cmd = ' '.join(self.hivecmd) + ' -e \'SHOW CREATE TABLE ' + table + ';\' | sed -n \'/PARTITIONED BY (/,/)/p\' | grep -v \'PARTITIONED BY\' | wc -l'
+            logger.debug('Running: %s' % cmd)
+            # using check_output directly here so that we can pass shell=True and use pipes.
+            partition_depth = int(subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True).strip())
+            self.tables[table]['depth'] = partition_depth
+            self.tables[table]['interval'] = intervals[partition_depth]
+
+        return self.tables[table]['interval']
+
+    def query(self, query, check_return_code=True):
+        """Runs the given hive query and returns stdout"""
+        return self.command(['-e', query], check_return_code)
+
+    def script(self, script, check_return_code=True):
+        """Runs the contents of the given script in hive and returns stdout"""
+        if not os.path.isfile(script):
+            raise RuntimeError("Hive script: {0} does not exist.".format(script))
+        return self.command(['-f', script], check_return_code)
+
+    def command(self, args, check_return_code=True):
+        """Runs the `hive` from the command line, passing in the given args, and
+        returning stdout.
+        """
+        cmd = self.hivecmd + args
+        return call(cmd, check_return_code)
+
+
+class CamusUtils(object):
+    """
+    Deal with topics and partitions created by Camus
+    """
+    def __init__(self, destination_path):
+        self.destination_path = destination_path
+
+    def topics(self):
+        """Reads topic names out of destination_path"""
+        return sh(
+            'hdfs dfs -ls {0}/ | '\
+            'grep -v "Found .* items" | '\
+            'awk -F "/" \'{{print $NF}}\''.format(
                self.destination_path
+            )
+        ).split('\n')
+
+    def basedir(self, topic, interval='hourly'):
+        return self.destination_path + '/' + topic + '/' + interval
+
+    # TODO configure interval partition paths automatically instead of hardcoding
+    def partitions(self, topic, interval='hourly'):
+        """
+        Returns a list of time bucketed 'partitions' created by Camus import.
+        These are inferred from directories in hdfs.
+        """
+
+        basedir = self.basedir(topic, interval)
+
+        depth = interval_hierarchies[interval]['depth']
+        # number of wildcard time bucket directories, e.g. depth_stars == 4 -> '/*/*/*/*'
+        depth_stars = '/*' * depth
+        directories = sh(
+            'hdfs dfs -ls -d {0}{1} | '\
+            'grep -v \'Found .* items\' | '\
+            'awk \'{{print $NF}}\''.format(basedir, depth_stars)
+        ).split('\n')
+        # return list of time bucket directories, e.g. ['2013/10/09/15', '2013/10/09/16', ... ]
+        return [directory.replace(basedir + '/', '') for directory in directories]
+
+        # return sh('hdfs dfs -ls -R {0}/{1}/{2} | sed "s@.*{0}/{1}/{2}/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/.*@year=\1, month=\2, day=\3, hour=\4@" | grep "year.*" | sort | uniq'.format(self.destination_path, topic, interval)).split()
+        # return a list of lists of time buckets, e.g. [['2013']]
+        # return [directory.split('/')[-depth:] for directory in directories]
+        # return [(int(d) for d in directory.split('/')[-depth:]) for directory in directories]
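
Taken together, the refactored helpers are meant to be wired up roughly as the
partitioner now does it. A minimal sketch (the topic name and Camus destination
path below are hypothetical, and HiveUtils needs a working hive CLI):

    from util import HiveUtils, CamusUtils, diffDatewise, interval_hierarchies

    hive = HiveUtils(database='default')
    camus = CamusUtils('/wmf/data/camus')      # hypothetical destination path
    topic = 'webrequest'                       # hypothetical topic

    if hive.table_exists(topic):
        interval = hive.partition_interval(topic)   # e.g. 'hourly'
        missing_hive, missing_camus = diffDatewise(
            camus.partitions(topic, interval),
            hive.partitions(topic),
            interval_hierarchies[interval]['directory_format'],
            interval_hierarchies[interval]['hive_partition_format'],
        )
        # missing_hive holds the time buckets Camus imported but Hive
        # has no partition for yet
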
--
To view, visit https://gerrit.wikimedia.org/r/89327
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: If84fcee97403f91218dc5fade5a86dbdb7daab69
Gerrit-PatchSet: 1
Gerrit-Project: analytics/kraken
Gerrit-Branch: master
Gerrit-Owner: Milimetric <dandree...@wikimedia.org>
_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits