Ottomata has submitted this change and it was merged.

Change subject: First draft of refinement phase for webrequest
......................................................................
First draft of refinement phase for webrequest

This inserts data into a clustered table stored as Parquet, and tags
each request with the IsPageviewUDF.

Change-Id: Ibd5b120ed96080ca99e5a61c48918c95fee7ae06
---
A hive/webrequest/create_webrequest_refined_table.hql
A oozie/webrequest/refine/README.md
A oozie/webrequest/refine/bundle.properties
A oozie/webrequest/refine/bundle.xml
A oozie/webrequest/refine/coordinator.xml
A oozie/webrequest/refine/refine_webrequest.hql
A oozie/webrequest/refine/workflow.xml
7 files changed, 496 insertions(+), 0 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved


diff --git a/hive/webrequest/create_webrequest_refined_table.hql b/hive/webrequest/create_webrequest_refined_table.hql
new file mode 100644
index 0000000..dbffc7f
--- /dev/null
+++ b/hive/webrequest/create_webrequest_refined_table.hql
@@ -0,0 +1,54 @@
+-- Create table statement for the refined webrequest table.
+--
+-- NOTE: When choosing partition field types,
+-- one should take into consideration Hive's
+-- insistence on storing partition values
+-- as strings.  See:
+-- https://wikitech.wikimedia.org/wiki/File:Hive_partition_formats.png
+-- and
+-- http://bots.wmflabs.org/~wm-bot/logs/%23wikimedia-analytics/20140721.txt
+--
+-- Parameters:
+--     <none>
+--
+-- Usage:
+--     hive -f create_webrequest_refined_table.hql --database wmf
+--
+
+CREATE TABLE IF NOT EXISTS `webrequest`(
+    `hostname`          string   COMMENT 'Cache node hostname that served this request',
+    `sequence`          bigint   COMMENT 'Sequence number of request on source cache instance',
+    `dt`                string   COMMENT 'YYYY-MM-DDTHH:mm:ssZ timestamp',
+    `time_firstbyte`    double   COMMENT 'Time, in seconds, until the first byte was served',
+    `ip`                string   COMMENT 'IP address of the requestor',
+    `cache_status`      string   COMMENT 'Cache response status',
+    `http_status`       string   COMMENT 'HTTP status code of the response',
+    `response_size`     bigint   COMMENT 'Response size in bytes',
+    `http_method`       string   COMMENT 'Request HTTP method',
+    `uri_host`          string   COMMENT 'Host portion of the request URI',
+    `uri_path`          string   COMMENT 'Path portion of the request URI',
+    `uri_query`         string   COMMENT 'Query string of the request URI',
+    `content_type`      string   COMMENT 'Content-Type header of the response',
+    `referer`           string   COMMENT 'Referer header of the request',
+    `x_forwarded_for`   string   COMMENT 'X-Forwarded-For header',
+    `user_agent`        string   COMMENT 'User-Agent header of the request',
+    `accept_language`   string   COMMENT 'Accept-Language header',
+    `x_analytics`       string   COMMENT 'X-Analytics header',
+    `range`             string   COMMENT 'Range field for multipart files',
+    `is_pageview`       boolean  COMMENT 'Indicates if this record was marked as a pageview during refinement'
+)
+PARTITIONED BY (
+    `webrequest_source` string   COMMENT 'Source cluster',
+    `year`              int      COMMENT 'Unpadded year of request',
+    `month`             int      COMMENT 'Unpadded month of request',
+    `day`               int      COMMENT 'Unpadded day of request',
+    `hour`              int      COMMENT 'Unpadded hour of request')
+CLUSTERED BY(hostname, sequence) INTO 64 BUCKETS
+ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
+STORED AS
+    INPUTFORMAT
+        'parquet.hive.DeprecatedParquetInputFormat'
+    OUTPUTFORMAT
+        'parquet.hive.DeprecatedParquetOutputFormat'
+;
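As a quick illustration of the schema above, here is a hypothetical
spot-check query. Per the NOTE, the partition columns are plain ints, so
predicates need no zero-padding, and Hive prunes to a single partition
before touching any Parquet data; the table name and partition values are
illustrative, not taken from a real run:

```sql
-- Hypothetical spot-check: count pageviews in one refined hour.
-- Unpadded int partition predicates prune to a single partition.
SELECT COUNT(*) AS pageviews
FROM wmf.webrequest
WHERE webrequest_source = 'text'
  AND year = 2015 AND month = 1 AND day = 5 AND hour = 7
  AND is_pageview = TRUE;
```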
diff --git a/oozie/webrequest/refine/README.md b/oozie/webrequest/refine/README.md
new file mode 100644
index 0000000..f7c4eea
--- /dev/null
+++ b/oozie/webrequest/refine/README.md
@@ -0,0 +1,21 @@
+# Refine phase for webrequest logs
+
+This job is responsible for the refine (ETL) phase of
+webrequest logs.  It currently converts the raw JSON
+logs imported from Kafka into a clustered, bucketed table
+stored in Parquet format.
+
+# Outline
+
+* ```bundle.properties``` can be used to inject the whole refine
+  pipeline into oozie.
+* ```bundle.xml``` injects separate coordinators for each of the
+  webrequest_sources.
+* ```coordinator.xml``` injects a workflow for each dataset.
+* ```workflow.xml```
+  * Runs a hive query to convert the raw JSON data into the refined
+    table.
+
+Note that this job uses the checked dataset.  If a raw webrequest import
+does not have the _SUCCESS done-flag in its directory, the data for that
+hour will not be refined until the flag appears.
diff --git a/oozie/webrequest/refine/bundle.properties b/oozie/webrequest/refine/bundle.properties
new file mode 100644
index 0000000..6507b06
--- /dev/null
+++ b/oozie/webrequest/refine/bundle.properties
@@ -0,0 +1,60 @@
+# Configures a bundle that automatically refines partitions of a Hive
+# webrequest table.  Any of the following properties can be overridden with -D.
+# Usage:
+#   oozie job -Duser=$USER -Dstart_time=2015-01-05T00:00Z -submit -config oozie/webrequest/refine/bundle.properties
+#
+# NOTE: The $oozie_directory must be synced to HDFS so that all relevant
+# .xml files exist there when this job is submitted.
+
+
+name_node   = hdfs://analytics-hadoop
+job_tracker = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name  = default
+
+user = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should
+# override this to point directly at a deployed
+# directory name, and not the 'symbolic' 'current' directory.
+# E.g. /wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory = ${name_node}/wmf/refinery/current
+
+# HDFS path to artifacts that will be used by this job.
+# E.g. refinery-hive.jar should exist here.
+artifacts_directory = ${refinery_directory}/artifacts
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory = ${refinery_directory}/oozie
+
+# HDFS path to coordinator to run for each webrequest_source.
+coordinator_file = ${oozie_directory}/webrequest/refine/coordinator.xml
+
+# HDFS path to workflow to run.
+workflow_file = ${oozie_directory}/webrequest/refine/workflow.xml
+
+# HDFS path to webrequest dataset definition
+datasets_file = ${oozie_directory}/webrequest/datasets.xml
+
+# Initial import time of the webrequest dataset.
+start_time = 2015-01-01T00:00Z
+
+# Time to stop running this coordinator.  Year 3000 == never!
+stop_time = 3000-01-01T00:00Z
+
+# HDFS path to hive-site.xml file.  This is needed to run hive actions.
+hive_site_xml = ${oozie_directory}/util/hive/hive-site.xml
+
+# Fully qualified Hive table names.
+source_table      = wmf_raw.webrequest
+destination_table = wmf.webrequest
+
+
+# HDFS path to directory where webrequest data is time bucketed.
+webrequest_data_directory = ${name_node}/wmf/data/raw/webrequest
+
+# Bundle to start.
+oozie.bundle.application.path = ${oozie_directory}/webrequest/refine/bundle.xml
+oozie.use.system.libpath = true
+oozie.action.external.stats.write = true
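The comments in bundle.properties suggest pinning refinery_directory to a
deployed snapshot when submitting for production. A hypothetical submission
combining the documented overrides might look as follows; the snapshot
directory name is the example from the comment above, not a real deployment:

```
# Hypothetical production submission; all -D values are illustrative.
oozie job -Duser=$USER \
    -Drefinery_directory=hdfs://analytics-hadoop/wmf/refinery/2015-01-05T17.59.18Z--7bb7f07 \
    -Dstart_time=2015-01-05T00:00Z \
    -submit -config oozie/webrequest/refine/bundle.properties
```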
diff --git a/oozie/webrequest/refine/bundle.xml b/oozie/webrequest/refine/bundle.xml
new file mode 100644
index 0000000..47b74aa
--- /dev/null
+++ b/oozie/webrequest/refine/bundle.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<bundle-app xmlns="uri:oozie:bundle:0.2"
+    name="refine-${source_table}->${destination_table}-bundle">
+
+    <parameters>
+        <property>
+            <name>queue_name</name>
+            <value>default</value>
+        </property>
+
+        <!-- Required properties. -->
+        <property><name>coordinator_file</name></property>
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>workflow_file</name></property>
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+        <property><name>webrequest_data_directory</name></property>
+
+        <property><name>hive_site_xml</name></property>
+        <property><name>artifacts_directory</name></property>
+        <property><name>source_table</name></property>
+        <property><name>destination_table</name></property>
+    </parameters>
+
+    <!--
+    For now, don't refine bits or upload.
+
+    <coordinator name='refine-webrequest-bits'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>bits</value>
+            </property>
+        </configuration>
+    </coordinator>
+    -->
+
+    <coordinator name='refine-webrequest-mobile'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>mobile</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+    <coordinator name='refine-webrequest-text'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>text</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+    <!--
+    <coordinator name='refine-webrequest-upload'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>upload</value>
+            </property>
+        </configuration>
+    </coordinator>
+    -->
+</bundle-app>
diff --git a/oozie/webrequest/refine/coordinator.xml b/oozie/webrequest/refine/coordinator.xml
new file mode 100644
index 0000000..ef44a36
--- /dev/null
+++ b/oozie/webrequest/refine/coordinator.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<coordinator-app xmlns="uri:oozie:coordinator:0.4"
+    name="refine-${source_table}->${destination_table}-${webrequest_source}-coord"
+    frequency="${coord:hours(1)}"
+    start="${start_time}"
+    end="${stop_time}"
+    timezone="Universal">
+
+    <parameters>
+        <property>
+            <name>queue_name</name>
+            <value>default</value>
+        </property>
+
+        <!-- Required properties. -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>workflow_file</name></property>
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+        <property><name>webrequest_data_directory</name></property>
+
+        <property><name>datasets_file</name></property>
+        <property><name>hive_site_xml</name></property>
+        <property><name>artifacts_directory</name></property>
+        <property><name>source_table</name></property>
+        <property><name>destination_table</name></property>
+        <property><name>webrequest_source</name></property>
+    </parameters>
+
+    <controls>
+        <!--
+        Refining is not too cheap, so we limit
+        concurrency.
+
+        Note that this is per coordinator, so if we run this
+        coordinator for, say, 4 webrequest_sources (see bundle.xml :-)),
+        we effectively refine up to 8 dataset partitions in parallel.
+
+        Also note that back-filling is not limited by the
+        coordinator's frequency, so back-filling works nicely
+        even though the concurrency is low.
+        -->
+        <concurrency>2</concurrency>
+
+
+        <!--
+        Since we expect only one incarnation per hourly dataset, the
+        default throttle of 12 is way too high, and there is no need
+        to keep that many materialized jobs around.
+
+        By throttling down to 2, we keep the hdfs checks on the datasets
+        low, while still being able to easily feed the concurrency.
+        -->
+        <throttle>2</throttle>
+    </controls>
+
+    <datasets>
+        <!--
+        Include the given datasets_file.  This should
+        define the "webrequest" dataset for this coordinator.
+        -->
+        <include>${datasets_file}</include>
+    </datasets>
+
+    <input-events>
+        <data-in name="input" dataset="webrequest_${webrequest_source}">
+            <instance>${coord:current(0)}</instance>
+        </data-in>
+    </input-events>
+
+    <action>
+        <workflow>
+            <app-path>${workflow_file}</app-path>
+            <configuration>
+
+                <!-- Pass these properties through to the workflow -->
+                <property><name>name_node</name><value>${name_node}</value></property>
+                <property><name>job_tracker</name><value>${job_tracker}</value></property>
+                <property><name>queue_name</name><value>${queue_name}</value></property>
+
+                <property>
+                    <name>hive_site_xml</name>
+                    <value>${hive_site_xml}</value>
+                </property>
+                <property>
+                    <name>artifacts_directory</name>
+                    <value>${artifacts_directory}</value>
+                </property>
+                <property>
+                    <name>source_table</name>
+                    <value>${source_table}</value>
+                </property>
+                <property>
+                    <name>destination_table</name>
+                    <value>${destination_table}</value>
+                </property>
+                <property>
+                    <name>webrequest_source</name>
+                    <value>${webrequest_source}</value>
+                </property>
+                <property>
+                    <name>year</name>
+                    <value>${coord:formatTime(coord:nominalTime(), "y")}</value>
+                </property>
+                <property>
+                    <name>month</name>
+                    <value>${coord:formatTime(coord:nominalTime(), "M")}</value>
+                </property>
+                <property>
+                    <name>day</name>
+                    <value>${coord:formatTime(coord:nominalTime(), "d")}</value>
+                </property>
+                <property>
+                    <name>hour</name>
+                    <value>${coord:formatTime(coord:nominalTime(), "H")}</value>
+                </property>
+
+            </configuration>
+        </workflow>
+    </action>
+</coordinator-app>
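The "y", "M", "d", and "H" patterns passed to coord:formatTime above
deliberately yield unpadded values (month "1", not "01"), which is exactly
what the unpadded int partition columns of the destination table expect.
A hypothetical check against the partition written for nominal time
2015-01-05T07:00Z, with illustrative table and source values:

```sql
-- Hypothetical: list the partition refined for nominal time
-- 2015-01-05T07:00Z; note the unpadded month, day, and hour values.
SHOW PARTITIONS wmf.webrequest
PARTITION (webrequest_source='text', year=2015, month=1, day=5, hour=7);
```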
diff --git a/oozie/webrequest/refine/refine_webrequest.hql b/oozie/webrequest/refine/refine_webrequest.hql
new file mode 100644
index 0000000..98dc01e
--- /dev/null
+++ b/oozie/webrequest/refine/refine_webrequest.hql
@@ -0,0 +1,70 @@
+-- Parameters:
+--     source_table      -- Fully qualified table name of the raw
+--                          data to refine.
+--     destination_table -- Fully qualified table name to store the
+--                          refined data in.  This table should have
+--                          the schema created by
+--                          hive/webrequest/create_webrequest_refined_table.hql.
+--     webrequest_source -- webrequest_source of the partition to
+--                          refine.
+--     year              -- year of the partition to refine.
+--     month             -- month of the partition to refine.
+--     day               -- day of the partition to refine.
+--     hour              -- hour of the partition to refine.
+--
+-- Usage:
+--     hive -f refine_webrequest.hql \
+--         -d source_table=wmf_raw.webrequest \
+--         -d destination_table=wmf.webrequest \
+--         -d webrequest_source=bits \
+--         -d year=2014 \
+--         -d month=12 \
+--         -d day=30 \
+--         -d hour=1
+--
+
+SET parquet.compression = SNAPPY;
+SET hive.enforce.bucketing = true;
+-- mapreduce.job.reduces should not be necessary to
+-- specify, since we set hive.enforce.bucketing=true.
+-- However, without it only one reduce task is
+-- launched, so we set it manually.  It must equal
+-- the number of buckets the table is clustered
+-- into (64).
+SET mapreduce.job.reduces = 64;
+
+ADD JAR ${artifacts_directory}/refinery-hive.jar;
+CREATE TEMPORARY FUNCTION is_pageview as 'org.wikimedia.analytics.refinery.hive.IsPageviewUDF';
+
+INSERT OVERWRITE TABLE ${destination_table}
+    PARTITION(webrequest_source='${webrequest_source}',year=${year},month=${month},day=${day},hour=${hour})
+    SELECT
+        hostname,
+        sequence,
+        dt,
+        time_firstbyte,
+        ip,
+        cache_status,
+        http_status,
+        response_size,
+        http_method,
+        uri_host,
+        uri_path,
+        uri_query,
+        content_type,
+        referer,
+        x_forwarded_for,
+        user_agent,
+        accept_language,
+        x_analytics,
+        range,
+        is_pageview(uri_host, uri_path, uri_query, http_status, content_type, user_agent) as is_pageview
+    FROM
+        ${source_table}
+    WHERE
+        webrequest_source='${webrequest_source}' AND
+        year=${year} AND month=${month} AND day=${day} AND hour=${hour}
+;
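With hive.enforce.bucketing on and the reducer count pinned to the bucket
count, each refined partition directory should end up containing exactly 64
bucket files (000000_0 through 000063_0, one per reducer). A hypothetical
sanity check; the warehouse path assumes Hive's default layout for the wmf
database and an already-refined partition:

```
# Hypothetical check: expect 64 bucket files in a refined partition.
# tail -n +2 skips the "Found N items" header printed by hdfs dfs -ls.
hdfs dfs -ls /user/hive/warehouse/wmf.db/webrequest/webrequest_source=text/year=2015/month=1/day=5/hour=7 \
    | tail -n +2 | wc -l
```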
diff --git a/oozie/webrequest/refine/workflow.xml b/oozie/webrequest/refine/workflow.xml
new file mode 100644
index 0000000..599ca73
--- /dev/null
+++ b/oozie/webrequest/refine/workflow.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+    name="refine-${source_table}->${destination_table}-${webrequest_source},${year},${month},${day},${hour}-wf">
+
+    <parameters>
+        <property>
+            <name>queue_name</name>
+            <value>default</value>
+        </property>
+
+        <!-- Required properties -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+
+        <property>
+            <name>hive_script</name>
+            <!-- This is relative to the containing directory of this file. -->
+            <value>refine_webrequest.hql</value>
+            <description>Hive script to run.</description>
+        </property>
+
+        <property>
+            <name>hive_site_xml</name>
+            <description>hive-site.xml file path in HDFS</description>
+        </property>
+        <property>
+            <name>artifacts_directory</name>
+            <description>Path in HDFS to artifacts.  refinery-hive.jar should be here.</description>
+        </property>
+        <property>
+            <name>source_table</name>
+            <description>Hive table to refine</description>
+        </property>
+        <property>
+            <name>destination_table</name>
+            <description>The destination table to store refined data in.</description>
+        </property>
+        <property>
+            <name>webrequest_source</name>
+            <description>The partition's webrequest_source</description>
+        </property>
+        <property>
+            <name>year</name>
+            <description>The partition's year</description>
+        </property>
+        <property>
+            <name>month</name>
+            <description>The partition's month</description>
+        </property>
+        <property>
+            <name>day</name>
+            <description>The partition's day</description>
+        </property>
+        <property>
+            <name>hour</name>
+            <description>The partition's hour</description>
+        </property>
+    </parameters>
+
+    <start to="refine"/>
+
+    <action name="refine">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <job-xml>${hive_site_xml}</job-xml>
+            <configuration>
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+                <property>
+                    <name>hive.exec.scratchdir</name>
+                    <value>/tmp/hive-${user}</value>
+                </property>
+            </configuration>
+
+            <script>${hive_script}</script>
+            <param>artifacts_directory=${artifacts_directory}</param>
+            <param>source_table=${source_table}</param>
+            <param>destination_table=${destination_table}</param>
+            <param>webrequest_source=${webrequest_source}</param>
+            <param>year=${year}</param>
+            <param>month=${month}</param>
+            <param>day=${day}</param>
+            <param>hour=${hour}</param>
+        </hive>
+
+        <ok to="end"/>
+        <error to="kill"/>
+    </action>
+
+    <kill name="kill">
+        <message>Webrequest refine action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

--
To view, visit https://gerrit.wikimedia.org/r/182478
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ibd5b120ed96080ca99e5a61c48918c95fee7ae06
Gerrit-PatchSet: 10
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
