EBernhardson has uploaded a new change for review. https://gerrit.wikimedia.org/r/316931
Change subject: Convert popularity_score into hql script ...................................................................... Convert popularity_score into hql script We were doing this calculation in Spark because there was some expectation that we would need to do something more complicated than the simple % of pageviews. We've since decided this is probably fine as is. There is a problem, though: for some reason this is generating scores for some pages as high as 0.98, which is plainly incorrect. Rather than figure out what is wrong with the Spark part, simplify it into a straightforward HQL script that does everything we need. Bug: T148136 Change-Id: I7f2ebc5b1a3d0d0edcbd9549c7cbb07abf17b3ae --- M hive/popularity_score/create_popularity_score_table.hql M oozie/popularity_score/README.md M oozie/popularity_score/coordinator.properties M oozie/popularity_score/coordinator.xml D oozie/popularity_score/lib/.gitkeep D oozie/popularity_score/log4j.properties D oozie/popularity_score/popularityScore.py D oozie/popularity_score/popularityScoreTest.py A oozie/popularity_score/popularity_score.hql M oozie/popularity_score/workflow.xml 10 files changed, 93 insertions(+), 319 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/wikimedia/discovery/analytics refs/changes/31/316931/1 diff --git a/hive/popularity_score/create_popularity_score_table.hql b/hive/popularity_score/create_popularity_score_table.hql index 847d14a..0a6a893 100644 --- a/hive/popularity_score/create_popularity_score_table.hql +++ b/hive/popularity_score/create_popularity_score_table.hql @@ -6,7 +6,7 @@ `agg_days` int COMMENT 'Unpadded number of days aggregated over', `year` int COMMENT 'Unpadded year score aggregation starts at', `month` int COMMENT 'Unpadded month score aggregation starts at', - `day` int COMMENT 'Unpadded day score aggregation starts at', + `day` int COMMENT 'Unpadded day score aggregation starts at') ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED 
AS INPUTFORMAT diff --git a/oozie/popularity_score/README.md b/oozie/popularity_score/README.md index 98cfd25..5879cc8 100644 --- a/oozie/popularity_score/README.md +++ b/oozie/popularity_score/README.md @@ -20,6 +20,3 @@ Note that this job uses the pageview_hourly dataset. If a pageview aggregation job does not have the _SUCCESS done-flag in the directory, the data for that week will not be aggregated until it does. - -@TODO is that desirable? We can probably get reasonable results if an -hour or two from the week is missing diff --git a/oozie/popularity_score/coordinator.properties b/oozie/popularity_score/coordinator.properties index 08baccd..5f781e3 100644 --- a/oozie/popularity_score/coordinator.properties +++ b/oozie/popularity_score/coordinator.properties @@ -32,7 +32,6 @@ # HDFS path to pageview dataset definitions pageview_datasets_file = ${analytics_oozie_directory}/pageview/datasets.xml pageview_data_directory = ${name_node}/wmf/data/wmf/pageview -pageview_hourly_data_directory = ${pageview_data_directory}/hourly pageview_hourly_table = wmf.pageview_hourly # HDFS path to popularity score dataset definitions @@ -52,33 +51,15 @@ # Time to stop running this coordinator. Year 3000 == never! stop_time = 3000-01-01T00:00Z -# HDFS path to workflow to add partition to hive -add_partition_workflow_file = ${analytics_oozie_directory}/util/hive/partition/add/workflow.xml - # HDFS path to workflow to mark a directory as done mark_directory_done_workflow_file = ${analytics_oozie_directory}/util/mark_directory_done/workflow.xml # Workflow to send an error email send_error_email_workflow_file = ${analytics_oozie_directory}/util/send_error_email/workflow.xml +send_error_email_address = discovery-ale...@lists.wikimedia.org # HDFS path to hive-site.xml file. This is needed to run hive actions. 
hive_site_xml = ${name_node}/user/hive/hive-site.xml - -# Spark assembly jar path needs to be given to spark conf -spark_assembly_jar = ${name_node}/user/spark/share/lib/spark-assembly.jar -# Hive library jars path to spark to use HiveContext -hive_lib_path = /usr/lib/hive/lib/* - -# Spark load settings - Better to set them here, configurable through oozie -# when spark alocates this resource, it's not shareable anymore (even if only -# one worker blocks the whole thing), so we prefer to be small-ish instead -# of big-ish :) with this setiings, job runs in less than 1/2h -spark_number_executors = 16 -spark_executor_memory = 3G -spark_driver_memory = 2G - -# Record version to keep track of changes -record_version = 0.0.1 # Coordintator to start. oozie.coord.application.path = ${coordinator_file} diff --git a/oozie/popularity_score/coordinator.xml b/oozie/popularity_score/coordinator.xml index d22ca38..12fcc35 100644 --- a/oozie/popularity_score/coordinator.xml +++ b/oozie/popularity_score/coordinator.xml @@ -15,7 +15,6 @@ <property><name>pageview_datasets_file</name></property> <property><name>pageview_data_directory</name></property> - <property><name>pageview_hourly_data_directory</name></property> <property><name>pageview_hourly_table</name></property> <property><name>discovery_data_directory</name></property> @@ -30,17 +29,10 @@ <property><name>job_tracker</name></property> <property><name>hive_site_xml</name></property> - <property><name>hive_lib_path</name></property> - - <property><name>spark_number_executors</name></property> - <property><name>spark_executor_memory</name></property> - <property><name>spark_driver_memory</name></property> <property><name>mark_directory_done_workflow_file</name></property> <property><name>send_error_email_workflow_file</name></property> - <property><name>add_partition_workflow_file</name></property> - - <property><name>discovery_oozie_directory</name></property> + <property><name>send_error_email_address</name></property> 
</parameters> @@ -101,10 +93,6 @@ <app-path>${workflow_file}</app-path> <configuration> - <property> - <name>start_date</name> - <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0 - days_aggregated, "DAY"), "y/M/d")}</value> - </property> <property> <name>year</name> <value>${coord:formatTime(coord:nominalTime(), "y")}</value> diff --git a/oozie/popularity_score/lib/.gitkeep b/oozie/popularity_score/lib/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/oozie/popularity_score/lib/.gitkeep +++ /dev/null diff --git a/oozie/popularity_score/log4j.properties b/oozie/popularity_score/log4j.properties deleted file mode 100644 index 25f03b5..0000000 --- a/oozie/popularity_score/log4j.properties +++ /dev/null @@ -1,8 +0,0 @@ -# Set everything to be logged to the file. This is not enabled -# in the default workflow. -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.File=hdfs://analytics-hadoop/tmp/popularity-score-spark.log -log4j.appender.file.append=false -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/oozie/popularity_score/popularityScore.py b/oozie/popularity_score/popularityScore.py deleted file mode 100644 index 3292007..0000000 --- a/oozie/popularity_score/popularityScore.py +++ /dev/null @@ -1,126 +0,0 @@ -import pyspark -import pyspark.sql -import pyspark.sql.functions -import pyspark.sql.types -import argparse -import datetime -import subprocess - -# Developed for spark 1.3.0 -# Example testing use: -# spark-submit --master yarn --deploy-mode client --driver-memory 2g \ -# --num-executors 48 --executor-cores 2 --executor-memory 4g \ -# popularityScore.py 2015/11/11 2015/11/18 - - -def parse_date(s): - try: - return datetime.datetime.strptime(s, "%Y/%m/%d") - except ValueError: - msg = "Not a valid date, must be in format YYYY/MM/DD: '{s}'" % {s: s} - raise 
argparse.ArgumentTypeError(msg) - -parser = argparse.ArgumentParser() -parser.add_argument('--source-dir', dest='source_dir', - default='hdfs://analytics-hadoop/wmf/data/wmf/pageview/hourly', - help='Directory containing the wmf.pageview_hourly hive table') -parser.add_argument('--output-dir', dest='output_dir', - default='hdfs://analytics-hadoop/wmf/data/wmf/discovery/popularity_score', - help='Directory to write results out to including all partition directories ') -parser.add_argument('start_date', type=parse_date, - help='Inclusive date to start aggregation from') -parser.add_argument('end_date', type=parse_date, - help='Exclusive date to end aggregation on') - - -def pathForPageViewHourly(source, date): - """Transform a base path + date into directory name - partitioned by year, month, day, hour""" - return "%(source)s/year=%(year)s/month=%(month)s/day=%(day)s/hour=%(hour)s" % { - "source": source, - "year": date.year, - "month": date.month, - "day": date.day, - "hour": date.hour, - } - - -def pageViewHourlyPathList(source, startDate, endDate): - """Transform a base path plus start and end dates into a - list of directories to read""" - curDate = startDate - delta = datetime.timedelta(hours=1) - paths = [] - while curDate < endDate: - paths.append(pathForPageViewHourly(source, curDate)) - curDate = curDate + delta - return paths - - -def deleteHdfsDir(path): - """Deletes a directory from HDFS. PySpark doesn't ship with any - hdfs integration so this shells out""" - subprocess.call(["hdfs", "dfs", "-rm", "-r", "-f", path]) - - -def calcProjectPageViews(dataFrame): - """Generates a dict from project name to the number of page - views in the dataFrame for that project. 
For WMF this should - generate a dict with less than 1000 entries""" - data = dataFrame.groupBy(dataFrame.project).agg( - dataFrame.project, - pyspark.sql.functions.sum(dataFrame.view_count).alias("view_count"), - ).collect() - - return {item.project: item.view_count for item in data} - - -def calcPopularityScore(sc, source): - filtered = source.filter( - source.page_id.isNotNull() - ) - - aggregated = filtered.groupBy( - source.page_id, - source.project, - ).agg( - source.project, - source.page_id, - pyspark.sql.functions.sum(source.view_count).alias("view_count"), - ) - - projectPageViews = sc.broadcast(calcProjectPageViews(filtered)) - # This is a very naive version of the popularity score, likely it will be extended over - # time to be more robust. For the initial iterations this should be sufficient though. - popularityScore = pyspark.sql.functions.udf( - lambda view_count, project: view_count / float(projectPageViews.value[project]), - pyspark.sql.types.DoubleType(), - ) - - print("Calculating popularity score") - return aggregated.select( - aggregated.project, - aggregated.page_id, - popularityScore( - aggregated.view_count, - aggregated.project - ).alias('score'), - ) - -if __name__ == "__main__": - args = parser.parse_args() - sc = pyspark.SparkContext(appName="Discovery Popularity Score") - sqlContext = pyspark.sql.SQLContext(sc) - - parquetPaths = pageViewHourlyPathList(args.source_dir, args.start_date, args.end_date) - print("loading pageview data from:") - print("\t" + "\n\t".join(parquetPaths) + "\n") - dataFrame = sqlContext.parquetFile(*parquetPaths) - result = calcPopularityScore(sc, dataFrame) - - deleteHdfsDir(args.output_dir) - # the default spark.sql.shuffle.partitions creates 200 partitions, resulting in 3mb files. 
- # repartition to achieve result files close to 256mb (our default hdfs block size) - print("Writing results to " + args.output_dir) - # In pyspark 1.4 this can be updated to use coalesce which is more performant - result.repartition(16).saveAsParquetFile(args.output_dir) diff --git a/oozie/popularity_score/popularityScoreTest.py b/oozie/popularity_score/popularityScoreTest.py deleted file mode 100644 index 3bc11ff..0000000 --- a/oozie/popularity_score/popularityScoreTest.py +++ /dev/null @@ -1,48 +0,0 @@ -import unittest -import json -import pyspark -import pyspark.sql - -from popularityScore import calcPopularityScore - - -class PopularityScoreTest(unittest.TestCase): - def setUp(self): - self.sc = pyspark.SparkContext() - self.sql = pyspark.sql.SQLContext(self.sc) - - def createDataFrame(self, fixture): - encoded = [json.dumps(item) for item in fixture] - return self.sql.jsonRDD(self.sc.parallelize(encoded)) - - def test_foo(self): - fixture = self.createDataFrame([ - { - 'project': 'en.wikipedia', - 'page_id': 12345, - 'view_count': 1, - }, - { - 'project': 'en.wikipedia', - 'page_id': 12345, - 'view_count': 5, - - }, - { - 'project': 'en.wikipedia', - 'page_id': None, - 'view_count': 5, - }, - ]) - - result = [row.asDict() for row in calcPopularityScore(self.sc, fixture).collect()] - self.assertEqual(result, [ - { - u'project': 'en.wikipedia', - u'page_id': 12345, - u'score': 1.0, - } - ]) - -if __name__ == '__main__': - unittest.main() diff --git a/oozie/popularity_score/popularity_score.hql b/oozie/popularity_score/popularity_score.hql new file mode 100644 index 0000000..18af7e6 --- /dev/null +++ b/oozie/popularity_score/popularity_score.hql @@ -0,0 +1,57 @@ +-- Parameters: +-- source_table -- Fully qualified table name to compute the +-- aggregation for +-- destination_table -- Fully qualified table name to fill in aggregated +-- values +-- start_date -- date of partition to start aggregation at, in the +-- format yyyy-MM-dd +-- year -- inclusive year to 
end aggregation on +-- month -- inclusive month to end aggregation on +-- day -- inclusive day to end aggregation on +-- days_aggregated -- number of days to aggregate over +-- +-- Usage: +-- hive -f popularity_score.hql +-- -d source_table=wmf.pageview_hourly +-- -d destination_table=discovery.popularity_score +-- -d year=2016 +-- -d month=10 +-- -d day=3 +-- -d days_aggregated=7 +-- + +SET partquet.compression = SNAPPY; +SET mapred.reduce.tasks = 6; + +SET hivevar:end_date = TO_DATE(CONCAT_WS('-', CAST(${year} AS string), CAST(${month} AS string), CAST(${day} AS string))); +SET hivevar:start_date = DATE_SUB(${end_date}, ${days_aggregated}); +SET hivevar:row_date = TO_DATE(CONCAT_WS('-', CAST(year AS string), CAST(month AS string), CAST(day AS string))); + +INSERT OVERWRITE TABLE ${destination_table} + PARTITION(agg_days=${days_aggregated},year=${year},month=${month},day=${day}) + SELECT + hourly.project, + hourly.page_id, + SUM(hourly.view_count) / agg.view_count AS score + FROM + ${source_table} hourly + JOIN ( + SELECT + project, + SUM(view_count) AS view_count + FROM + ${source_table} + WHERE + page_id IS NOT NULL + AND ${row_date} BETWEEN ${start_date} AND ${end_date} + GROUP BY + project + ) agg ON hourly.project = agg.project + WHERE + hourly.page_id IS NOT NULL + AND ${row_date} BETWEEN ${start_date} AND ${end_date} + GROUP BY + hourly.project, + hourly.page_id, + agg.view_count +; diff --git a/oozie/popularity_score/workflow.xml b/oozie/popularity_score/workflow.xml index 65e321d..d18cb3a 100644 --- a/oozie/popularity_score/workflow.xml +++ b/oozie/popularity_score/workflow.xml @@ -1,9 +1,8 @@ <?xml version="1.0" encoding="UTF-8"?> <workflow-app xmlns="uri:oozie:workflow:0.4" - name="discovery-popularity-score-${pageview_hourly_table}->${popularity_score_table}-${year},${month},${day}-wf"> + name="discovery-popularity-score-${pageview_hourly_table}->${popularity_score_table}-${days_aggregated},${year}-${month}-${day}-wf"> <parameters> - <!-- Default 
values for inner oozie settings --> <property> <name>oozie_launcher_queue_name</name> @@ -19,107 +18,69 @@ <property><name>name_node</name></property> <property><name>job_tracker</name></property> + <!-- Aggregation related configuration properties --> + <property> + <name>hive_popularity_score_script_aggregate</name> + <!-- This is relative to the containing directory of this file. --> + <value>popularity_score.hql</value> + <description>Hive script to run.</description> + </property> + <property> <name>hive_site_xml</name> <description>hive-site.xml file path in HDFS</description> </property> <property> - <name>hive_lib_path</name> - <description>Local path to hive jars on executor instances</description> - </property> - - <property> - <name>spark_assembly_jar</name> - <description>HDFS path to spark-assembly.jar</description> - </property> - <property> - <name>spark_number_executors</name> - <description>Number of executors to run job with</description> - </property> - <property> - <name>spark_executor_memory</name> - <description>Amount of memory to reserve on executor instances</description> - </property> - <property> - <name>spark_driver_memory</name> - <description>Amount of memory to reserve on driver instance</description> - </property> - - <property> <name>pageview_hourly_table</name> <description>Fully qualified name of the pageview_hourly table in hive</description> </property> <property> - <name>pageview_hourly_data_directory</name> - <description>Directory which contains the data for hive wmf.pageview_hourly table</description> - </property> - <property> <name>popularity_score_table</name> <description>Fully qualified name of the popularity_score table in hive</description> - </property> - <property> - <name>popularity_score_data_directory</name> - <description>The popularity_score directory path in HDFS</description> - </property> - <property> - <name>popularity_score_partition_directory</name> - 
<value>${popularity_score_data_directory}/agg_days=${days_aggregated}/year=${year}/month=${month}/day=${day}</value> </property> <property> <name>days_aggregated</name> <description>Number of days to aggregate page views over</description> </property> - - <property> - <name>start_date</name> - <description>The inclusive date to start aggregation on in YYYY/MM/DD format</description> - </property> <property> <name>year</name> - <description>Exlusive four digit year portion of date to end aggregation on</description> + <description>The inclusive year to end aggregation</description> </property> <property> <name>month</name> - <description>Exclusive unpadded month portion of date to end aggregation on</description> + <description>The inclusive month to end aggregation</description> </property> <property> <name>day</name> - <description>Exclusive unpadded day portion of date to end aggregation on</description> + <description>The inclusive day to end aggregation</description> </property> - <property> - <name>add_partition_workflow_file</name> - <description>Workflow for adding a partition to hive</description> - </property> <property> <name>mark_directory_done_workflow_file</name> <description>Workflow for marking a directory as complete</description> </property> <property> + <name>popularity_score_dataset_directory</name> + <description>Popularity score directory to generate the done flag in</description> + </property> + <property> <name>send_error_email_workflow_file</name> <description>Workflow for sending an email</description> </property> - <property> - <name>discovery_oozie_directory</name> - <description>The path to this repositories oozie directory in HDFS</description> - </property> - - <property> - <name>pyspark_popularity_score_script</name> - <!-- This is relative to the containing directory of this file. 
--> - <value>${discovery_oozie_directory}/popularity_score/popularityScore.py</value> - <description>Path to Pyspark script to run for generating the popularity score in HDFS.</description> + <name>send_error_email_address</name> + <description>Email address to send error emails to</description> </property> </parameters> <start to="aggregate"/> <action name="aggregate"> - <spark xmlns="uri:oozie:spark-action:0.1"> + <hive xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${job_tracker}</job-tracker> <name-node>${name_node}</name-node> + <job-xml>${hive_site_xml}</job-xml> <configuration> <property> <name>mapreduce.job.queuename</name> @@ -130,51 +91,23 @@ <value>${oozie_launcher_queue_name}</value> </property> <property> - <name>oozie.launcher.mapreduce.map.memory.mb</name> + <name>oozie.launcher.mapred.map.memory.mb</name> <value>${oozie_launcher_memory}</value> </property> <property> - <name>oozie.launcher.mapreduce.map.env</name> - <value>SPARK_HOME=/usr/lib/spark</value> + <name>hive.exec.scratchdir</name> + <value>/tmp/hive-${user}</value> </property> </configuration> - <master>yarn-cluster</master> - <mode>cluster</mode> - <name>Discovery Popularity Score</name> - <jar>${pyspark_popularity_score_script}</jar> - <spark-opts>--conf spark.yarn.jar=${spark_assembly_jar} --executor-memory ${spark_executor_memory} --driver-memory ${spark_driver_memory} --num-executors ${spark_number_executors} --queue ${queue_name} --driver-class-path ${hive_lib_path} --driver-java-options "-Dspark.executor.extraClassPath=${hive_lib_path}" --files ${hive_site_xml}</spark-opts> - <!-- arguments for pyspark script --> - <arg>--source-dir</arg> - <arg>${pageview_hourly_data_directory}</arg> - <arg>--output-dir</arg> - <arg>${popularity_score_partition_directory}</arg> - <arg>${start_date}</arg> - <arg>${year}/${month}/${day}</arg> - </spark> + <script>${hive_popularity_score_script_aggregate}</script> + <param>source_table=${pageview_hourly_table}</param> + 
<param>destination_table=${popularity_score_table}</param> + <param>days_aggregated=${days_aggregated}</param> + <param>year=${year}</param> + <param>month=${month}</param> + <param>day=${day}</param> + </hive> - <ok to="add_hive_partition"/> - <error to="send_error_email"/> - </action> - - <action name="add_hive_partition"> - <sub-workflow> - <app-path>${add_partition_workflow_file}</app-path> - <propagate-configuration/> - <configuration> - <property> - <name>table</name> - <value>${popularity_score_table}</value> - </property> - <property> - <name>partition_spec</name> - <value>agg_days=${days_aggregated},year=${year},month=${month},day=${day}</value> - </property> - <property> - <name>location</name> - <value>${popularity_score_partition_directory}</value> - </property> - </configuration> - </sub-workflow> <ok to="mark_popularity_score_dataset_done"/> <error to="send_error_email"/> </action> @@ -185,7 +118,7 @@ <configuration> <property> <name>directory</name> - <value>${popularity_score_partition_directory}</value> + <value>${popularity_score_dataset_directory}</value> </property> </configuration> </sub-workflow> @@ -208,7 +141,7 @@ </property> <property> <name>to</name> - <value>discovery-ale...@lists.wikimedia.org</value> + <value>${send_error_email_address}</value> </property> </configuration> </sub-workflow> -- To view, visit https://gerrit.wikimedia.org/r/316931 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7f2ebc5b1a3d0d0edcbd9549c7cbb07abf17b3ae Gerrit-PatchSet: 1 Gerrit-Project: wikimedia/discovery/analytics Gerrit-Branch: master Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits