Ottomata has submitted this change and it was merged.

Change subject: Add pipeline for basic verification of webrequest logs
......................................................................


Add pipeline for basic verification of webrequest logs

Bug: 67128
Change-Id: Ie34f09a671a2ce341daabd8822d27e6b993d2e3e
---
A oozie/webrequest/partition/add/README.md
R oozie/webrequest/partition/add/bundle.properties
A oozie/webrequest/partition/add/bundle.xml
A oozie/webrequest/partition/add/check_sequence_statistics_workflow.xml
R oozie/webrequest/partition/add/compute_sequence_statistics.hql
M oozie/webrequest/partition/add/coordinator.xml
A oozie/webrequest/partition/add/extract_faulty_hosts.hql
A oozie/webrequest/partition/add/workflow.xml
8 files changed, 448 insertions(+), 9 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/oozie/webrequest/partition/add/README.md 
b/oozie/webrequest/partition/add/README.md
new file mode 100644
index 0000000..37e0a78
--- /dev/null
+++ b/oozie/webrequest/partition/add/README.md
@@ -0,0 +1,38 @@
+# Basic verification for webrequest logs
+
+The basic verification analyzes each log line's sequence number and
+computes per host statistics. It detects holes, duplicates, and nulls
+in sequence numbers. But there is no check on new hosts arriving,
+hosts getting decommissioned, or on the amount of per host traffic.
+
+If a dataset (i.e.: webrequest_source per hour) does not have
+duplicates, holes, or nulls, the directory gets a ```_SUCCESS```
+marker.
+
+# Outline
+
+* ```bundle.properties``` can be used to inject the whole verification
+  pipeline into oozie.
+* ```bundle.xml``` injects separate coordinators for each of the
+  webrequest_sources.
+* ```coordinator.xml``` injects a workflow for each dataset.
+* ```workflow.xml```
+  * adds a partition to a common (not intended for researcher's use)
+    table (through ```oozie/util/hive/partition/add/workflow.xml```),
+  * extracts the dataset's sequence statistics into a separate table
+    (so the statistics are easily queryable and need not be recomputed
+    when drilling in)
+  * and puts per dataset information into separate files,
+  * analyzes those files to determine whether or not the dataset is
+    ok, and
+  * finally writes the ```_SUCCESS``` marker to the dataset, if it is
+    ok.
+
+Note that we add the partition to the table before verification, and
+do not drop the partition if there is an error. Hence, the table might
+contain partitions that contain duplicates/holes. This is for the
+ease of the developers when trying to have a look at the data. The
+table is not meant for researchers.
+
+Icinga monitoring for the ```_SUCCESS``` marker is not part of this
+setup and can be found at {{Citation needed}}.
\ No newline at end of file
diff --git a/oozie/webrequest/partition/add/coordinator.properties 
b/oozie/webrequest/partition/add/bundle.properties
similarity index 68%
rename from oozie/webrequest/partition/add/coordinator.properties
rename to oozie/webrequest/partition/add/bundle.properties
index 6c10ab1..7b5c72e 100644
--- a/oozie/webrequest/partition/add/coordinator.properties
+++ b/oozie/webrequest/partition/add/bundle.properties
@@ -1,10 +1,7 @@
-# Configures a coordinator to manage automatically adding Hive partitions to
+# Configures a bundle to manage automatically adding Hive partitions to
 # a webrequest table.  Any of the following properties are overridable with -D.
-# The 'webrequest_source' property is required and must be set on the CLI when
-# submitting this coordinator.
-#
 # Usage:
-# oozie job -Dwebrequest_source=mobile -submit -config 
oozie/webrequest/add/coordinator.properties.
+# oozie job -submit -config oozie/webrequest/add/bundle.properties.
 #
 # NOTE:  The $oozie_directory must be synced to HDFS so that all relevant
 #        .xml files exist there when this job is submitted.
@@ -20,8 +17,11 @@
 # Other files will be used relative to this path.
 oozie_directory                   = ${name_node}/wmf/refinery/current/oozie
 
+# HDFS path to coordinator to run for each webrequest_source.
+coordinator_file                  = 
${oozie_directory}/webrequest/partition/add/coordinator.xml
+
 # HDFS path to workflow to run.
-workflow_file                     = 
${oozie_directory}/util/hive/partition/add/workflow.xml
+workflow_file                     = 
${oozie_directory}/webrequest/partition/add/workflow.xml
 
 # HDFS path to webrequest dataset definition
 datasets_file                     = ${oozie_directory}/webrequest/datasets.xml
@@ -32,16 +32,25 @@
 # Time to stop running this coordinator.  Year 3000 == never!
 stop_time                         = 3000-01-01T00:00Z
 
+# Workflow to add a partition
+add_partition_workflow_file       = 
${oozie_directory}/util/hive/partition/add/workflow.xml
+
 # HDFS path to hive-site.xml file.  This is needed to run hive actions.
 hive_site_xml                     = ${oozie_directory}/util/hive/hive-site.xml
 
 # Fully qualified Hive table name.
 table                             = wmf_raw.webrequest
 
+# Hive table name.
+statistics_table                  = wmf_raw.webrequest_sequence_stats
+
+# Base directory for obviously faulty hosts
+faulty_hosts_directory            = 
${name_node}/wmf/data/raw/webrequests_faulty_hosts
+
 # HDFS path to directory where webrequest data is time bucketed.
 data_directory                    = 
${name_node}/wmf/data/raw/webrequest/webrequest_${webrequest_source}/hourly
 
 # Coordinator to start.
-oozie.coord.application.path      = 
${oozie_directory}/webrequest/partition/add/coordinator.xml
+oozie.bundle.application.path     = 
${oozie_directory}/webrequest/partition/add/bundle.xml
 oozie.use.system.libpath          = true
 oozie.action.external.stats.write = true
diff --git a/oozie/webrequest/partition/add/bundle.xml 
b/oozie/webrequest/partition/add/bundle.xml
new file mode 100644
index 0000000..0dbf887
--- /dev/null
+++ b/oozie/webrequest/partition/add/bundle.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<bundle-app xmlns="uri:oozie:bundle:0.2"
+    name="hive_add_partition-${table}-bundle">
+
+    <parameters>
+        <property>
+            <name>queue_name</name>
+            <value>adhoc</value>
+        </property>
+
+        <!-- Required properties. -->
+        <property><name>coordinator_file</name></property>
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>workflow_file</name></property>
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+        <property><name>data_directory</name></property>
+
+        <property><name>hive_site_xml</name></property>
+        <property><name>add_partition_workflow_file</name></property>
+        <property><name>table</name></property>
+        <property><name>statistics_table</name></property>
+        <property><name>faulty_hosts_directory</name></property>
+    </parameters>
+
+    <coordinator name='add_partition_bits'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>bits</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+    <coordinator name='add_partition_mobile'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>mobile</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+    <coordinator name='add_partition_text'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>text</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+    <coordinator name='add_partition_upload'>
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>webrequest_source</name>
+                <value>upload</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+</bundle-app>
diff --git 
a/oozie/webrequest/partition/add/check_sequence_statistics_workflow.xml 
b/oozie/webrequest/partition/add/check_sequence_statistics_workflow.xml
new file mode 100644
index 0000000..9b1e3b7
--- /dev/null
+++ b/oozie/webrequest/partition/add/check_sequence_statistics_workflow.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+    name="check_sequence_statistics-wf">
+
+    <parameters>
+        <property>
+            <name>queue_name</name>
+            <value>adhoc</value>
+        </property>
+
+        <!-- Required properties -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+
+        <property>
+            <name>hive_site_xml</name>
+            <description>hive-site.xml file path in HDFS</description>
+        </property>
+        <property>
+            <name>webrequest_source</name>
+            <description>The partition's webrequest_source</description>
+        </property>
+        <property>
+            <name>year</name>
+            <description>The partition's year</description>
+        </property>
+        <property>
+            <name>month</name>
+            <description>The partition's month</description>
+        </property>
+        <property>
+            <name>day</name>
+            <description>The partition's day</description>
+        </property>
+        <property>
+            <name>hour</name>
+            <description>The partition's hour</description>
+        </property>
+        <property>
+            <name>statistics_table</name>
+            <description>
+                Hive table to write partition statistics to.
+            </description>
+        </property>
+        <property>
+            <name>faulty_hosts_directory</name>
+            <description>
+                Base directory in HDFS where information about
+                obviously faulty hosts will get collected.
+            </description>
+        </property>
+    </parameters>
+
+    <start to="extract_faulty_hosts"/>
+
+    <action name="extract_faulty_hosts">
+        <hive xmlns="uri:oozie:hive-action:0.3">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <job-xml>${hive_site_xml}</job-xml>
+            <configuration>
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+            </configuration>
+
+            <script>extract_faulty_hosts.hql</script>
+
+            <param>table=${statistics_table}</param>
+            <param>target=${faulty_hosts_directory}</param>
+            <param>webrequest_source=${webrequest_source}</param>
+            <param>year=${year}</param>
+            <param>month=${month}</param>
+            <param>day=${day}</param>
+            <param>hour=${hour}</param>
+        </hive>
+        <ok to="check_faulty_hosts"/>
+        <error to="kill"/>
+    </action>
+
+    <decision name="check_faulty_hosts">
+        <switch>
+            <case to="end">
+                ${fs:fileSize(concat(faulty_hosts_directory, "/000000_0")) eq 
0}
+            </case>
+            <default to="kill_faulty_hosts"/>
+        </switch>
+    </decision>
+
+    <kill name="kill_faulty_hosts">
+        <message>Faulty hosts file (${faulty_hosts_directory}/000000_0) is not 
empty. So either there are faulty hosts, or computation failed.</message>
+    </kill>
+
+    <kill name="kill">
+        <message>Action failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>
diff --git a/hive/webrequest/sequence_stats.hql 
b/oozie/webrequest/partition/add/compute_sequence_statistics.hql
similarity index 100%
rename from hive/webrequest/sequence_stats.hql
rename to oozie/webrequest/partition/add/compute_sequence_statistics.hql
diff --git a/oozie/webrequest/partition/add/coordinator.xml 
b/oozie/webrequest/partition/add/coordinator.xml
index 2ae0ae9..82475c3 100644
--- a/oozie/webrequest/partition/add/coordinator.xml
+++ b/oozie/webrequest/partition/add/coordinator.xml
@@ -21,8 +21,11 @@
         <property><name>data_directory</name></property>
 
         <property><name>hive_site_xml</name></property>
+        <property><name>add_partition_workflow_file</name></property>
         <property><name>table</name></property>
+        <property><name>statistics_table</name></property>
         <property><name>webrequest_source</name></property>
+        <property><name>faulty_hosts_directory</name></property>
     </parameters>
 
     <controls>
@@ -67,6 +70,10 @@
                 
<property><name>queue_name</name><value>${queue_name}</value></property>
 
                 <property>
+                    <name>add_partition_workflow_file</name>
+                    <value>${add_partition_workflow_file}</value>
+                </property>
+                <property>
                     <name>hive_site_xml</name>
                     <value>${hive_site_xml}</value>
                 </property>
@@ -75,13 +82,33 @@
                     <value>${table}</value>
                 </property>
                 <property>
-                    <name>partition_spec</name>
-                    
<value>webrequest_source='${webrequest_source}',${coord:formatTime(coord:nominalTime(),
 "'year='y,'month='M,'day='d,'hour='H")}</value>
+                    <name>year</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"y")}</value>
+                </property>
+                <property>
+                    <name>month</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"M")}</value>
+                </property>
+                <property>
+                    <name>day</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"d")}</value>
+                </property>
+                <property>
+                    <name>hour</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"H")}</value>
                 </property>
                 <property>
                     <name>location</name>
                     <value>${coord:dataIn('input')}</value>
                 </property>
