Joal has uploaded a new change for review. https://gerrit.wikimedia.org/r/298131
Change subject: [WIP] Add Druid loading daily oozie job ...................................................................... [WIP] Add Druid loading daily oozie job Change-Id: I32c800b6d95836031380773d3cb49d6dba6b6a03 --- A oozie/pageview/druid/README.md A oozie/pageview/druid/bundle.xml A oozie/pageview/druid/daily/coordinator.properties A oozie/pageview/druid/daily/coordinator.xml A oozie/pageview/druid/daily/load_pageview_daily.json.template A oozie/pageview/druid/daily/workflow.xml A oozie/util/load_druid/druid_loader A oozie/util/load_druid/workflow.xml 8 files changed, 786 insertions(+), 0 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery refs/changes/31/298131/1 diff --git a/oozie/pageview/druid/README.md b/oozie/pageview/druid/README.md new file mode 100644 index 0000000..611836a --- /dev/null +++ b/oozie/pageview/druid/README.md @@ -0,0 +1,16 @@ +Oozie jobs to schedule importing pageviews into Druid. +The daily and monthly folders contain coordinators that load into Druid: + - hourly detailed pageviews, every day (daily folder) + - daily detailed pageviews, every month (monthly folder) + +The workflow launches the PageviewToJSON spark action, +then launches the druid indexation using the generated data. +The script waits for the indexation to finish, +then deletes the json files. + +Example command for running the coordinator on the command line: + + oozie job -run \ + -config daily/coordinator.properties \ + -D refinery_directory=hdfs://analytics-hadoop/wmf/refinery/current \ + -D spark_job_jar=hdfs://analytics-hadoop/wmf/refinery/current/artifacts/refinery-job.jar diff --git a/oozie/pageview/druid/bundle.xml b/oozie/pageview/druid/bundle.xml new file mode 100644 index 0000000..7d90782 --- /dev/null +++ b/oozie/pageview/druid/bundle.xml @@ -0,0 +1,64 @@ +<?xml version="1.0" encoding="UTF-8"?> +<bundle-app xmlns="uri:oozie:bundle:0.2" + name="pageview-druid-test-bundle"> + + <parameters> + + <!-- Required properties --> + <property><name>queue_name</name></property> + <property><name>coordinator_file</name></property> + <property><name>name_node</name></property> + <property><name>job_tracker</name></property> + <property><name>workflow_file</name></property> + <property><name>start_time</name></property> + <property><name>stop_time</name></property> + <property><name>pageview_datasets_file</name></property> + <property><name>pageview_data_directory</name></property> + <property><name>pageview_table</name></property> + + <property><name>spark_master</name></property> + <property><name>spark_job_jar</name></property> + <property><name>spark_job_class</name></property> + <property><name>spark_executor_memory</name></property> + <property><name>spark_driver_memory</name></property> + + <property><name>send_error_email_workflow_file</name></property> + </parameters> + + <coordinator name="pageview-druid-coord-hour-daily-test" > + <app-path>${coordinator_file}</app-path> + <configuration> + <property> + <name>frequency</name> + <value>${"$"}{coord:days(1)}</value> + </property> + <property> + <name>data_end_instance</name> + <value>${"$"}{coord:current(23)}</value> + </property> + <property> + <name>date_offset</name> + <value>DAY</value> + </property> + </configuration> + </coordinator> + + <coordinator name="pageview-druid-coord-day-monthly-test"> + <app-path>${coordinator_file}</app-path> + <configuration> + <property> + <name>frequency</name> + <value>${"$"}{coord:months(1)}</value> + </property> + <property> + <name>data_end_instance</name> +
<value>${"$"}{coord:current(coord:daysInMonth(0) * 24 - 1)}</value> + </property> + <property> + <name>date_offset</name> + <value>MONTH</value> + </property> + </configuration> + </coordinator> + +</bundle-app> diff --git a/oozie/pageview/druid/daily/coordinator.properties b/oozie/pageview/druid/daily/coordinator.properties new file mode 100644 index 0000000..d895071 --- /dev/null +++ b/oozie/pageview/druid/daily/coordinator.properties @@ -0,0 +1,80 @@ +# Configures a bundle to generate JSON pageviews and load them in druid. +# Any of the following properties are override-able with -D. +# Usage: +# oozie job -Dstart_time=2016-06-01T00:00Z -submit -config oozie/pageview/druid/bundle.properties +# +# NOTE: The $refinery_directory must be synced to HDFS so that all relevant +# .xml files exist there when this job is submitted. + +name_node = hdfs://analytics-hadoop +job_tracker = resourcemanager.analytics.eqiad.wmnet:8032 +queue_name = default + +#Default user +user = hdfs + +# Base path in HDFS to refinery. +# When submitting this job for production, you should override this to point directly at a deployed +# directory name, and not the 'symbolic' 'current' directory. E.g. /wmf/refinery/2015-01-05T17.59.18Z--7bb7f07 +refinery_directory = ${name_node}/wmf/refinery/current + +# HDFS path to artifacts that will be used by this job. +# E.g. refinery-job.jar should exist here. +artifacts_directory = ${refinery_directory}/artifacts + +# Base path in HDFS to oozie files. +# Other files will be used relative to this path. +oozie_directory = ${refinery_directory}/oozie + +# HDFS path to coordinator to run. +coordinator_file = ${oozie_directory}/pageview/druid/daily/coordinator.xml +# HDFS path to workflow to run. +workflow_file = ${oozie_directory}/pageview/druid/daily/workflow.xml + +# HDFS path to pageview dataset definitions +pageview_datasets_file = ${oozie_directory}/pageview/datasets.xml +pageview_data_directory = ${name_node}/wmf/data/wmf/pageview + +# Pageview table name (used by spark job) +pageview_table = wmf.pageview_hourly + +# Initial import time of the webrequest dataset. +start_time = 2015-07-01T00:00Z + +# Time to stop running this coordinator. Year 3000 == never! +stop_time = 3000-01-01T00:00Z + +# Spark job parameters +spark_master = yarn +spark_deploy = cluster +spark_job_jar = ${artifacts_directory}/org/wikimedia/analytics/refinery/refinery-job-0.0.32.jar +spark_job_class = org.wikimedia.analytics.refinery.job.PageviewToJSON +spark_job_name = PageviewToJson +spark_executor_memory = 2G +spark_driver_memory = 4G +# Additional spark files (either on hdfs or on every worker) +spark_assembly_jar = ${name_node}/user/spark/share/lib/spark-assembly.jar +spark_additional_jars = /usr/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar,/usr/lib/hive/lib/datanucleus-core-3.2.10.jar,/usr/lib/hive/lib/datanucleus-rdbms-3.2.9.jar +spark_additional_files = /etc/hive/conf.analytics-hadoop/hive-site.xml +spark_output_partitions = 8 + +# Temporary directory +temporary_directory = ${name_node}/tmp + +# HDFS path to template to use. 
+druid_template_file = ${oozie_directory}/pageview/druid/daily/load_pageview_daily.json.template + + +# HDFS path to workflow to load druid +load_druid_workflow_file = ${oozie_directory}/util/load_druid/workflow.xml +# HDFS path to workflow to mark a directory as done +mark_directory_done_workflow_file = ${oozie_directory}/util/mark_directory_done/workflow.xml +# Workflow to send an error email +send_error_email_workflow_file = ${oozie_directory}/util/send_error_email/workflow.xml + +# Coordinator to start. +oozie.coord.application.path = ${coordinator_file} +oozie.use.system.libpath = true +oozie.action.external.stats.write = true + + diff --git a/oozie/pageview/druid/daily/coordinator.xml b/oozie/pageview/druid/daily/coordinator.xml new file mode 100644 index 0000000..33d62e2 --- /dev/null +++ b/oozie/pageview/druid/daily/coordinator.xml @@ -0,0 +1,95 @@ +<?xml version="1.0" encoding="UTF-8"?> +<coordinator-app xmlns="uri:oozie:coordinator:0.4" + name="pageview-druid-daily-coord" + frequency="${frequency}" + start="${start_time}" + end="${stop_time}" + timezone="Universal"> + + <parameters> + + <!-- Required properties. --> + <property><name>name_node</name></property> + <property><name>job_tracker</name></property> + <property><name>queue_name</name></property> + + <property><name>workflow_file</name></property> + <property><name>pageview_datasets_file</name></property> + <property><name>pageview_data_directory</name></property> + <property><name>pageview_table</name></property> + + <property><name>start_time</name></property> + <property><name>stop_time</name></property> + + <property><name>spark_master</name></property> + <property><name>spark_deploy</name></property> + <property><name>spark_assembly_jar</name></property> + <property><name>spark_additional_jars</name></property> + <property><name>spark_additional_files</name></property> + <property><name>spark_job_jar</name></property> + <property><name>spark_job_class</name></property> + <property><name>spark_job_name</name></property> + <property><name>spark_executor_memory</name></property> + <property><name>spark_driver_memory</name></property> + <property><name>spark_output_partitions</name></property> + + <property><name>druid_template_file</name></property> + + <property><name>temporary_directory</name></property> + + <property><name>load_druid_workflow_file</name></property> + <property><name>mark_directory_done_workflow_file</name></property> + <property><name>send_error_email_workflow_file</name></property> + + </parameters> + + <controls> + <!--(timeout is measured in minutes)--> + <timeout>-1</timeout> + + <!-- Setting low concurrency cause the job is hungry in resources --> + <concurrency>1</concurrency> + + <throttle>2</throttle> + + </controls> + + <datasets> + <include>${pageview_datasets_file}</include> + </datasets> + + <input-events> + <data-in name="pageview_hourly_input" dataset="pageview_hourly"> + <start-instance>${coord:current(0)}</start-instance> + <end-instance>${coord:current(23)}</end-instance> + </data-in> + </input-events> + + <action> + <workflow> + <app-path>${workflow_file}</app-path> + <configuration> + <property> + <name>year</name> + <value>${coord:formatTime(coord:nominalTime(), "y")}</value> + </property> + <property> + <name>month</name> + <value>${coord:formatTime(coord:nominalTime(), "M")}</value> + </property> + <property> + <name>day</name> + <value>${coord:formatTime(coord:nominalTime(), "d")}</value> + </property> + <property> + <name>start_date</name> + 
<value>${coord:formatTime(coord:nominalTime(), "yyyy-MM-dd")}</value> + </property> + <property> + <name>end_date</name> + <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 1, "DAY"), "yyyy-MM-dd")}</value> + </property> + </configuration> + </workflow> + </action> +</coordinator-app> \ No newline at end of file diff --git a/oozie/pageview/druid/daily/load_pageview_daily.json.template b/oozie/pageview/druid/daily/load_pageview_daily.json.template new file mode 100644 index 0000000..4caddc7 --- /dev/null +++ b/oozie/pageview/druid/daily/load_pageview_daily.json.template @@ -0,0 +1,72 @@ +{ + "type" : "index_hadoop", + "spec" : { + "ioConfig" : { + "type" : "hadoop", + "inputSpec" : { + "type" : "static", + "paths" : "*INPUT_PATH*" + } + }, + "dataSchema" : { + "dataSource" : "pageviews-hourly", + "granularitySpec" : { + "type" : "uniform", + "segmentGranularity" : "day", + "queryGranularity" : "hour", + "intervals" : *INTERVALS_ARRAY* + }, + "parser" : { + "type" : "string", + "parseSpec" : { + "format" : "json", + "dimensionsSpec" : { + "dimensions" : [ + "project", + "language_variant", + "access_method", + "agent_type", + "referer_class", + "continent", + "country_code", + "country", + "subdivision", + "city", + "ua_device_family", + "ua_browser_family", + "ua_browser_major", + "ua_os_family", + "ua_os_major", + "ua_os_minor", + "ua_wmf_app_version" + ] + }, + "timestampSpec" : { + "format" : "auto", + "column" : "ts" + } + } + }, + "metricsSpec" : [ + { + "name" : "view_count", + "type" : "doubleSum", + "fieldName": "view_count" + } + ] + }, + "tuningConfig" : { + "type" : "hadoop", + "ignoreInvalidRows" : false, + "partitionsSpec" : { + "type" : "hashed", + "numShards" : 8 + }, + "jobProperties" : { + "mapreduce.reduce.memory.mb" : "8192", + "mapreduce.output.fileoutputformat.compress": "org.apache.hadoop.io.compress.GzipCodec" + } + } + } +} + diff --git a/oozie/pageview/druid/daily/workflow.xml b/oozie/pageview/druid/daily/workflow.xml new file mode 100644 index 0000000..03d92f5 --- /dev/null +++ b/oozie/pageview/druid/daily/workflow.xml @@ -0,0 +1,226 @@ +<?xml version="1.0" encoding="UTF-8"?> +<workflow-app xmlns="uri:oozie:workflow:0.4" + name="pageview-druid-daily-wf-${start_date}"> + + <parameters> + + <!-- Default values for inner oozie settings --> + <property> + <name>oozie_launcher_queue_name</name> + <value>${queue_name}</value> + </property> + <property> + <name>oozie_launcher_memory</name> + <value>256</value> + </property> + + <!-- Required properties --> + <property><name>name_node</name></property> + <property><name>job_tracker</name></property> + <property><name>queue_name</name></property> + + <property> + <name>pageview_table</name> + <description>The hive pageview table to use</description> + </property> + + <property> + <name>spark_master</name> + <description>Master to be used for Spark (yarn, local, other)</description> + </property> + <property> + <name>spark_deploy</name> + <description>Master to be used for Spark (yarn, local, other)</description> + </property> + <property> + <name>spark_additional_jars</name> + <description>Additional jars to use for the job (--jars spark parameter)</description> + </property> + <property> + <name>spark_additional_files</name> + <description>Additional files to use for the job (--files parameter)</description> + </property> + <property> + <name>spark_job_jar</name> + <description>Path to the jar to be used to run spark job</description> + </property> + <property> + <name>spark_job_class</name> + 
<description>Class of the spark job to be run</description> + </property> + <property> + <name>spark_job_name</name> + <description>Base name for the spark job to be run</description> + </property> + <property> + <name>spark_executor_memory</name> + <description>Memory to allocate for each spark executor</description> + </property> + <property> + <name>spark_driver_memory</name> + <description>Memory to allocate for spark driver process</description> + </property> + <property> + <name>spark_output_partitions</name> + <description>Number of json files generated</description> + </property> + + <property> + <name>year</name> + <description>The partition's year</description> + </property> + <property> + <name>month</name> + <description>The partition's month</description> + </property> + <property> + <name>day</name> + <description>The partition's day</description> + </property> + <property> + <name>start_date</name> + <description>The date of the data used (yyyy-MM-dd)</description> + </property> + <property> + <name>end_date</name> + <description>start_date + 1 day (yyyy-MM-dd)</description> + </property> + <property> + <name>druid_template_file</name> + <description>File to use as a template to define druid loading (absolute since used by load_druid sub-workflow)</description> + </property> + <property> + <name>temporary_directory</name> + <description>A directory in HDFS for temporary files</description> + </property> + <property> + <name>load_druid_workflow_file</name> + <description>Workflow for loading druid</description> + </property> + <property> + <name>mark_directory_done_workflow_file</name> + <description>Workflow for marking a directory done</description> + </property> + <property> + <name>send_error_email_workflow_file</name> + <description>Workflow for sending an email</description> + </property> + </parameters> + + <start to="generate_json_pageview"/> + + <action name="generate_json_pageview"> + <spark xmlns="uri:oozie:spark-action:0.1"> + + <job-tracker>${job_tracker}</job-tracker> + <name-node>${name_node}</name-node> + <configuration> + <!--make sure oozie:launcher runs in a low priority queue --> + <property> + <name>oozie.launcher.mapred.job.queue.name</name> + <value>${oozie_launcher_queue_name}</value> + </property> + <property> + <name>oozie.launcher.mapreduce.map.memory.mb</name> + <value>${oozie_launcher_memory}</value> + </property> + </configuration> + <master>${spark_master}</master> + <mode>${spark_deploy}</mode> + <name>${spark_job_name}-${year}-${month}-${day}</name> + <class>${spark_job_class}</class> + <jar>${spark_job_jar}</jar> + <spark-opts>--conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.yarn.jar=${spark_assembly_jar} --executor-memory ${spark_executor_memory} --driver-memory ${spark_driver_memory} --num-executors ${spark_number_executors} --queue ${queue_name} --jars ${spark_additional_jars} --files ${spark_additional_files}</spark-opts> + <arg>--year</arg> + <arg>${year}</arg> + <arg>--month</arg> + <arg>${month}</arg> + <arg>--day</arg> + <arg>${day}</arg> + <arg>--pageview-table</arg> + <arg>${pageview_table}</arg> + <arg>--output-folder</arg> + <arg>${temporary_directory}/${wf:id()}-${spark_job_name}-${year}-${month}-${day}</arg> + <arg>--num-partitions</arg> + <arg>${spark_output_partitions}</arg> + <arg>--granularity</arg> + <arg>hourly</arg> + </spark> + <ok to="mark_json_pageview_dataset_done" /> + <error to="send_error_email" /> + </action> + + <action name="mark_json_pageview_dataset_done"> + 
<sub-workflow> + <app-path>${mark_directory_done_workflow_file}</app-path> + <configuration> + <property> + <name>directory</name> + <value>${temporary_directory}/${wf:id()}-${spark_job_name}-${year}-${month}-${day}</value> + </property> + </configuration> + </sub-workflow> + <ok to="index_druid"/> + <error to="send_error_email"/> + </action> + + + <action name="index_druid"> + <sub-workflow> + <app-path>${archive_job_output_workflow_file}</app-path> + <propagate-configuration/> + <configuration> + <property> + <name>source_directory</name> + <value>${temporary_directory}/${wf:id()}-${spark_job_name}-${year}-${month}-${day}</value> + </property> + <property> + <name>template_file</name> + <value>${druid_template_file}</value> + </property> + </configuration> + </sub-workflow> + <ok to="remove_temporary_data"/> + <error to="send_error_email"/> + </action> + + <action name="remove_temporary_data"> + <fs> + <delete path="${temporary_directory}/${wf:id()}-${spark_job_name}-${year}-${month}-${day}"/> + </fs> + <ok to="end"/> + <error to="send_error_email"/> + </action> + + <action name="send_error_email"> + <sub-workflow> + <app-path>${send_error_email_workflow_file}</app-path> + <propagate-configuration/> + <configuration> + <property> + <name>parent_name</name> + <value>${wf:name()}</value> + </property> + <property> + <name>parent_failed_action</name> + <value>${wf:lastErrorNode()}</value> + </property> + <property> + <name>parent_error_code</name> + <value>${wf:errorCode(wf:lastErrorNode())}</value> + </property> + <property> + <name>parent_error_message</name> + <value>${wf:errorMessage(wf:lastErrorNode())}</value> + </property> + </configuration> + </sub-workflow> + <ok to="kill"/> + <error to="kill"/> + </action> + + <kill name="kill"> + <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> + </kill> + <end name="end"/> +</workflow-app> diff --git a/oozie/util/load_druid/druid_loader b/oozie/util/load_druid/druid_loader new file mode 100755 index 0000000..7f8f6d6 --- /dev/null +++ b/oozie/util/load_druid/druid_loader @@ -0,0 +1,112 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import logging +import requests +import datetime +import time + + +logger = logging.getLogger(__name__) + + +class DruidLoader(object): + + _INTERVALS_ARRAY_FIELD = '*INTERVALS_ARRAY*' + _INPUT_PATH_FIELD = '*INPUT_PATH*' + + _LAUNCH_TASK_PATH = '/druid/indexer/v1/task' + _CHECK_TASK_PATH = '/druid/indexer/v1/task/{0}/status' + + _STATUS_RUNNING = 'RUNNING' + _STATUS_FAILED = 'FAILED' + _STATUS_SUCCEEDED = 'SUCCEEDED' + + def __init__(self, template_path, data_path, period, + host='http://druid1001.eqiad.wmnet:8090', sleep=10): + self.template_path = template_path + self.data_path = data_path + self.period = period + self.host = host + self.sleep = sleep + self._init_json() + + def _init_json(self): + template = open(self.template_path, 'r').read() + intervals_array = '["{0}"]'.format(self.period) + self.json = (template. + replace(self._INPUT_PATH_FIELD, self.data_path). + replace(self._INTERVALS_ARRAY_FIELD, intervals_array)) + logger.debug('Json to be sent:\n{0}'.format(self.json)) + + def _start(self): + url = self.host + self._LAUNCH_TASK_PATH + headers = {'Content-type': 'application/json'} + req = requests.post(url, data=self.json, headers=headers) + if req.status_code == requests.codes.ok: + self.task_id = req.json()['task'] + logger.debug('Indexation launched using url {0}'.format(url)) + else: + raise RuntimeError('Druid indexation start returned bad status') + + def _update_status(self): + url = self.host + self._CHECK_TASK_PATH.format(self.task_id) + req = requests.get(url) + if req.status_code == requests.codes.ok: + self.current_status = req.json()['status']['status'] + logger.debug('Indexation status update to {0}'.format( + self.current_status)) + else: + raise RuntimeError('Druid indexation check returned bad status') + + def execute(self): + try: + self._start() + self._update_status() + while self.current_status == self._STATUS_RUNNING: + time.sleep(self.sleep) + self._update_status() + if self.current_status == self._STATUS_SUCCEEDED: + return 0 + if self.current_status == self._STATUS_FAILED: + return 1 + except Exception as e: + logger.error("An error occured:" + str(e)) + return 1 + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Launch a synchronous druid indexation for a day.') + + parser.add_argument('template', + help='The druid indexation json template path') + parser.add_argument('data', help='The druid indexation json data path') + parser.add_argument('period', help='The druid indexation period ' + + '(YYYY-MM-DD/YYY-MM-DD) format') + parser.add_argument('--overlord', + default='http://druid1001.eqiad.wmnet:8090', + help='The druid overlord url (defaults to ' + + 'http://druid1001.eqiad.wmnet:8090)') + parser.add_argument('--debug', action='store_true', + help='Log debugging messages') + + args = parser.parse_args() + + logging.basicConfig(level=(logging.DEBUG if args.debug else logging.INFO)) + loader = DruidLoader(args.template, args.data, args.period, + host=args.overlord) + loader.execute() diff --git a/oozie/util/load_druid/workflow.xml b/oozie/util/load_druid/workflow.xml new file mode 100644 index 0000000..c1da801 --- /dev/null +++ b/oozie/util/load_druid/workflow.xml @@ -0,0 +1,121 @@ +<?xml version="1.0" encoding="UTF-8"?> +<workflow-app xmlns="uri:oozie:workflow:0.4" + name="load-druid-wf"> + + <parameters> + + <!-- Default values for inner oozie settings --> + <property> + <name>oozie_launcher_queue_name</name> + <value>${queue_name}</value> + </property> + <property> + 
<name>oozie_launcher_memory</name> + <value>256</value> + </property> + + <!-- Required properties --> + <property><name>queue_name</name></property> + <property><name>name_node</name></property> + <property><name>job_tracker</name></property> + + <property> + <name>done_file</name> + <value>_SUCCESS</value> + <description> + The name of the file to flag a directory as “done”. + </description> + <property> + <name>template_file</name> + <description> + File path for the json template to use. + </description> + </property> + <property> + <name>source_directory</name> + <description> + Directory in hdfs that contains the data that should + be loaded in druid. + </description> + </property> + <property> + <name>loaded_time_interval</name> + <description> + Time interval (yyyy-MM-dd/yyyy-M-dd format) of the loaded data. + </description> + </property> + </parameters> + + + <start to="check_source_directory"/> + + <decision name="check_source_directory"> + <switch> + <case to="source_directory_does_not_exist"> + ${not fs:exists(source_directory)} + </case> + <case to="source_directory_is_not_a_directory"> + ${not fs:isDir(source_directory)} + </case> + <case to="source_directory_is_not_done"> + ${not fs:exists(concat(concat(source_directory,'/'),done_file))} + </case> + <default to="run_druid_loading"/> + </switch> + </decision> + + <action name="run_druid_loading"> + <!-- + Druid loading is done through a python script that first launches the + task (POST call) the periodically polls (GET calls) for status until + success or failure. + --> + <shell xmlns="uri:oozie:shell-action:0.1"> + <job-tracker>${job_tracker}</job-tracker> + <name-node>${name_node}</name-node> + <configuration> + <!--make sure oozie:launcher runs in a low priority queue --> + <property> + <name>oozie.launcher.mapred.job.queue.name</name> + <value>${oozie_launcher_queue_name}</value> + </property> + <property> + <name>oozie.launcher.mapreduce.map.memory.mb</name> + <value>${oozie_launcher_memory}</value> + </property> + + <property> + <name>mapred.job.queue.name</name> + <value>${queue_name}</value> + </property> + </configuration> + <exec>druid_loader</exec> + <argument>template_file</argument> + <argument>${source_directory}</argument> + <argument>${done_file}</argument> + <argument>loaded_time_interval</argument> + <file>druid_loader#druid_loader</file> + <file>${template_file}#template_file</file> + </shell> + <ok to="end"/> + <error to="kill"/> + </action> + + <kill name="source_directory_does_not_exist"> + <message>The job output directory ${source_directory} does not exist</message> + </kill> + + <kill name="source_directory_is_not_a_directory"> + <message>The given source_directory ${source_directory} is not a directory</message> + </kill> + + <kill name="source_directory_is_not_done"> + <message>The job output directory ${source_directory} lacks the ${done_file} marker</message> + </kill> + + <kill name="kill"> + <message>error message[${wf:errorMessage(wf:lastErrorNode())}]</message> + </kill> + <end name="end"/> + +</workflow-app> -- To view, visit https://gerrit.wikimedia.org/r/298131 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I32c800b6d95836031380773d3cb49d6dba6b6a03 Gerrit-PatchSet: 1 Gerrit-Project: analytics/refinery Gerrit-Branch: master Gerrit-Owner: Joal <j...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org 
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
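A note on how the daily coordinator's dates become a Druid indexation interval: start_date is the nominal time formatted as yyyy-MM-dd, end_date is one day later via coord:dateOffset, and druid_loader wraps the resulting period into the *INTERVALS_ARRAY* placeholder as a one-element JSON array. A minimal Python sketch of that derivation (the dates used here are illustrative, not taken from the patch):

    from datetime import date, timedelta

    # Nominal time of one daily coordinator run (illustrative value).
    nominal = date(2016, 6, 1)

    # Mirrors coord:formatTime(..., "yyyy-MM-dd") and coord:dateOffset(..., 1, "DAY").
    start_date = nominal.isoformat()                      # '2016-06-01'
    end_date = (nominal + timedelta(days=1)).isoformat()  # '2016-06-02'

    # druid_loader receives the period as 'YYYY-MM-DD/YYYY-MM-DD' and turns it
    # into the one-element intervals array used by the ingestion spec.
    period = '{0}/{1}'.format(start_date, end_date)
    intervals_array = '["{0}"]'.format(period)            # '["2016-06-01/2016-06-02"]'
    print(intervals_array)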
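The monthly coordinator's data_end_instance of ${"$"}{coord:current(coord:daysInMonth(0) * 24 - 1)} points at the last hourly instance of the month: with one pageview_hourly instance per hour, a month of N days spans instances 0 through N*24 - 1. A small sketch of that arithmetic, using an illustrative month:

    import calendar

    # Days in the month of the nominal time (July 2016 is only an example).
    year, month = 2016, 7
    days_in_month = calendar.monthrange(year, month)[1]   # 31

    # The month covers hourly instances coord:current(0) .. coord:current(last_instance).
    last_instance = days_in_month * 24 - 1
    print(days_in_month, last_instance)                   # 31 743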
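For readers of the load_pageview_daily.json.template: the parseSpec expects JSON records with a "ts" timestamp column, the listed dimensions, and a view_count field summed by the doubleSum metric. The actual record layout is produced by the PageviewToJSON spark job, which is not part of this patch, so the record below is purely hypothetical and only illustrates the shape the template assumes:

    import json

    # Hypothetical record consistent with the template's parseSpec; real field
    # values come from the PageviewToJSON job, not from this patch.
    record = {
        "ts": "2016-06-01T00:00:00Z",      # timestampSpec column, "auto" format
        "project": "en.wikipedia",
        "access_method": "desktop",
        "agent_type": "user",
        "country_code": "US",
        "ua_browser_family": "Firefox",
        # ... remaining dimensions from dimensionsSpec ...
        "view_count": 42                   # aggregated by the doubleSum metric
    }
    print(json.dumps(record))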
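druid_loader builds the final ingestion spec by plain string substitution of *INPUT_PATH* and *INTERVALS_ARRAY* in the template. A quick way to sanity-check a rendered spec is to perform the same substitution and parse the result; the path and period below are illustrative placeholders, not values from the patch:

    import json

    # Illustrative values; in the workflow the data path is the spark job's
    # temporary output directory and the period is start_date/end_date.
    data_path = 'hdfs://analytics-hadoop/tmp/some-workflow-id-PageviewToJson-2016-6-1'
    period = '2016-06-01/2016-06-02'

    with open('load_pageview_daily.json.template') as f:
        template = f.read()

    spec = (template
            .replace('*INPUT_PATH*', data_path)
            .replace('*INTERVALS_ARRAY*', '["{0}"]'.format(period)))

    # json.loads fails loudly if the substituted template is not valid JSON.
    json.loads(spec)

The script's own command line, per its argparse definition, is druid_loader TEMPLATE DATA PERIOD [--overlord URL] [--debug], which is the call the load_druid shell action is meant to wrap.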
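The load_druid utility is synchronous by design: it POSTs the spec to the overlord's /druid/indexer/v1/task endpoint, reads the task id from the response, then polls /druid/indexer/v1/task/{id}/status until the status leaves RUNNING. A stripped-down sketch of that loop, using the overlord URL and sleep interval that the script defaults to:

    import time
    import requests

    OVERLORD = 'http://druid1001.eqiad.wmnet:8090'   # script's default overlord

    def run_indexation(spec_json, sleep=10):
        # Submit the indexation task and read its id from the response.
        resp = requests.post(OVERLORD + '/druid/indexer/v1/task',
                             data=spec_json,
                             headers={'Content-type': 'application/json'})
        resp.raise_for_status()
        task_id = resp.json()['task']

        # Poll the task status until Druid reports something other than RUNNING.
        while True:
            status = requests.get(
                OVERLORD + '/druid/indexer/v1/task/{0}/status'.format(task_id)
            ).json()['status']['status']
            if status != 'RUNNING':
                return status        # 'SUCCEEDED' or 'FAILED'
            time.sleep(sleep)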