Ottomata has submitted this change and it was merged.

Change subject: First draft of refinement phase for webrequest
......................................................................


First draft of refinement phase for webrequest

This inserts data into a clustered table stored as Parquet,
and tags each request with the IsPageviewUDF.

Change-Id: Ibd5b120ed96080ca99e5a61c48918c95fee7ae06
---
A hive/webrequest/create_webrequest_refined_table.hql
A oozie/webrequest/refine/README.md
A oozie/webrequest/refine/bundle.properties
A oozie/webrequest/refine/bundle.xml
A oozie/webrequest/refine/coordinator.xml
A oozie/webrequest/refine/refine_webrequest.hql
A oozie/webrequest/refine/workflow.xml
7 files changed, 496 insertions(+), 0 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/hive/webrequest/create_webrequest_refined_table.hql b/hive/webrequest/create_webrequest_refined_table.hql
new file mode 100644
index 0000000..dbffc7f
--- /dev/null
+++ b/hive/webrequest/create_webrequest_refined_table.hql
@@ -0,0 +1,54 @@
+-- Create table statement for the refined webrequest table.
+--
+-- NOTE:  When choosing partition field types,
+-- one should take into consideration Hive's
+-- insistence on storing partition values
+-- as strings.  See:
+-- https://wikitech.wikimedia.org/wiki/File:Hive_partition_formats.png
+-- and
+-- http://bots.wmflabs.org/~wm-bot/logs/%23wikimedia-analytics/20140721.txt
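+--
+-- For example (an illustrative sketch, not taken from the links above):
+-- with these int partition columns, a predicate like
+--     WHERE year=2015 AND month=1
+-- matches regardless of zero-padding, whereas string partition columns
+-- would treat month='1' and month='01' as distinct partition values.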
+--
+-- Parameters:
+--     <none>
+--
+-- Usage:
+--     hive -f create_webrequest_refined_table.hql --database wmf
+--
+--
+
+CREATE TABLE IF NOT EXISTS `webrequest`(
+    `hostname` string COMMENT 'Cache node hostname that served this request',
+    `sequence` bigint COMMENT 'Sequence number of request on source cache instance',
+    `dt` string COMMENT 'YYYY-MM-DDTHH:mm:ssZ timestamp',
+    `time_firstbyte` double COMMENT 'time until the first byte was served',
+    `ip` string,
+    `cache_status` string,
+    `http_status` string,
+    `response_size` bigint COMMENT 'Response size in bytes',
+    `http_method` string COMMENT 'Request HTTP method',
+    `uri_host` string,
+    `uri_path` string,
+    `uri_query` string,
+    `content_type` string COMMENT 'ContentType of response',
+    `referer` string,
+    `x_forwarded_for` string COMMENT 'X-Forwarded-For header',
+    `user_agent` string,
+    `accept_language` string COMMENT 'AcceptLanguage header',
+    `x_analytics` string COMMENT 'X-Analytics header',
+    `range` string COMMENT 'Range field for multipart files',
+    `is_pageview` boolean COMMENT 'Indicates if this record was marked as a pageview during refinement'
+)
+PARTITIONED BY (
+    `webrequest_source` string COMMENT 'Source cluster',
+    `year` int COMMENT 'Unpadded year of request',
+    `month` int COMMENT 'Unpadded month of request',
+    `day` int COMMENT 'Unpadded day of request',
+    `hour` int COMMENT 'Unpadded hour of request')
+CLUSTERED BY(hostname, sequence) INTO 64 BUCKETS
+ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
+STORED AS
+    INPUTFORMAT
+        'parquet.hive.DeprecatedParquetInputFormat'
+    OUTPUTFORMAT
+        'parquet.hive.DeprecatedParquetOutputFormat'
+;
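+
+-- The (hostname, sequence) clustering enables bucket sampling; e.g. a
+-- hypothetical query reading only one of the 64 buckets:
+--
+--     SELECT * FROM webrequest
+--     TABLESAMPLE(BUCKET 1 OUT OF 64 ON hostname, sequence)
+--     WHERE webrequest_source='text'
+--         AND year=2015 AND month=1 AND day=5 AND hour=0;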
diff --git a/oozie/webrequest/refine/README.md b/oozie/webrequest/refine/README.md
new file mode 100644
index 0000000..f7c4eea
--- /dev/null
+++ b/oozie/webrequest/refine/README.md
@@ -0,0 +1,21 @@
+# Refine phase for webrequest logs
+
+This job is responsible for the refine (ETL) phase of
+webrequest logs.  It currently converts the raw JSON
+logs imported from Kafka into a clustered-bucketed table
+stored in Parquet format.
+
+# Outline
+
+* ```bundle.properties``` can be used to inject the whole refine
+  pipeline into oozie.
+* ```bundle.xml``` injects separate coordinators for each of the
+  webrequest_sources.
+* ```coordinator.xml``` injects a workflow for each dataset.
+* ```workflow.xml```
+  * Runs a hive query to convert from JSON into the refined data.
+
+Note that this job uses the checked dataset.  If a raw webrequest import
+does not have the _SUCCESS done-flag in the directory, the data for that
+hour will not be refined until it does.
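+
+For illustration, the dataset definition that gates this job (the real
+one lives in ```oozie/webrequest/datasets.xml```) has roughly the
+following shape; the ```uri-template``` path here is a guess, only the
+```done-flag``` is known from this job's behavior:
+
+    <dataset name="webrequest_text" frequency="${coord:hours(1)}"
+             initial-instance="${start_time}" timezone="Universal">
+        <uri-template>${webrequest_data_directory}/webrequest_text/hourly/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
+        <done-flag>_SUCCESS</done-flag>
+    </dataset>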
diff --git a/oozie/webrequest/refine/bundle.properties b/oozie/webrequest/refine/bundle.properties
new file mode 100644
index 0000000..6507b06
--- /dev/null
+++ b/oozie/webrequest/refine/bundle.properties
@@ -0,0 +1,60 @@
+# Configures a bundle to manage automatically refining partitions of a Hive
+# webrequest table.  All of the following properties are overridable with -D.
+# Usage:
+#   oozie job -Duser=$USER -Dstart_time=2015-01-05T00:00Z -submit -config oozie/webrequest/refine/bundle.properties
+#
+# NOTE:  The $oozie_directory must be synced to HDFS so that all relevant
+#        .xml files exist there when this job is submitted.
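+#
+# A production submission might look like this (values hypothetical;
+# see the refinery_directory note below):
+#   oozie job -Duser=hdfs \
+#     -Drefinery_directory=hdfs://analytics-hadoop/wmf/refinery/2015-01-05T17.59.18Z--7bb7f07 \
+#     -Dstart_time=2015-01-05T00:00Z \
+#     -submit -config oozie/webrequest/refine/bundle.properties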
+
+
+name_node                         = hdfs://analytics-hadoop
+job_tracker                       = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name                        = default
+
+user                              = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should
+# override this to point directly at a deployed
+# directory name, and not the symbolic 'current' directory.
+# E.g.  /wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory                = ${name_node}/wmf/refinery/current
+
+# HDFS path to artifacts that will be used by this job.
+# E.g. refinery-hive.jar should exist here.
+artifacts_directory               = ${refinery_directory}/artifacts
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory                   = ${refinery_directory}/oozie
+
+# HDFS path to coordinator to run for each webrequest_source.
+coordinator_file                  = ${oozie_directory}/webrequest/refine/coordinator.xml
+
+# HDFS path to workflow to run.
+workflow_file                     = ${oozie_directory}/webrequest/refine/workflow.xml
+
+# HDFS path to webrequest dataset definition
+datasets_file                     = ${oozie_directory}/webrequest/datasets.xml
+
+# Initial import time of the webrequest dataset.
+start_time                        = 2015-01-01T00:00Z
+
+# Time to stop running this coordinator.  Year 3000 == never!
+stop_time                         = 3000-01-01T00:00Z
+
+# HDFS path to hive-site.xml file.  This is needed to run hive actions.
+hive_site_xml                     = ${oozie_directory}/util/hive/hive-site.xml
+
+# Fully qualified Hive table name.
+source_table                       = wmf_raw.webrequest
+destination_table                  = wmf.webrequest
+
+
+# HDFS path to directory where webrequest data is time bucketed.
+webrequest_data_directory         = ${name_node}/wmf/data/raw/webrequest
+
+# Bundle to start.
+oozie.bundle.application.path     = ${oozie_directory}/webrequest/refine/bundle.xml
+oozie.use.system.libpath          = true
+oozie.action.external.stats.write = true
diff --git a/oozie/webrequest/refine/bundle.xml b/oozie/webrequest/refine/bundle.xml
new file mode 100644
index 0000000..47b74aa
--- /dev/null
+++ b/oozie/webrequest/refine/bundle.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<bundle-app xmlns="uri:oozie:bundle:0.2"
+    name="refine-${source_table}->${destination_table}-bundle">
+
+    <parameters>
+        <property>
+            <name>queue_name</name>
+            <value>default</value>
+        </property>
+
+        <!-- Required properties. -->
+        <property><name>coordinator_file</name></property>
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>workflow_file</name></property>
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+        <property><name>webrequest_data_directory</name></property>
+
+        <property><name>hive_site_xml</name></property>
+        <property><name>artifacts_directory</name></property>
+        <property><name>source_table</name></property>
+        <property><name>destination_table</name></property>
+    </parameters>
+
+    <!--
+    For now, don't refine bits or upload.
+
+    <coordinator name='refine-webrequest-bits'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>bits</value>
+            </property>
+        </configuration>
+    </coordinator>
+    -->
+
+    <coordinator name='refine-webrequest-mobile'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>mobile</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+    <coordinator name='refine-webrequest-text'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>text</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+    <!--
+    <coordinator name='refine-webrequest-upload'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>upload</value>
+            </property>
+        </configuration>
+    </coordinator>
+    -->
+</bundle-app>
diff --git a/oozie/webrequest/refine/coordinator.xml b/oozie/webrequest/refine/coordinator.xml
new file mode 100644
index 0000000..ef44a36
--- /dev/null
+++ b/oozie/webrequest/refine/coordinator.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<coordinator-app xmlns="uri:oozie:coordinator:0.4"
+    name="refine-${source_table}->${destination_table}-${webrequest_source}-coord"
+    frequency="${coord:hours(1)}"
+    start="${start_time}"
+    end="${stop_time}"
+    timezone="Universal">
+
+    <parameters>
+        <property>
+            <name>queue_name</name>
+            <value>default</value>
+        </property>
+
+        <!-- Required properties. -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>workflow_file</name></property>
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+        <property><name>webrequest_data_directory</name></property>
+
+        <property><name>datasets_file</name></property>
+        <property><name>hive_site_xml</name></property>
+        <property><name>artifacts_directory</name></property>
+        <property><name>source_table</name></property>
+        <property><name>destination_table</name></property>
+        <property><name>webrequest_source</name></property>
+    </parameters>
+
+    <controls>
+        <!--
+        Refining is not too cheap, so we limit
+        concurrency.
+
+        Note that this is per coordinator. So if we run this
+        coordinator for, say, 4 webrequest_sources (see bundle.xml :-)),
+        we effectively refine up to 8 datasets in parallel.
+
+        Also note that back-filling is not limited by the
+        coordinator's frequency, so back-filling works nicely
+        even though the concurrency is low.
+        -->
+        <concurrency>2</concurrency>
+
+
+        <!--
+        Since we expect only one incarnation per hourly dataset, the
+        default throttle of 12 is way too high, and there is no need
+        to keep that many materialized jobs around.
+
+        By throttling to 2, we keep the HDFS checks on the datasets
+        low, while still being able to easily feed the concurrency.
+        -->
+        <throttle>2</throttle>
+    </controls>
+
+    <datasets>
+        <!--
+        Include the given datasets_file file.  This should
+        define the "webrequest" dataset for this coordinator.
+        -->
+        <include>${datasets_file}</include>
+    </datasets>
+
+    <input-events>
+        <data-in name="input" dataset="webrequest_${webrequest_source}">
+            <instance>${coord:current(0)}</instance>
+        </data-in>
+    </input-events>
+
+    <action>
+        <workflow>
+            <app-path>${workflow_file}</app-path>
+            <configuration>
+
+                <!-- Pass these properties through to the workflow -->
+                <property><name>name_node</name><value>${name_node}</value></property>
+                <property><name>job_tracker</name><value>${job_tracker}</value></property>
+                <property><name>queue_name</name><value>${queue_name}</value></property>
+
+                <property>
+                    <name>hive_site_xml</name>
+                    <value>${hive_site_xml}</value>
+                </property>
+                <property>
+                    <name>artifacts_directory</name>
+                    <value>${artifacts_directory}</value>
+                </property>
+                <property>
+                    <name>source_table</name>
+                    <value>${source_table}</value>
+                </property>
+                <property>
+                    <name>destination_table</name>
+                    <value>${destination_table}</value>
+                </property>
+                <property>
+                    <name>webrequest_source</name>
+                    <value>${webrequest_source}</value>
+                </property>
+                <property>
+                    <name>year</name>
+                    <value>${coord:formatTime(coord:nominalTime(), "y")}</value>
+                </property>
+                <property>
+                    <name>month</name>
+                    <value>${coord:formatTime(coord:nominalTime(), "M")}</value>
+                </property>
+                <property>
+                    <name>day</name>
+                    <value>${coord:formatTime(coord:nominalTime(), "d")}</value>
+                </property>
+                <property>
+                    <name>hour</name>
+                    <value>${coord:formatTime(coord:nominalTime(), "H")}</value>
+                </property>
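+                <!--
+                The "y", "M", "d" and "H" patterns yield unpadded values
+                (e.g. 2015, 1, 5, 0), which match the unpadded int
+                partition columns of the destination table.
+                -->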
+
+            </configuration>
+        </workflow>
+    </action>
+</coordinator-app>
diff --git a/oozie/webrequest/refine/refine_webrequest.hql b/oozie/webrequest/refine/refine_webrequest.hql
new file mode 100644
index 0000000..98dc01e
--- /dev/null
+++ b/oozie/webrequest/refine/refine_webrequest.hql
@@ -0,0 +1,70 @@
+-- Parameters:
+--     source_table      -- Fully qualified table name to refine data
+--                          from.
+--     destination_table -- Fully qualified table name to store the
+--                          refined data in. This table should have
+--                          the schema created by
+--                          hive/webrequest/create_webrequest_refined_table.hql.
+--     webrequest_source -- webrequest_source of partition to refine.
+--     year              -- year of partition to refine.
+--     month             -- month of partition to refine.
+--     day               -- day of partition to refine.
+--     hour              -- hour of partition to refine.
+--
+-- Usage:
+--     hive -f refine_webrequest.hql                              \
+--         -d source_table=wmf_raw.webrequest                     \
+--         -d destination_table=wmf.webrequest                    \
+--         -d webrequest_source=bits                              \
+--         -d year=2014                                           \
+--         -d month=12                                            \
+--         -d day=30                                              \
+--         -d hour=1
+--
+
+SET parquet.compression              = SNAPPY;
+SET hive.enforce.bucketing           = true;
+-- It should not be necessary to specify
+-- mapreduce.job.reduces since we set hive.enforce.bucketing=true.
+-- However, without it set, only one reduce task is
+-- launched, so we set it manually.  This needs
+-- to be the same as the number of buckets the
+-- table is clustered into (64).
+SET mapreduce.job.reduces            = 64;
+
+ADD JAR ${artifacts_directory}/refinery-hive.jar;
+CREATE TEMPORARY FUNCTION is_pageview as 'org.wikimedia.analytics.refinery.hive.IsPageviewUDF';
+
+INSERT OVERWRITE TABLE ${destination_table}
+    PARTITION(webrequest_source='${webrequest_source}',year=${year},month=${month},day=${day},hour=${hour})
+    SELECT
+        hostname,
+        sequence,
+        dt,
+        time_firstbyte,
+        ip,
+        cache_status,
+        http_status,
+        response_size,
+        http_method,
+        uri_host,
+        uri_path,
+        uri_query,
+        content_type,
+        referer,
+        x_forwarded_for,
+        user_agent,
+        accept_language,
+        x_analytics,
+        range,
+        is_pageview(uri_host, uri_path, uri_query, http_status, content_type, user_agent) as is_pageview
+    FROM
+        ${source_table}
+    WHERE
+        webrequest_source='${webrequest_source}' AND
+        year=${year} AND month=${month} AND day=${day} AND hour=${hour}
+;
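+
+-- After a run, a quick sanity check of the new partition might look
+-- like this (a hypothetical query, not part of this job):
+--
+--     SELECT is_pageview, COUNT(*)
+--     FROM wmf.webrequest
+--     WHERE webrequest_source='bits'
+--         AND year=2014 AND month=12 AND day=30 AND hour=1
+--     GROUP BY is_pageview;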
diff --git a/oozie/webrequest/refine/workflow.xml b/oozie/webrequest/refine/workflow.xml
new file mode 100644
index 0000000..599ca73
--- /dev/null
+++ b/oozie/webrequest/refine/workflow.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+    name="refine-${source_table}->${destination_table}-${webrequest_source},${year},${month},${day},${hour}-wf">
+
+    <parameters>
+        <property>
+            <name>queue_name</name>
+            <value>default</value>
+        </property>
+
+        <!-- Required properties -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+
+        <property>
+            <name>hive_script</name>
+            <!-- This is relative to the containing directory of this file. -->
+            <value>refine_webrequest.hql</value>
+            <description>Hive script to run.</description>
+        </property>
+
+        <property>
+            <name>hive_site_xml</name>
+            <description>hive-site.xml file path in HDFS</description>
+        </property>
+        <property>
+            <name>artifacts_directory</name>
+            <description>Path in HDFS to artifacts.  refinery-hive.jar should be here.</description>
+        </property>
+        <property>
+            <name>source_table</name>
+            <description>Hive table to refine</description>
+        </property>
+        <property>
+            <name>destination_table</name>
+            <description>The destination table to store refined data in.</description>
+        </property>
+        <property>
+            <name>webrequest_source</name>
+            <description>The partition's webrequest_source</description>
+        </property>
+        <property>
+            <name>year</name>
+            <description>The partition's year</description>
+        </property>
+        <property>
+            <name>month</name>
+            <description>The partition's month</description>
+        </property>
+        <property>
+            <name>day</name>
+            <description>The partition's day</description>
+        </property>
+        <property>
+            <name>hour</name>
+            <description>The partition's hour</description>
+        </property>
+    </parameters>
+
+    <start to="refine"/>
+
+    <action name="refine">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <job-xml>${hive_site_xml}</job-xml>
+            <configuration>
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+                <property>
+                    <name>hive.exec.scratchdir</name>
+                    <value>/tmp/hive-${user}</value>
+                </property>
+            </configuration>
+
+            <script>${hive_script}</script>
+            <param>artifacts_directory=${artifacts_directory}</param>
+            <param>source_table=${source_table}</param>
+            <param>destination_table=${destination_table}</param>
+            <param>webrequest_source=${webrequest_source}</param>
+            <param>year=${year}</param>
+            <param>month=${month}</param>
+            <param>day=${day}</param>
+            <param>hour=${hour}</param>
+        </hive>
+
+        <ok to="end"/>
+        <error to="kill"/>
+    </action>
+
+    <kill name="kill">
+        <message>Webrequest refine action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

-- 
To view, visit https://gerrit.wikimedia.org/r/182478
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ibd5b120ed96080ca99e5a61c48918c95fee7ae06
Gerrit-PatchSet: 10
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
