Ottomata has submitted this change and it was merged.
Change subject: Add pipeline for basic verification of webrequest logs
......................................................................
Add pipeline for basic verification of webrequest logs
Bug: 67128
Change-Id: Ie34f09a671a2ce341daabd8822d27e6b993d2e3e
---
A oozie/webrequest/partition/add/README.md
R oozie/webrequest/partition/add/bundle.properties
A oozie/webrequest/partition/add/bundle.xml
A oozie/webrequest/partition/add/check_sequence_statistics_workflow.xml
R oozie/webrequest/partition/add/compute_sequence_statistics.hql
M oozie/webrequest/partition/add/coordinator.xml
A oozie/webrequest/partition/add/extract_faulty_hosts.hql
A oozie/webrequest/partition/add/workflow.xml
8 files changed, 448 insertions(+), 9 deletions(-)
Approvals:
Ottomata: Verified; Looks good to me, approved
diff --git a/oozie/webrequest/partition/add/README.md b/oozie/webrequest/partition/add/README.md
new file mode 100644
index 0000000..37e0a78
--- /dev/null
+++ b/oozie/webrequest/partition/add/README.md
@@ -0,0 +1,38 @@
+# Basic verification for webrequest logs
+
+The basic verification analyzes each log line's sequence number and
+computes per-host statistics. It detects holes, duplicates, and nulls
+in sequence numbers. However, it does not check for new hosts arriving,
+hosts getting decommissioned, or the amount of per-host traffic.
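+
+As a rough illustration only (this is a sketch, not the actual
+```compute_sequence_statistics.hql```; the ```hostname``` and
+```sequence``` input columns, and the exact output columns, are
+assumptions here), the per-host statistics boil down to a grouped
+aggregation like:
+
+```sql
+SELECT
+  hostname,
+  COUNT(sequence)                            AS count_actual,
+  MAX(sequence) - MIN(sequence) + 1          AS count_expected,
+  COUNT(sequence) - COUNT(DISTINCT sequence) AS count_duplicate,
+  SUM(IF(sequence IS NULL, 1, 0))            AS count_null_sequence
+FROM wmf_raw.webrequest
+WHERE webrequest_source='bits'
+  AND year=2014 AND month=5 AND day=12 AND hour=1
+GROUP BY hostname;
+```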
+
+If a dataset (i.e., one webrequest_source for one hour) does not have
+duplicates, holes, or nulls, its directory gets a ```_SUCCESS```
+marker.
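+
+Downstream consumers can check for that marker; for example (the
+hourly directory layout shown here is an assumption), the following
+exits with status 0 once the dataset has been verified:
+
+```
+hdfs dfs -test -e /wmf/data/raw/webrequest/webrequest_bits/hourly/2014/05/12/01/_SUCCESS
+```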
+
+# Outline
+
+* ```bundle.properties``` can be used to inject the whole verification
+ pipeline into Oozie (see the submit example below this outline).
+* ```bundle.xml``` injects separate coordinators for each of the
+ webrequest_sources.
+* ```coordinator.xml``` injects a workflow for each dataset.
+* ```workflow.xml```
+ * adds a partition to a common table (not intended for researchers'
+ use) through ```oozie/util/hive/partition/add/workflow.xml```,
+ * extracts the dataset's sequence statistics into a separate table
+ (so the statistics are easily queryable and need not be recomputed
+ when drilling in) and puts per-dataset information into separate
+ files,
+ * analyzes those files to determine whether or not the dataset is
+ ok, and
+ * finally writes the ```_SUCCESS``` marker to the dataset, if it is
+ ok.
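+
+For example, after syncing the refinery's ```oozie``` directory to
+HDFS (see the note in ```bundle.properties```), the whole pipeline can
+be submitted with:
+
+```
+oozie job -submit -config oozie/webrequest/partition/add/bundle.properties
+```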
+
+Note that we add the partition to the table before verification, and
+do not drop the partition if there is an error. Hence, the table might
+contain partitions that contain duplicates or holes. This makes it
+easier for developers to have a look at the data. The table is not
+meant for researchers.
+
+Icinga monitoring for the ```_SUCCESS``` marker is not part of this
+setup and can be found at {{Citation needed}}.
\ No newline at end of file
diff --git a/oozie/webrequest/partition/add/coordinator.properties b/oozie/webrequest/partition/add/bundle.properties
similarity index 68%
rename from oozie/webrequest/partition/add/coordinator.properties
rename to oozie/webrequest/partition/add/bundle.properties
index 6c10ab1..7b5c72e 100644
--- a/oozie/webrequest/partition/add/coordinator.properties
+++ b/oozie/webrequest/partition/add/bundle.properties
@@ -1,10 +1,7 @@
-# Configures a coordinator to manage automatically adding Hive partitions to
+# Configures a bundle to manage automatically adding Hive partitions to
# a webrequest table. Any of the following properties are overridable with -D.
-# The 'webrequest_source' property is required and must be set on the CLI when
-# submitting this coordinator.
-#
# Usage:
-# oozie job -Dwebrequest_source=mobile -submit -config oozie/webrequest/add/coordinator.properties.
+# oozie job -submit -config oozie/webrequest/partition/add/bundle.properties.
#
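+# As noted above, any of the properties can also be overridden on the
+# command line with -D, e.g. (illustrative value only):
+# oozie job -submit -config oozie/webrequest/partition/add/bundle.properties \
+#   -Dqueue_name=adhoc
+#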
# NOTE: The $oozie_directory must be synced to HDFS so that all relevant
# .xml files exist there when this job is submitted.
@@ -20,8 +17,11 @@
# Other files will be used relative to this path.
oozie_directory = ${name_node}/wmf/refinery/current/oozie
+# HDFS path to coordinator to run for each webrequest_source.
+coordinator_file = ${oozie_directory}/webrequest/partition/add/coordinator.xml
+
# HDFS path to workflow to run.
-workflow_file = ${oozie_directory}/util/hive/partition/add/workflow.xml
+workflow_file = ${oozie_directory}/webrequest/partition/add/workflow.xml
# HDFS path to webrequest dataset definition
datasets_file = ${oozie_directory}/webrequest/datasets.xml
@@ -32,16 +32,25 @@
# Time to stop running this coordinator. Year 3000 == never!
stop_time = 3000-01-01T00:00Z
+# Workflow to add a partition
+add_partition_workflow_file = ${oozie_directory}/util/hive/partition/add/workflow.xml
+
# HDFS path to hive-site.xml file. This is needed to run hive actions.
hive_site_xml = ${oozie_directory}/util/hive/hive-site.xml
# Fully qualified Hive table name.
table = wmf_raw.webrequest
+# Fully qualified Hive table name for the sequence statistics.
+statistics_table = wmf_raw.webrequest_sequence_stats
+
+# Base directory for obviously faulty hosts
+faulty_hosts_directory = ${name_node}/wmf/data/raw/webrequests_faulty_hosts
+
# HDFS path to directory where webrequest data is time bucketed.
data_directory = ${name_node}/wmf/data/raw/webrequest/webrequest_${webrequest_source}/hourly
# Coordinator to start.
-oozie.coord.application.path = ${oozie_directory}/webrequest/partition/add/coordinator.xml
+oozie.bundle.application.path = ${oozie_directory}/webrequest/partition/add/bundle.xml
oozie.use.system.libpath = true
oozie.action.external.stats.write = true
diff --git a/oozie/webrequest/partition/add/bundle.xml b/oozie/webrequest/partition/add/bundle.xml
new file mode 100644
index 0000000..0dbf887
--- /dev/null
+++ b/oozie/webrequest/partition/add/bundle.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<bundle-app xmlns="uri:oozie:bundle:0.2"
+ name="hive_add_partition-${table}-bundle">
+
+ <parameters>
+ <property>
+ <name>queue_name</name>
+ <value>adhoc</value>
+ </property>
+
+ <!-- Required properties. -->
+ <property><name>coordinator_file</name></property>
+ <property><name>name_node</name></property>
+ <property><name>job_tracker</name></property>
+ <property><name>workflow_file</name></property>
+ <property><name>start_time</name></property>
+ <property><name>stop_time</name></property>
+ <property><name>data_directory</name></property>
+
+ <property><name>hive_site_xml</name></property>
+ <property><name>add_partition_workflow_file</name></property>
+ <property><name>table</name></property>
+ <property><name>statistics_table</name></property>
+ <property><name>faulty_hosts_directory</name></property>
+ </parameters>
+
+ <coordinator name='add_partition_bits'>
+ <app-path>${coordinator_file}</app-path>
+ <configuration>
+ <property>
+ <name>webrequest_source</name>
+ <value>bits</value>
+ </property>
+ </configuration>
+ </coordinator>
+
+ <coordinator name='add_partition_mobile'>
+ <app-path>${coordinator_file}</app-path>
+ <configuration>
+ <property>
+ <name>webrequest_source</name>
+ <value>mobile</value>
+ </property>
+ </configuration>
+ </coordinator>
+
+ <coordinator name='add_partition_text'>
+ <app-path>${coordinator_file}</app-path>
+ <configuration>
+ <property>
+ <name>webrequest_source</name>
+ <value>text</value>
+ </property>
+ </configuration>
+ </coordinator>
+
+ <coordinator name='add_partition_upload'>
+ <app-path>${coordinator_file}</app-path>
+ <configuration>
+ <property>
+ <name>webrequest_source</name>
+ <value>upload</value>
+ </property>
+ </configuration>
+ </coordinator>
+
+</bundle-app>
diff --git a/oozie/webrequest/partition/add/check_sequence_statistics_workflow.xml b/oozie/webrequest/partition/add/check_sequence_statistics_workflow.xml
new file mode 100644
index 0000000..9b1e3b7
--- /dev/null
+++ b/oozie/webrequest/partition/add/check_sequence_statistics_workflow.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+ name="check_sequence_statistics-wf">
+
+ <parameters>
+ <property>
+ <name>queue_name</name>
+ <value>adhoc</value>
+ </property>
+
+ <!-- Required properties -->
+ <property><name>name_node</name></property>
+ <property><name>job_tracker</name></property>
+
+ <property>
+ <name>hive_site_xml</name>
+ <description>hive-site.xml file path in HDFS</description>
+ </property>
+ <property>
+ <name>webrequest_source</name>
+ <description>The partition's webrequest_source</description>
+ </property>
+ <property>
+ <name>year</name>
+ <description>The partition's year</description>
+ </property>
+ <property>
+ <name>month</name>
+ <description>The partition's month</description>
+ </property>
+ <property>
+ <name>day</name>
+ <description>The partition's day</description>
+ </property>
+ <property>
+ <name>hour</name>
+ <description>The partition's hour</description>
+ </property>
+ <property>
+ <name>statistics_table</name>
+ <description>
+ Hive table to write partition statistics to.
+ </description>
+ </property>
+ <property>
+ <name>faulty_hosts_directory</name>
+ <description>
+ Base directory in HDFS where information about
+ obviously faulty hosts will get collected.
+ </description>
+ </property>
+ </parameters>
+
+ <start to="extract_faulty_hosts"/>
+
+ <action name="extract_faulty_hosts">
+ <hive xmlns="uri:oozie:hive-action:0.3">
+ <job-tracker>${job_tracker}</job-tracker>
+ <name-node>${name_node}</name-node>
+ <job-xml>${hive_site_xml}</job-xml>
+ <configuration>
+ <property>
+ <name>mapreduce.job.queuename</name>
+ <value>${queue_name}</value>
+ </property>
+ </configuration>
+
+ <script>extract_faulty_hosts.hql</script>
+
+ <param>table=${statistics_table}</param>
+ <param>target=${faulty_hosts_directory}</param>
+ <param>webrequest_source=${webrequest_source}</param>
+ <param>year=${year}</param>
+ <param>month=${month}</param>
+ <param>day=${day}</param>
+ <param>hour=${hour}</param>
+ </hive>
+ <ok to="check_faulty_hosts"/>
+ <error to="kill"/>
+ </action>
+
+ <decision name="check_faulty_hosts">
+ <switch>
+ <case to="end">
+                ${fs:fileSize(concat(faulty_hosts_directory, "/000000_0")) eq 0}
+ </case>
+ <default to="kill_faulty_hosts"/>
+ </switch>
+ </decision>
+
+ <kill name="kill_faulty_hosts">
+        <message>Faulty hosts file (${faulty_hosts_directory}/000000_0) is not empty. So either there are faulty hosts, or computation failed.</message>
+ </kill>
+
+ <kill name="kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+ </kill>
+ <end name="end"/>
+</workflow-app>
diff --git a/hive/webrequest/sequence_stats.hql b/oozie/webrequest/partition/add/compute_sequence_statistics.hql
similarity index 100%
rename from hive/webrequest/sequence_stats.hql
rename to oozie/webrequest/partition/add/compute_sequence_statistics.hql
diff --git a/oozie/webrequest/partition/add/coordinator.xml b/oozie/webrequest/partition/add/coordinator.xml
index 2ae0ae9..82475c3 100644
--- a/oozie/webrequest/partition/add/coordinator.xml
+++ b/oozie/webrequest/partition/add/coordinator.xml
@@ -21,8 +21,11 @@
<property><name>data_directory</name></property>
<property><name>hive_site_xml</name></property>
+ <property><name>add_partition_workflow_file</name></property>
<property><name>table</name></property>
+ <property><name>statistics_table</name></property>
<property><name>webrequest_source</name></property>
+ <property><name>faulty_hosts_directory</name></property>
</parameters>
<controls>
@@ -67,6 +70,10 @@
<property><name>queue_name</name><value>${queue_name}</value></property>
<property>
+ <name>add_partition_workflow_file</name>
+ <value>${add_partition_workflow_file}</value>
+ </property>
+ <property>
<name>hive_site_xml</name>
<value>${hive_site_xml}</value>
</property>
@@ -75,13 +82,33 @@
<value>${table}</value>
</property>
<property>
- <name>partition_spec</name>
-            <value>webrequest_source='${webrequest_source}',${coord:formatTime(coord:nominalTime(), "'year='y,'month='M,'day='d,'hour='H")}</value>
+ <name>year</name>
+                <value>${coord:formatTime(coord:nominalTime(), "y")}</value>
+ </property>
+ <property>
+ <name>month</name>
+                <value>${coord:formatTime(coord:nominalTime(), "M")}</value>
+ </property>
+ <property>
+ <name>day</name>
+                <value>${coord:formatTime(coord:nominalTime(), "d")}</value>
+ </property>
+ <property>
+ <name>hour</name>
+                <value>${coord:formatTime(coord:nominalTime(), "H")}</value>
</property>
<property>
<name>location</name>
<value>${coord:dataIn('input')}</value>
</property>
+ <property>
+ <name>statistics_table</name>
+ <value>${statistics_table}</value>
+ </property>
+ <property>
+ <name>faulty_hosts_directory</name>
+ <value>${faulty_hosts_directory}</value>
+ </property>
</configuration>
</workflow>
diff --git a/oozie/webrequest/partition/add/extract_faulty_hosts.hql b/oozie/webrequest/partition/add/extract_faulty_hosts.hql
new file mode 100644
index 0000000..736dc69
--- /dev/null
+++ b/oozie/webrequest/partition/add/extract_faulty_hosts.hql
@@ -0,0 +1,57 @@
+-- Extracts obviously faulty hosts from webrequest statistics and puts
+-- them into a single file.
+--
+-- The file with faulty hosts information is ${target}/000000_0
+-- If there are no faulty hosts, the file will exist, but will be
+-- empty.
+--
+-- Parameters:
+-- table -- Fully qualified table name containing the
+-- statistics to analyze.
+-- target -- Path in HDFS where to write the file with
+-- obviously faulty hosts in. If this path
+-- exists, it will get overwritten.
+-- webrequest_source -- webrequest_source for the partition to
+-- extract faulty hosts for.
+-- year -- year for the partition to extract faulty
+-- hosts for.
+-- month -- month for the partition to extract faulty
+-- hosts for.
+-- day -- day for the partition to extract faulty
+-- hosts for.
+-- hour -- hour for the partition to extract faulty
+-- hosts for.
+--
+-- Usage:
+-- hive -f extract_faulty_hosts.hql \
+-- -d table=wmf_raw.webrequest_sequence_stats \
+-- -d target=hdfs:///tmp/faulty_hosts \
+-- -d webrequest_source=bits \
+-- -d year=2014 \
+-- -d month=5 \
+-- -d day=12 \
+-- -d hour=1
+--
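+--
+-- The resulting file can afterwards be inspected with, for example
+-- (path taken from the illustrative invocation above):
+--   hdfs dfs -cat hdfs:///tmp/faulty_hosts/000000_0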
+
+
+-- Hard-limit the number of reducers to force a single file in the
+-- target directory.
+SET mapred.reduce.tasks=1;
+
+-- Allow INSERT OVERWRITE into nested directories, so we need not take
+-- care of creating directories.
+SET hive.insert.into.multilevel.dirs=true;
+
+
+INSERT OVERWRITE DIRECTORY '${target}'
+ SELECT * FROM ${table} WHERE
+ (
+ count_duplicate <> 0 -- Host has duplicates
+ OR
+ count_different <> 0 -- Host has duplicates or holes
+ OR
+ count_null_sequence <> 0 -- Host has NULL sequence numbers
+ ) AND
+ webrequest_source='${webrequest_source}' AND
+ year=${year} AND month=${month} AND day=${day} AND hour=${hour}
+;
diff --git a/oozie/webrequest/partition/add/workflow.xml b/oozie/webrequest/partition/add/workflow.xml
new file mode 100644
index 0000000..07b1580
--- /dev/null
+++ b/oozie/webrequest/partition/add/workflow.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+ name="hive_add_partition-${table}_${webrequest_source}-wf">
+
+ <parameters>
+ <property>
+ <name>queue_name</name>
+ <value>adhoc</value>
+ </property>
+
+ <!-- Required properties -->
+ <property><name>name_node</name></property>
+ <property><name>job_tracker</name></property>
+
+ <property>
+ <name>add_partition_workflow_file</name>
+            <description>Workflow definition for adding a partition</description>
+ </property>
+ <property>
+ <name>hive_site_xml</name>
+ <description>hive-site.xml file path in HDFS</description>
+ </property>
+ <property>
+ <name>table</name>
+ <description>Hive table to partition.</description>
+ </property>
+ <property>
+ <name>webrequest_source</name>
+ <description>The partition's webrequest_source</description>
+ </property>
+ <property>
+ <name>year</name>
+ <description>The partition's year</description>
+ </property>
+ <property>
+ <name>month</name>
+ <description>The partition's month</description>
+ </property>
+ <property>
+ <name>day</name>
+ <description>The partition's day</description>
+ </property>
+ <property>
+ <name>hour</name>
+ <description>The partition's hour</description>
+ </property>
+ <property>
+ <name>location</name>
+ <description>HDFS path(s) naming the input dataset.</description>
+ </property>
+ <property>
+ <name>statistics_table</name>
+ <description>
+ Hive table to write partition statistics to.
+ </description>
+ </property>
+ <property>
+ <name>faulty_hosts_directory</name>
+ <description>
+ Base directory in HDFS where information about
+ obviously faulty hosts will get collected.
+ </description>
+ </property>
+ </parameters>
+
+ <start to="add_partition"/>
+
+ <action name="add_partition">
+ <sub-workflow>
+ <app-path>${add_partition_workflow_file}</app-path>
+ <propagate-configuration/>
+ <configuration>
+ <property>
+ <name>partition_spec</name>
+                    <value>webrequest_source='${webrequest_source}',year=${year},month=${month},day=${day},hour=${hour}</value>
+ </property>
+ </configuration>
+ </sub-workflow>
+ <ok to="compute_sequence_statistics"/>
+ <error to="kill"/>
+ </action>
+
+ <action name="compute_sequence_statistics">
+ <hive xmlns="uri:oozie:hive-action:0.3">
+ <job-tracker>${job_tracker}</job-tracker>
+ <name-node>${name_node}</name-node>
+ <job-xml>${hive_site_xml}</job-xml>
+ <configuration>
+ <property>
+ <name>mapreduce.job.queuename</name>
+ <value>${queue_name}</value>
+ </property>
+ </configuration>
+
+ <script>compute_sequence_statistics.hql</script>
+
+ <param>source_table=${table}</param>
+ <param>destination_table=${statistics_table}</param>
+ <param>year=${year}</param>
+ <param>month=${month}</param>
+ <param>day=${day}</param>
+ <param>hour=${hour}</param>
+ <param>webrequest_source=${webrequest_source}</param>
+ </hive>
+ <ok to="check_sequence_statistics"/>
+ <error to="kill"/>
+ </action>
+
+    <!-- We put the checking of sequence statistics into a separate
+         workflow, as that allows building the full path for the
+         "faulty hosts" directory (including source, year, ...) as an
+         element value. This avoids building the path in EL through a
+         nested concat construct for use in <case />, which would not
+         be readable at all. -->
+ <action name="check_sequence_statistics">
+ <sub-workflow>
+            <app-path>${replaceAll(wf:appPath(), "/[^/]*$", "")}/check_sequence_statistics_workflow.xml</app-path>
+ <propagate-configuration/>
+ <configuration>
+ <property>
+ <name>faulty_hosts_directory</name>
+                    <value>${faulty_hosts_directory}/${webrequest_source}/${year}/${month}/${day}/${hour}</value>
+ </property>
+ </configuration>
+ </sub-workflow>
+ <ok to="mark_dataset_done"/>
+ <error to="kill"/>
+ </action>
+
+ <action name="mark_dataset_done">
+ <fs>
+ <touchz path="${location}/_SUCCESS" />
+ </fs>
+ <ok to="end"/>
+ <error to="kill"/>
+ </action>
+
+ <kill name="kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+ </kill>
+ <end name="end"/>
+</workflow-app>
--
To view, visit https://gerrit.wikimedia.org/r/148650
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ie34f09a671a2ce341daabd8822d27e6b993d2e3e
Gerrit-PatchSet: 2
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: QChris <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: QChris <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits