Joal has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/298131

Change subject: [WIP] Add Druid loading daily oozie job
......................................................................

[WIP] Add Druid loading daily oozie job

Change-Id: I32c800b6d95836031380773d3cb49d6dba6b6a03
---
A oozie/pageview/druid/README.md
A oozie/pageview/druid/bundle.xml
A oozie/pageview/druid/daily/coordinator.properties
A oozie/pageview/druid/daily/coordinator.xml
A oozie/pageview/druid/daily/load_pageview_daily.json.template
A oozie/pageview/druid/daily/workflow.xml
A oozie/util/load_druid/druid_loader
A oozie/util/load_druid/workflow.xml
8 files changed, 786 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery 
refs/changes/31/298131/1

diff --git a/oozie/pageview/druid/README.md b/oozie/pageview/druid/README.md
new file mode 100644
index 0000000..611836a
--- /dev/null
+++ b/oozie/pageview/druid/README.md
@@ -0,0 +1,16 @@
+Oozie jobs to schedule importing pageviews in druid.
+Daily and monthly folders contain coordinators to load data in druid:
+ - hourly detailed pageviews, every day (daily folder)
+ - daily detailed pageviews, every month (monthly folder)
+
+The workflow launches the PageviewToJSON spark action,
+then launches the druid indexation using the generated data.
+The loader script waits for the indexation to finish,
+then the workflow deletes the json files.
+
+Example command for running the coordinator on command line:
+
+    oozie job -run \
+         -config daily/coordinator.properties \
+         -D refinery_directory=hdfs://analytics-hadoop/wmf/refinery/current \
+         -D 
spark_job_jar=hdfs://analytics-hadoop/wmf/refinery/current/artifacts/refinery-job.jar
diff --git a/oozie/pageview/druid/bundle.xml b/oozie/pageview/druid/bundle.xml
new file mode 100644
index 0000000..7d90782
--- /dev/null
+++ b/oozie/pageview/druid/bundle.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<bundle-app xmlns="uri:oozie:bundle:0.2"
+    name="pageview-druid-test-bundle">
+
+    <parameters>
+
+        <!-- Required properties -->
+        <property><name>queue_name</name></property>
+        <property><name>coordinator_file</name></property>
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>workflow_file</name></property>
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+        <property><name>pageview_datasets_file</name></property>
+        <property><name>pageview_data_directory</name></property>
+        <property><name>pageview_table</name></property>
+
+        <property><name>spark_master</name></property>
+        <property><name>spark_job_jar</name></property>
+        <property><name>spark_job_class</name></property>
+        <property><name>spark_executor_memory</name></property>
+        <property><name>spark_driver_memory</name></property>
+
+        <property><name>send_error_email_workflow_file</name></property>
+    </parameters>
+
+    <coordinator name="pageview-druid-coord-hour-daily-test" >
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>frequency</name>
+                <value>${"$"}{coord:days(1)}</value>
+            </property>
+            <property>
+                <name>data_end_instance</name>
+                <value>${"$"}{coord:current(23)}</value>
+            </property>
+            <property>
+                <name>date_offset</name>
+                <value>DAY</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+    <coordinator name="pageview-druid-coord-day-monthly-test">
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>frequency</name>
+                <value>${"$"}{coord:months(1)}</value>
+            </property>
+            <property>
+                <name>data_end_instance</name>
+                <value>${"$"}{coord:current(coord:daysInMonth(0) * 24 - 
1)}</value>
+            </property>
+            <property>
+                <name>date_offset</name>
+                <value>MONTH</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+</bundle-app>
diff --git a/oozie/pageview/druid/daily/coordinator.properties 
b/oozie/pageview/druid/daily/coordinator.properties
new file mode 100644
index 0000000..d895071
--- /dev/null
+++ b/oozie/pageview/druid/daily/coordinator.properties
@@ -0,0 +1,80 @@
+# Configures a coordinator to generate JSON pageviews and load them in druid.
+# Any of the following properties are override-able with -D.
+# Usage:
+#   oozie job -Dstart_time=2016-06-01T00:00Z -submit -config 
oozie/pageview/druid/daily/coordinator.properties
+#
+# NOTE:  The $refinery_directory must be synced to HDFS so that all relevant
+#        .xml files exist there when this job is submitted.
+
+name_node                         = hdfs://analytics-hadoop
+job_tracker                       = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name                        = default
+
+#Default user
+user                              = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should override this to point 
directly at a deployed
+# directory name, and not the 'symbolic' 'current' directory. E.g. 
/wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory                = ${name_node}/wmf/refinery/current
+
+# HDFS path to artifacts that will be used by this job.
+# E.g. refinery-job.jar should exist here.
+artifacts_directory               = ${refinery_directory}/artifacts
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory                   = ${refinery_directory}/oozie
+
+# HDFS path to coordinator to run.
+coordinator_file                  = 
${oozie_directory}/pageview/druid/daily/coordinator.xml
+# HDFS path to workflow to run.
+workflow_file                     = 
${oozie_directory}/pageview/druid/daily/workflow.xml
+
+# HDFS path to pageview dataset definitions
+pageview_datasets_file            = ${oozie_directory}/pageview/datasets.xml
+pageview_data_directory           = ${name_node}/wmf/data/wmf/pageview
+
+# Pageview table name (used by spark job)
+pageview_table                    = wmf.pageview_hourly
+
+# Time to start running this coordinator.
+start_time                        = 2015-07-01T00:00Z
+
+# Time to stop running this coordinator.  Year 3000 == never!
+stop_time                         = 3000-01-01T00:00Z
+
+# Spark job parameters
+spark_master                      = yarn
+spark_deploy                      = cluster
+spark_job_jar                     = 
${artifacts_directory}/org/wikimedia/analytics/refinery/refinery-job-0.0.32.jar
+spark_job_class                   = 
org.wikimedia.analytics.refinery.job.PageviewToJSON
+spark_job_name                    = PageviewToJson
+spark_executor_memory             = 2G
+spark_driver_memory               = 4G
+# Additional spark files (either on hdfs or on every worker)
+spark_assembly_jar                = 
${name_node}/user/spark/share/lib/spark-assembly.jar
+spark_additional_jars             = 
/usr/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar,/usr/lib/hive/lib/datanucleus-core-3.2.10.jar,/usr/lib/hive/lib/datanucleus-rdbms-3.2.9.jar
+spark_additional_files            = 
/etc/hive/conf.analytics-hadoop/hive-site.xml
+spark_output_partitions           = 8
+
+# Temporary directory
+temporary_directory               = ${name_node}/tmp
+
+# HDFS path to template to use.
+druid_template_file               = 
${oozie_directory}/pageview/druid/daily/load_pageview_daily.json.template
+
+
+# HDFS path to workflow to load druid
+load_druid_workflow_file          = 
${oozie_directory}/util/load_druid/workflow.xml
+# HDFS path to workflow to mark a directory as done
+mark_directory_done_workflow_file = 
${oozie_directory}/util/mark_directory_done/workflow.xml
+# Workflow to send an error email
+send_error_email_workflow_file    = 
${oozie_directory}/util/send_error_email/workflow.xml
+
+# Coordinator to start.
+oozie.coord.application.path      = ${coordinator_file}
+oozie.use.system.libpath          = true
+oozie.action.external.stats.write = true
+
+
diff --git a/oozie/pageview/druid/daily/coordinator.xml 
b/oozie/pageview/druid/daily/coordinator.xml
new file mode 100644
index 0000000..33d62e2
--- /dev/null
+++ b/oozie/pageview/druid/daily/coordinator.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<coordinator-app xmlns="uri:oozie:coordinator:0.4"
+    name="pageview-druid-daily-coord"
+    frequency="${frequency}"
+    start="${start_time}"
+    end="${stop_time}"
+    timezone="Universal">
+
+    <parameters>
+
+        <!-- Required properties. -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>queue_name</name></property>
+
+        <property><name>workflow_file</name></property>
+        <property><name>pageview_datasets_file</name></property>
+        <property><name>pageview_data_directory</name></property>
+        <property><name>pageview_table</name></property>
+
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+
+        <property><name>spark_master</name></property>
+        <property><name>spark_deploy</name></property>
+        <property><name>spark_assembly_jar</name></property>
+        <property><name>spark_additional_jars</name></property>
+        <property><name>spark_additional_files</name></property>
+        <property><name>spark_job_jar</name></property>
+        <property><name>spark_job_class</name></property>
+        <property><name>spark_job_name</name></property>
+        <property><name>spark_executor_memory</name></property>
+        <property><name>spark_driver_memory</name></property>
+        <property><name>spark_output_partitions</name></property>
+
+        <property><name>druid_template_file</name></property>
+
+        <property><name>temporary_directory</name></property>
+
+        <property><name>load_druid_workflow_file</name></property>
+        <property><name>mark_directory_done_workflow_file</name></property>
+        <property><name>send_error_email_workflow_file</name></property>
+
+    </parameters>
+
+    <controls>
+        <!--(timeout is measured in minutes)-->
+        <timeout>-1</timeout>
+
+        <!-- Setting low concurrency cause the job is hungry in resources -->
+        <concurrency>1</concurrency>
+
+        <throttle>2</throttle>
+
+    </controls>
+
+    <datasets>
+        <include>${pageview_datasets_file}</include>
+    </datasets>
+
+    <input-events>
+        <data-in name="pageview_hourly_input" dataset="pageview_hourly">
+            <start-instance>${coord:current(0)}</start-instance>
+            <end-instance>${coord:current(23)}</end-instance>
+        </data-in>
+    </input-events>
+
+    <action>
+        <workflow>
+            <app-path>${workflow_file}</app-path>
+            <configuration>
+                <property>
+                    <name>year</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"y")}</value>
+                </property>
+                <property>
+                    <name>month</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"M")}</value>
+                </property>
+                <property>
+                    <name>day</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"d")}</value>
+                </property>
+                <property>
+                    <name>start_date</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"yyyy-MM-dd")}</value>
+                </property>
+                <property>
+                    <name>end_date</name>
+                    
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 1, "DAY"), 
"yyyy-MM-dd")}</value>
+                </property>
+            </configuration>
+        </workflow>
+    </action>
+</coordinator-app>
\ No newline at end of file
diff --git a/oozie/pageview/druid/daily/load_pageview_daily.json.template 
b/oozie/pageview/druid/daily/load_pageview_daily.json.template
new file mode 100644
index 0000000..4caddc7
--- /dev/null
+++ b/oozie/pageview/druid/daily/load_pageview_daily.json.template
@@ -0,0 +1,72 @@
+{
+  "type" : "index_hadoop",
+  "spec" : {
+    "ioConfig" : {
+      "type" : "hadoop",
+      "inputSpec" : {
+        "type" : "static",
+        "paths" : "*INPUT_PATH*"
+      }
+    },
+    "dataSchema" : {
+      "dataSource" : "pageviews-hourly",
+      "granularitySpec" : {
+        "type" : "uniform",
+        "segmentGranularity" : "day",
+        "queryGranularity" : "hour",
+        "intervals" : *INTERVALS_ARRAY*
+      },
+      "parser" : {
+        "type" : "string",
+        "parseSpec" : {
+          "format" : "json",
+          "dimensionsSpec" : {
+            "dimensions" : [
+                "project",
+                "language_variant",
+                "access_method",
+                "agent_type",
+                "referer_class",
+                "continent",
+                "country_code",
+                "country",
+                "subdivision",
+                "city",
+                "ua_device_family",
+                "ua_browser_family",
+                "ua_browser_major",
+                "ua_os_family",
+                "ua_os_major",
+                "ua_os_minor",
+                "ua_wmf_app_version"
+            ]
+          },
+          "timestampSpec" : {
+            "format" : "auto",
+            "column" : "ts"
+          }
+        }
+      },
+      "metricsSpec" : [
+        {
+          "name" : "view_count",
+          "type" : "doubleSum",
+          "fieldName": "view_count"
+        }
+      ]
+    },
+    "tuningConfig" : {
+      "type" : "hadoop",
+      "ignoreInvalidRows" : false,
+      "partitionsSpec" : {
+        "type" : "hashed",
+        "numShards" : 8
+      },
+      "jobProperties" : {
+        "mapreduce.reduce.memory.mb" : "8192",
+        "mapreduce.output.fileoutputformat.compress": 
"org.apache.hadoop.io.compress.GzipCodec"
+      }
+    }
+  }
+}
+
diff --git a/oozie/pageview/druid/daily/workflow.xml 
b/oozie/pageview/druid/daily/workflow.xml
new file mode 100644
index 0000000..03d92f5
--- /dev/null
+++ b/oozie/pageview/druid/daily/workflow.xml
@@ -0,0 +1,226 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+    name="pageview-druid-daily-wf-${start_date}">
+
+    <parameters>
+
+        <!-- Default values for inner oozie settings -->
+        <property>
+            <name>oozie_launcher_queue_name</name>
+            <value>${queue_name}</value>
+        </property>
+        <property>
+            <name>oozie_launcher_memory</name>
+            <value>256</value>
+        </property>
+
+        <!-- Required properties -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>queue_name</name></property>
+
+        <property>
+            <name>pageview_table</name>
+            <description>The hive pageview table to use</description>
+        </property>
+
+        <property>
+            <name>spark_master</name>
+            <description>Master to be used for Spark (yarn, local, 
other)</description>
+        </property>
+        <property>
+            <name>spark_deploy</name>
+            <description>Deploy mode to be used for Spark (client, cluster)</description>
+        </property>
+        <property>
+            <name>spark_additional_jars</name>
+            <description>Additional jars to use for the job (--jars spark 
parameter)</description>
+        </property>
+        <property>
+            <name>spark_additional_files</name>
+            <description>Additional files to use for the job (--files 
parameter)</description>
+        </property>
+        <property>
+            <name>spark_job_jar</name>
+            <description>Path to the jar to be used to run spark 
job</description>
+        </property>
+        <property>
+            <name>spark_job_class</name>
+            <description>Class of the spark job to be run</description>
+        </property>
+        <property>
+            <name>spark_job_name</name>
+            <description>Base name for the spark job to be run</description>
+        </property>
+        <property>
+            <name>spark_executor_memory</name>
+            <description>Memory to allocate for each spark 
executor</description>
+        </property>
+        <property>
+            <name>spark_driver_memory</name>
+            <description>Memory to allocate for spark driver 
process</description>
+        </property>
+        <property>
+            <name>spark_output_partitions</name>
+            <description>Number of json files generated</description>
+        </property>
+
+        <property>
+            <name>year</name>
+            <description>The partition's year</description>
+        </property>
+        <property>
+            <name>month</name>
+            <description>The partition's month</description>
+        </property>
+        <property>
+            <name>day</name>
+            <description>The partition's day</description>
+        </property>
+        <property>
+            <name>start_date</name>
+            <description>The date of the data used (yyyy-MM-dd)</description>
+        </property>
+        <property>
+            <name>end_date</name>
+            <description>start_date + 1 day (yyyy-MM-dd)</description>
+        </property>
+        <property>
+            <name>druid_template_file</name>
+            <description>File to use as a template to define druid loading 
(absolute since used by load_druid sub-workflow)</description>
+        </property>
+        <property>
+            <name>temporary_directory</name>
+            <description>A directory in HDFS for temporary files</description>
+        </property>
+        <property>
+            <name>load_druid_workflow_file</name>
+            <description>Workflow for loading druid</description>
+        </property>
+        <property>
+            <name>mark_directory_done_workflow_file</name>
+            <description>Workflow for marking a directory done</description>
+        </property>
+        <property>
+            <name>send_error_email_workflow_file</name>
+            <description>Workflow for sending an email</description>
+        </property>
+    </parameters>
+
+    <start to="generate_json_pageview"/>
+
+    <action name="generate_json_pageview">
+        <!--
+        Runs the spark job that writes the pageviews of the given day as
+        json files into a temporary directory named after this workflow id.
+        -->
+        <spark xmlns="uri:oozie:spark-action:0.1">
+
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <configuration>
+                <!--make sure oozie:launcher runs in a low priority queue -->
+                <property>
+                    <name>oozie.launcher.mapred.job.queue.name</name>
+                    <value>${oozie_launcher_queue_name}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
+                    <value>${oozie_launcher_memory}</value>
+                </property>
+            </configuration>
+            <master>${spark_master}</master>
+            <mode>${spark_deploy}</mode>
+            <name>${spark_job_name}-${year}-${month}-${day}</name>
+            <class>${spark_job_class}</class>
+            <jar>${spark_job_jar}</jar>
+            <!--
+            No fixed number of executors is requested: dynamic allocation is
+            enabled below, and no spark_number_executors property is defined
+            for this job.
+            -->
+            <spark-opts>--conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.yarn.jar=${spark_assembly_jar} --executor-memory ${spark_executor_memory} --driver-memory ${spark_driver_memory} --queue ${queue_name} --jars ${spark_additional_jars} --files ${spark_additional_files}</spark-opts>
+            <arg>--year</arg>
+            <arg>${year}</arg>
+            <arg>--month</arg>
+            <arg>${month}</arg>
+            <arg>--day</arg>
+            <arg>${day}</arg>
+            <arg>--pageview-table</arg>
+            <arg>${pageview_table}</arg>
+            <arg>--output-folder</arg>
+            <arg>${temporary_directory}/${wf:id()}-${spark_job_name}-${year}-${month}-${day}</arg>
+            <arg>--num-partitions</arg>
+            <arg>${spark_output_partitions}</arg>
+            <arg>--granularity</arg>
+            <arg>hourly</arg>
+        </spark>
+        <ok to="mark_json_pageview_dataset_done" />
+        <error to="send_error_email" />
+    </action>
+
+    <action name="mark_json_pageview_dataset_done">
+        <sub-workflow>
+            <app-path>${mark_directory_done_workflow_file}</app-path>
+            <configuration>
+                <property>
+                    <name>directory</name>
+                    
<value>${temporary_directory}/${wf:id()}-${spark_job_name}-${year}-${month}-${day}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="index_druid"/>
+        <error to="send_error_email"/>
+    </action>
+
+
+    <action name="index_druid">
+        <!--
+        Delegates the druid indexation to the load_druid sub-workflow.
+        It needs the directory holding the generated json files, the druid
+        template to fill in and the time interval covered by the data
+        (start_date/end_date, both yyyy-MM-dd).
+        -->
+        <sub-workflow>
+            <app-path>${load_druid_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>source_directory</name>
+                    <value>${temporary_directory}/${wf:id()}-${spark_job_name}-${year}-${month}-${day}</value>
+                </property>
+                <property>
+                    <name>template_file</name>
+                    <value>${druid_template_file}</value>
+                </property>
+                <property>
+                    <name>loaded_time_interval</name>
+                    <value>${start_date}/${end_date}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="remove_temporary_data"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="remove_temporary_data">
+        <fs>
+            <delete 
path="${temporary_directory}/${wf:id()}-${spark_job_name}-${year}-${month}-${day}"/>
+        </fs>
+        <ok to="end"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="send_error_email">
+        <sub-workflow>
+            <app-path>${send_error_email_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>parent_name</name>
+                    <value>${wf:name()}</value>
+                </property>
+                <property>
+                    <name>parent_failed_action</name>
+                    <value>${wf:lastErrorNode()}</value>
+                </property>
+                <property>
+                    <name>parent_error_code</name>
+                    <value>${wf:errorCode(wf:lastErrorNode())}</value>
+                </property>
+                <property>
+                    <name>parent_error_message</name>
+                    <value>${wf:errorMessage(wf:lastErrorNode())}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="kill"/>
+        <error to="kill"/>
+    </action>
+
+    <kill name="kill">
+        <message>Action failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>
diff --git a/oozie/util/load_druid/druid_loader 
b/oozie/util/load_druid/druid_loader
new file mode 100755
index 0000000..7f8f6d6
--- /dev/null
+++ b/oozie/util/load_druid/druid_loader
@@ -0,0 +1,112 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import logging
+import requests
+import datetime
+import time
+
+
+logger = logging.getLogger(__name__)
+
+
+class DruidLoader(object):
+
+    _INTERVALS_ARRAY_FIELD = '*INTERVALS_ARRAY*'
+    _INPUT_PATH_FIELD = '*INPUT_PATH*'
+
+    _LAUNCH_TASK_PATH = '/druid/indexer/v1/task'
+    _CHECK_TASK_PATH = '/druid/indexer/v1/task/{0}/status'
+
+    _STATUS_RUNNING = 'RUNNING'
+    _STATUS_FAILED = 'FAILED'
+    _STATUS_SUCCEEDED = 'SUCCEEDED'
+
+    def __init__(self, template_path, data_path, period,
+                 host='http://druid1001.eqiad.wmnet:8090', sleep=10):
+        self.template_path = template_path
+        self.data_path = data_path
+        self.period = period
+        self.host = host
+        self.sleep = sleep
+        self._init_json()
+
+    def _init_json(self):
+        template = open(self.template_path, 'r').read()
+        intervals_array = '["{0}"]'.format(self.period)
+        self.json = (template.
+                     replace(self._INPUT_PATH_FIELD, self.data_path).
+                     replace(self._INTERVALS_ARRAY_FIELD, intervals_array))
+        logger.debug('Json to be sent:\n{0}'.format(self.json))
+
+    def _start(self):
+        url = self.host + self._LAUNCH_TASK_PATH
+        headers = {'Content-type': 'application/json'}
+        req = requests.post(url, data=self.json, headers=headers)
+        if req.status_code == requests.codes.ok:
+            self.task_id = req.json()['task']
+            logger.debug('Indexation launched using url {0}'.format(url))
+        else:
+            raise RuntimeError('Druid indexation start returned bad status')
+
+    def _update_status(self):
+        url = self.host + self._CHECK_TASK_PATH.format(self.task_id)
+        req = requests.get(url)
+        if req.status_code == requests.codes.ok:
+            self.current_status = req.json()['status']['status']
+            logger.debug('Indexation status update to {0}'.format(
+                self.current_status))
+        else:
+            raise RuntimeError('Druid indexation check returned bad status')
+
+    def execute(self):
+        try:
+            self._start()
+            self._update_status()
+            while self.current_status == self._STATUS_RUNNING:
+                time.sleep(self.sleep)
+                self._update_status()
+            if self.current_status == self._STATUS_SUCCEEDED:
+                return 0
+            if self.current_status == self._STATUS_FAILED:
+                return 1
+        except Exception as e:
+            logger.error("An error occured:" + str(e))
+            return 1
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description='Launch a synchronous druid indexation for a day.')
+
+    parser.add_argument('template',
+                        help='The druid indexation json template path')
+    parser.add_argument('data', help='The druid indexation json data path')
+    parser.add_argument('period', help='The druid indexation period ' +
+                        '(YYYY-MM-DD/YYY-MM-DD) format')
+    parser.add_argument('--overlord',
+                        default='http://druid1001.eqiad.wmnet:8090',
+                        help='The druid overlord url (defaults to ' +
+                             'http://druid1001.eqiad.wmnet:8090)')
+    parser.add_argument('--debug', action='store_true',
+                        help='Log debugging messages')
+
+    args = parser.parse_args()
+
+    logging.basicConfig(level=(logging.DEBUG if args.debug else logging.INFO))
+    loader = DruidLoader(args.template, args.data, args.period,
+                         host=args.overlord)
+    loader.execute()
diff --git a/oozie/util/load_druid/workflow.xml 
b/oozie/util/load_druid/workflow.xml
new file mode 100644
index 0000000..c1da801
--- /dev/null
+++ b/oozie/util/load_druid/workflow.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+    name="load-druid-wf">
+
+    <parameters>
+
+        <!-- Default values for inner oozie settings -->
+        <property>
+            <name>oozie_launcher_queue_name</name>
+            <value>${queue_name}</value>
+        </property>
+        <property>
+            <name>oozie_launcher_memory</name>
+            <value>256</value>
+        </property>
+
+        <!-- Required properties -->
+        <property><name>queue_name</name></property>
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+
+        <!-- Optional: defaults to _SUCCESS when not provided. -->
+        <property>
+            <name>done_file</name>
+            <value>_SUCCESS</value>
+            <description>
+                The name of the file to flag a directory as “done”.
+            </description>
+        </property>
+        <property>
+            <name>template_file</name>
+            <description>
+                File path for the json template to use.
+            </description>
+        </property>
+        <property>
+            <name>source_directory</name>
+            <description>
+                Directory in hdfs that contains the data that should
+                be loaded in druid.
+            </description>
+        </property>
+        <property>
+            <name>loaded_time_interval</name>
+            <description>
+                Time interval (yyyy-MM-dd/yyyy-MM-dd format) of the loaded data.
+            </description>
+        </property>
+    </parameters>
+
+
+    <start to="check_source_directory"/>
+
+    <decision name="check_source_directory">
+        <switch>
+            <case to="source_directory_does_not_exist">
+                ${not fs:exists(source_directory)}
+            </case>
+            <case to="source_directory_is_not_a_directory">
+                ${not fs:isDir(source_directory)}
+            </case>
+            <case to="source_directory_is_not_done">
+                ${not 
fs:exists(concat(concat(source_directory,'/'),done_file))}
+            </case>
+            <default to="run_druid_loading"/>
+        </switch>
+    </decision>
+
+    <action name="run_druid_loading">
+        <!--
+        Druid loading is done through a python script that first launches the
+        task (POST call) then periodically polls (GET calls) for status until
+        success or failure.
+         -->
+        <shell xmlns="uri:oozie:shell-action:0.1">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <configuration>
+                <!--make sure oozie:launcher runs in a low priority queue -->
+                <property>
+                    <name>oozie.launcher.mapred.job.queue.name</name>
+                    <value>${oozie_launcher_queue_name}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
+                    <value>${oozie_launcher_memory}</value>
+                </property>
+
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queue_name}</value>
+                </property>
+            </configuration>
+            <exec>druid_loader</exec>
+            <!--
+            druid_loader takes exactly three positional arguments:
+            template path, data path and period.
+            The first one is the literal local name under which the
+            template is shipped by the file element below.
+            -->
+            <argument>template_file</argument>
+            <argument>${source_directory}</argument>
+            <argument>${loaded_time_interval}</argument>
+            <file>druid_loader#druid_loader</file>
+            <file>${template_file}#template_file</file>
+        </shell>
+        <ok to="end"/>
+        <error to="kill"/>
+    </action>
+
+    <kill name="source_directory_does_not_exist">
+        <message>The job output directory ${source_directory} does not 
exist</message>
+    </kill>
+
+    <kill name="source_directory_is_not_a_directory">
+        <message>The given source_directory ${source_directory} is not a 
directory</message>
+    </kill>
+
+    <kill name="source_directory_is_not_done">
+        <message>The job output directory ${source_directory} lacks the 
${done_file} marker</message>
+    </kill>
+
+    <kill name="kill">
+        <message>error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+
+</workflow-app>

-- 
To view, visit https://gerrit.wikimedia.org/r/298131
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I32c800b6d95836031380773d3cb49d6dba6b6a03
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <j...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to