+                <property>
+                    <name>statistics_table</name>
+                    <value>${statistics_table}</value>
+                </property>
+                <property>
+                    <name>faulty_hosts_directory</name>
+                    <value>${faulty_hosts_directory}</value>
+                </property>
 
             </configuration>
         </workflow>
diff --git a/oozie/webrequest/partition/add/extract_faulty_hosts.hql 
b/oozie/webrequest/partition/add/extract_faulty_hosts.hql
new file mode 100644
index 0000000..736dc69
--- /dev/null
+++ b/oozie/webrequest/partition/add/extract_faulty_hosts.hql
@@ -0,0 +1,57 @@
+-- Extracts obviously faulty hosts from webrequest statistics and puts
+-- them into a single file.
+--
+-- The file with faulty hosts information is ${target}/000000_0
+-- If there are no faulty hosts, the file will exist, but will be
+-- empty.
+--
+-- Parameters:
+--   table             -- Fully qualified table name containing the
+--                        statistics to analyze.
+--   target            -- Path in HDFS where to write the file with
+--                        obviously faulty hosts in. If this path
+--                        exists, it will get overwritten.
+--   webrequest_source -- webrequest_source for the partition to
+--                        extract faulty hosts for.
+--   year              -- year for the partition to extract faulty
+--                        hosts for.
+--   month             -- month for the partition to extract faulty
+--                        hosts for.
+--   day               -- day for the partition to extract faulty
+--                        hosts for.
+--   hour              -- hour for the partition to extract faulty
+--                        hosts for.
+--
+-- Usage:
+--     hive -f extract_faulty_hosts.hql \
+--         -d table=wmf_raw.webrequest_sequence_stats \
+--         -d target=hdfs:///tmp/faulty_hosts \
+--         -d webrequest_source=bits \
+--         -d year=2014 \
+--         -d month=5 \
+--         -d day=12 \
+--         -d hour=1
+--
+
+
+-- Hard-limiting number of reducers to force a single file in the
+-- target directory.
+SET mapred.reduce.tasks=1;
+
+-- Allow INSERT OVERWRITE into nested directory, so we need not take
+-- care of creating directories
+SET hive.insert.into.multilevel.dirs=true;
+
+
+INSERT OVERWRITE DIRECTORY '${target}'
+    SELECT * FROM ${table} WHERE
+        (
+                count_duplicate <> 0      -- Host has duplicates
+            OR
+                count_different <> 0      -- Host has duplicates or holes
+            OR
+                count_null_sequence <> 0  -- Host has NULL sequence numbers
+        ) AND
+        webrequest_source='${webrequest_source}' AND
+        year=${year} AND month=${month} AND day=${day} AND hour=${hour}
+;
diff --git a/oozie/webrequest/partition/add/workflow.xml 
b/oozie/webrequest/partition/add/workflow.xml
new file mode 100644
index 0000000..07b1580
--- /dev/null
+++ b/oozie/webrequest/partition/add/workflow.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+    name="hive_add_partition-${table}_${webrequest_source}-wf">
+
+    <parameters>
+        <property>
+            <name>queue_name</name>
+            <value>adhoc</value>
+        </property>
+
+        <!-- Required properties -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+
+        <property>
+            <name>add_partition_workflow_file</name>
+            <description>Workflow definition for adding a 
partition</description>
+        </property>
+        <property>
+            <name>hive_site_xml</name>
+            <description>hive-site.xml file path in HDFS</description>
+        </property>
+        <property>
+            <name>table</name>
+            <description>Hive table to partition.</description>
+        </property>
+        <property>
+            <name>webrequest_source</name>
+            <description>The partition's webrequest_source</description>
+        </property>
+        <property>
+            <name>year</name>
+            <description>The partition's year</description>
+        </property>
+        <property>
+            <name>month</name>
+            <description>The partition's month</description>
+        </property>
+        <property>
+            <name>day</name>
+            <description>The partition's day</description>
+        </property>
+        <property>
+            <name>hour</name>
+            <description>The partition's hour</description>
+        </property>
+        <property>
+            <name>location</name>
+            <description>HDFS path(s) naming the input dataset.</description>
+        </property>
+        <property>
+            <name>statistics_table</name>
+            <description>
+                Hive table to write partition statistics to.
+            </description>
+        </property>
+        <property>
+            <name>faulty_hosts_directory</name>
+            <description>
+                Base directory in HDFS where information about
+                obviously faulty hosts will get collected.
+            </description>
+        </property>
+    </parameters>
+
+    <start to="add_partition"/>
+
+    <action name="add_partition">
+        <sub-workflow>
+            <app-path>${add_partition_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>partition_spec</name>
+                    
<value>webrequest_source='${webrequest_source}',year=${year},month=${month},day=${day},hour=${hour}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="compute_sequence_statistics"/>
+        <error to="kill"/>
+    </action>
+
+    <action name="compute_sequence_statistics">
+        <hive xmlns="uri:oozie:hive-action:0.3">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <job-xml>${hive_site_xml}</job-xml>
+            <configuration>
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+            </configuration>
+
+            <script>compute_sequence_statistics.hql</script>
+
+            <param>source_table=${table}</param>
+            <param>destination_table=${statistics_table}</param>
+            <param>year=${year}</param>
+            <param>month=${month}</param>
+            <param>day=${day}</param>
+            <param>hour=${hour}</param>
+            <param>webrequest_source=${webrequest_source}</param>
+        </hive>
+        <ok to="check_sequence_statistics"/>
+        <error to="kill"/>
+    </action>
+
+    <!-- We put checking sequence statistics into a separate workflow,
+        as that allows to build the full path for the "faulty hosts"
+        directory (including source, year, ...) as element value, and
+        thereby allows to evade building it in EL via a nested concat
+        construct to use it in <case />, which would not be readable
+        at all. -->
+    <action name="check_sequence_statistics">
+        <sub-workflow>
+            <app-path>${replaceAll(wf:appPath(), "/[^/]*$", 
"")}/check_sequence_statistics_workflow.xml</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>faulty_hosts_directory</name>
+                    
<value>${faulty_hosts_directory}/${webrequest_source}/${year}/${month}/${day}/${hour}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="mark_dataset_done"/>
+        <error to="kill"/>
+    </action>
+
+    <action name="mark_dataset_done">
+        <fs>
+            <touchz path="${location}/_SUCCESS" />
+        </fs>
+        <ok to="end"/>
+        <error to="kill"/>
+    </action>
+
+    <kill name="kill">
+        <message>Action failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

-- 
To view, visit https://gerrit.wikimedia.org/r/148650
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ie34f09a671a2ce341daabd8822d27e6b993d2e3e
Gerrit-PatchSet: 2
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: QChris <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: QChris <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to