Joal has uploaded a new change for review.
https://gerrit.wikimedia.org/r/270921
Change subject: Add monthly top to cassandra and correct jobs
......................................................................
Add monthly top to cassandra and correct jobs
Add monthly top cassandra load oozie job.
Remove number of reducers parameter constraint in other
cassandra loading jobs.
Change-Id: Ia3344b8c66a9e01e0258c55f049c7c288660e045
---
M oozie/cassandra/bundle.xml
A oozie/cassandra/coord_top_articles_monthly.properties
M oozie/cassandra/daily/pageview_per_article.hql
M oozie/cassandra/daily/pageview_per_project.hql
M oozie/cassandra/daily/pageview_top_articles.hql
M oozie/cassandra/daily/workflow.xml
M oozie/cassandra/hourly/pageview_per_project.hql
M oozie/cassandra/hourly/workflow.xml
M oozie/cassandra/monthly/pageview_per_project.hql
A oozie/cassandra/monthly/pageview_top_articles.hql
M oozie/cassandra/monthly/workflow.xml
11 files changed, 258 insertions(+), 13 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery
refs/changes/21/270921/1
diff --git a/oozie/cassandra/bundle.xml b/oozie/cassandra/bundle.xml
index 074620b..928791f 100644
--- a/oozie/cassandra/bundle.xml
+++ b/oozie/cassandra/bundle.xml
@@ -513,4 +513,64 @@
</configuration>
</coordinator>
+ <coordinator name="cassandra_pageview_top_articles_monthly-coord">
+ <app-path>${coordinator_file_monthly}</app-path>
+ <configuration>
+ <property>
+ <name>workflow_file</name>
+ <value>${workflow_file_monthly}</value>
+ </property>
+
+ <!-- Dataset the job depends on (resolved from the top_articles_* properties) -->
+ <property>
+ <name>datasets_file</name>
+ <value>${top_articles_datasets_file}</value>
+ </property>
+ <property>
+ <name>dataset_name</name>
+ <value>${top_articles_dataset_name}</value>
+ </property>
+
+ <!-- Hive data preparation job stuff (script to run and the table it reads) -->
+ <property>
+ <name>hive_script</name>
+ <value>${top_articles_hive_script}</value>
+ </property>
+ <property>
+ <name>source_table</name>
+ <value>${top_articles_source_table}</value>
+ </property>
+
+ <!-- cassandra loader job stuff. NOTE(review): cassandra_parallel_loaders reads per_article_flat_parallel_loaders while the other job-specific values here are top_articles_*; confirm this is intended -->
+ <property>
+ <name>cassandra_parallel_loaders</name>
+ <value>${per_article_flat_parallel_loaders}</value>
+ </property>
+ <property>
+ <name>cassandra_cql</name>
+ <value>${top_articles_cql}</value>
+ </property>
+ <property>
+ <name>cassandra_keyspace</name>
+ <value>${top_articles_keyspace}</value>
+ </property>
+ <property>
+ <name>hive_fields</name>
+ <value>${top_articles_hive_fields}</value>
+ </property>
+ <property>
+ <name>hive_fields_types</name>
+ <value>${top_articles_hive_fields_types}</value>
+ </property>
+ <property>
+ <name>cassandra_fields</name>
+ <value>${top_articles_cassandra_fields}</value>
+ </property>
+ <property>
+ <name>cassandra_primary_keys</name>
+ <value>${top_articles_cassandra_primary_keys}</value>
+ </property>
+ </configuration>
+ </coordinator>
+
</bundle-app>
diff --git a/oozie/cassandra/coord_top_articles_monthly.properties
b/oozie/cassandra/coord_top_articles_monthly.properties
new file mode 100644
index 0000000..1af4e9f
--- /dev/null
+++ b/oozie/cassandra/coord_top_articles_monthly.properties
@@ -0,0 +1,96 @@
+# Configures a coordinator to manage loading cassandra for the top_articles
monthly
+# pageview API. Any of the following properties are overridable with -D.
+# Usage:
+# oozie job -Duser=$USER -Dstart_time=2015-05-05T00:00Z -submit -config
oozie/cassandra/coord_top_articles_monthly.properties
+#
+# NOTE: The $oozie_directory must be synced to HDFS so that all relevant
+# .xml files exist there when this job is submitted.
+
+
+name_node = hdfs://analytics-hadoop
+job_tracker = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name = default
+
+user = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should
+# override this to point directly at a deployed
+# directory name, and not the 'symbolic' 'current' directory.
+# E.g. /wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory = ${name_node}/wmf/refinery/current
+
+# HDFS path to the refinery job jar that will be used by this job.
+refinery_cassandra_jar_path =
${refinery_directory}/artifacts/org/wikimedia/analytics/refinery/refinery-cassandra-0.0.23.jar
+cassandra_reducer_class =
org.wikimedia.analytics.refinery.cassandra.ReducerToCassandra
+cassandra_output_format_class =
org.wikimedia.analytics.refinery.cassandra.CqlOutputFormat
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory = ${refinery_directory}/oozie
+
+# HDFS path to coordinators to run.
+coordinator_file_hourly =
${oozie_directory}/cassandra/hourly/coordinator.xml
+coordinator_file_daily =
${oozie_directory}/cassandra/daily/coordinator.xml
+coordinator_file_monthly =
${oozie_directory}/cassandra/monthly/coordinator.xml
+
+# HDFS path to workflows to run.
+workflow_file_hourly =
${oozie_directory}/cassandra/hourly/workflow.xml
+workflow_file_daily =
${oozie_directory}/cassandra/daily/workflow.xml
+workflow_file_monthly =
${oozie_directory}/cassandra/monthly/workflow.xml
+
+
+# HDFS path to datasets definitions
+pageview_datasets_file = ${oozie_directory}/pageview/datasets.xml
+projectview_datasets_file = ${oozie_directory}/projectview/datasets.xml
+pageview_data_directory = ${name_node}/wmf/data/wmf/pageview
+projectview_data_directory = ${name_node}/wmf/data/wmf/projectview
+
+# Initial import time of the webrequest dataset.
+start_time = 2015-05-01T00:00Z
+
+# Time to stop running this coordinator. Year 3000 == never!
+stop_time = 3000-01-01T00:00Z
+
+# HDFS path to hive-site.xml file. This is needed to run hive actions.
+hive_site_xml =
${refinery_directory}/oozie/util/hive/hive-site.xml
+# Temporary directory
+temporary_directory = ${name_node}/tmp
+
+# Cassandra cluster info
+cassandra_host = aqs1001.eqiad.wmnet
+cassandra_port = 9042
+cassandra_username = cassandra
+cassandra_password = cassandra
+cassandra_nodes = 3
+batch_size = 1024
+
+# Hive value separator
+hive_value_separator = \\t
+# Cassandra table to be loaded (not job dependent)
+cassandra_table = data
+
+# Constant field names and value to be loaded into cassandra
+constant_output_domain_field = _domain
+constant_output_domain_value = analytics.wikimedia.org,text
+constant_output_granularity_field = granularity
+constant_output_tid_field = _tid
+constant_output_tid_value = 0,timeuuid
+
+workflow_file = ${workflow_file_monthly}
+datasets_file = ${pageview_datasets_file}
+dataset_name = pageview_hourly
+hive_script = pageview_top_articles.hql
+source_table = wmf.pageview_hourly
+cassandra_parallel_loaders = 3
+cassandra_cql = UPDATE
"local_group_default_T_top_pageviews"."data" SET "articlesJSON" = ?
+cassandra_keyspace = local_group_default_T_top_pageviews
+hive_fields = project,access,year,month,day,articlesJSON
+hive_fields_types = text,text,text,text,text,text
+cassandra_fields = articlesJSON
+cassandra_primary_keys = _domain,project,access,year,month,day,_tid
+
+# Coordinator to start (the monthly one, matching workflow_file above).
+oozie.coord.application.path = ${coordinator_file_monthly}
+oozie.use.system.libpath = true
+oozie.action.external.stats.write = true
diff --git a/oozie/cassandra/daily/pageview_per_article.hql
b/oozie/cassandra/daily/pageview_per_article.hql
index 60f15f9..97de211 100644
--- a/oozie/cassandra/daily/pageview_per_article.hql
+++ b/oozie/cassandra/daily/pageview_per_article.hql
@@ -14,13 +14,11 @@
-- -d year=2015 \
-- -d month=5 \
-- -d day=1 \
--- -d reducers_number=6 \
--
SET hive.exec.compress.output=true;
SET
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/daily/pageview_per_project.hql
b/oozie/cassandra/daily/pageview_per_project.hql
index be08d2d..317fb31 100644
--- a/oozie/cassandra/daily/pageview_per_project.hql
+++ b/oozie/cassandra/daily/pageview_per_project.hql
@@ -14,13 +14,11 @@
-- -d year=2015 \
-- -d month=5 \
-- -d day=1 \
--- -d reducers_number=1 \
--
SET hive.exec.compress.output=true;
SET
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/daily/pageview_top_articles.hql
b/oozie/cassandra/daily/pageview_top_articles.hql
index 72eefc4..a4d15b9 100644
--- a/oozie/cassandra/daily/pageview_top_articles.hql
+++ b/oozie/cassandra/daily/pageview_top_articles.hql
@@ -14,13 +14,11 @@
-- -d year=2015 \
-- -d month=5 \
-- -d day=1 \
--- -d reducers_number=6 \
--
SET hive.exec.compress.output=true;
SET
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
WITH ranked AS (
diff --git a/oozie/cassandra/daily/workflow.xml
b/oozie/cassandra/daily/workflow.xml
index adbad53..16a02cd 100644
--- a/oozie/cassandra/daily/workflow.xml
+++ b/oozie/cassandra/daily/workflow.xml
@@ -166,7 +166,6 @@
<param>year=${year}</param>
<param>month=${month}</param>
<param>day=${day}</param>
- <param>reducers_number=${cassandra_parallel_loaders}</param>
</hive>
<ok to="load_cassandra"/>
diff --git a/oozie/cassandra/hourly/pageview_per_project.hql
b/oozie/cassandra/hourly/pageview_per_project.hql
index 8b84ca8..1a2e4d6 100644
--- a/oozie/cassandra/hourly/pageview_per_project.hql
+++ b/oozie/cassandra/hourly/pageview_per_project.hql
@@ -16,13 +16,11 @@
-- -d month=5 \
-- -d day=1 \
-- -d hour=0 \
--- -d reducers_number=1 \
--
SET hive.exec.compress.output=true;
SET
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/hourly/workflow.xml
b/oozie/cassandra/hourly/workflow.xml
index 66c9c44..083580f 100644
--- a/oozie/cassandra/hourly/workflow.xml
+++ b/oozie/cassandra/hourly/workflow.xml
@@ -179,7 +179,6 @@
<param>month=${month}</param>
<param>day=${day}</param>
<param>hour=${hour}</param>
- <param>reducers_number=${cassandra_parallel_loaders}</param>
</hive>
<ok to="load_cassandra"/>
diff --git a/oozie/cassandra/monthly/pageview_per_project.hql
b/oozie/cassandra/monthly/pageview_per_project.hql
index b345264..c849db9 100644
--- a/oozie/cassandra/monthly/pageview_per_project.hql
+++ b/oozie/cassandra/monthly/pageview_per_project.hql
@@ -12,13 +12,11 @@
-- -d separator=\t \
-- -d year=2015 \
-- -d month=5 \
--- -d reducers_number=1 \
--
SET hive.exec.compress.output=true;
SET
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/monthly/pageview_top_articles.hql
b/oozie/cassandra/monthly/pageview_top_articles.hql
new file mode 100644
index 0000000..b81906d
--- /dev/null
+++ b/oozie/cassandra/monthly/pageview_top_articles.hql
@@ -0,0 +1,102 @@
+-- Parameters:
+-- destination_directory -- HDFS path to write output files
+-- source_table -- Fully qualified table name to compute from.
+-- separator -- Separator for values
+-- year -- year of partition to compute from.
+-- month -- month of partition to compute from.
+-- (there is no day parameter: the whole month is aggregated)
+--
+-- Usage:
+-- hive -f pageview_top_articles.hql \
+-- -d destination_directory=/tmp/pageview_top_articles \
+-- -d source_table=wmf.pageview_hourly \
+-- -d separator=\t \
+-- -d year=2015 \
+-- -d month=5 \
+--
+-- Output: one line per (project, access, year, month) for agent_type = 'user' rows, fields joined by the separator.
+-- The day field is the constant 'all-days'; the last field is a JSON array of {article, views, rank} objects.
+
+SET hive.exec.compress.output=true;
+SET
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+-- ranked: monthly views per article, ranked within (project, access, year, month); the grouping set without access_method yields the 'all-access' rows.
+-- max_rank: rank of the row numbered 1001 in each group; only rows with a strictly smaller rank are kept (cut-off defaults to 1001 when no such row exists).
+WITH ranked AS (
+ SELECT
+ project,
+ page_title,
+ access,
+ year,
+ month,
+ views,
+ rank() OVER (PARTITION BY project, access, year, month ORDER BY views
DESC) as rank,
+ row_number() OVER (PARTITION BY project, access, year, month ORDER BY
views DESC) as rn
+ FROM (
+ SELECT
+ project,
+ reflect("org.json.simple.JSONObject", "escape",
regexp_replace(page_title, '${separator}', '')) AS page_title,
+ COALESCE(regexp_replace(access_method, ' ', '-'), 'all-access') AS
access,
+ LPAD(year, 4, "0") as year,
+ LPAD(month, 2, "0") as month,
+ SUM(view_count) as views
+ FROM ${source_table}
+ WHERE
+ year = ${year}
+ AND month = ${month}
+ AND agent_type = 'user'
+ GROUP BY project, regexp_replace(page_title, '${separator}', ''),
access_method, year, month
+ GROUPING SETS (
+ (project, regexp_replace(page_title, '${separator}', ''),
access_method, year, month),
+ (project, regexp_replace(page_title, '${separator}', ''), year,
month)
+ )
+ ) raw
+),
+max_rank AS (
+ SELECT
+ project,
+ access,
+ year,
+ month,
+ rank as max_rank
+ FROM ranked
+ WHERE
+ rn = 1001
+ GROUP BY
+ project,
+ access,
+ year,
+ month,
+ rank
+)
+INSERT OVERWRITE DIRECTORY "${destination_directory}"
+-- Since "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '" only
+-- works for exports to local directories (see HIVE-5672), we have to
+-- prepare the lines by hand through concatenation :-(
+SELECT
+ CONCAT_WS("${separator}",
+ ranked.project,
+ ranked.access,
+ ranked.year,
+ ranked.month,
+ 'all-days',
+ CONCAT('[',
+ CONCAT_WS(',', collect_set(
+ CONCAT('{"article":"', ranked.page_title,
+ '","views":', CAST(ranked.views AS STRING),
+ ',"rank":', CAST(ranked.rank AS STRING), '}'))
+ ),']')
+ )
+FROM ranked
+LEFT JOIN max_rank ON (
+ ranked.project = max_rank.project
+ AND ranked.access = max_rank.access
+ AND ranked.year = max_rank.year
+ AND ranked.month = max_rank.month
+)
+WHERE ranked.rank < COALESCE(max_rank.max_rank, 1001)
+GROUP BY
+ ranked.project,
+ ranked.access,
+ ranked.year,
+ ranked.month
+;
\ No newline at end of file
diff --git a/oozie/cassandra/monthly/workflow.xml
b/oozie/cassandra/monthly/workflow.xml
index cbf93eb..31d8b9d 100644
--- a/oozie/cassandra/monthly/workflow.xml
+++ b/oozie/cassandra/monthly/workflow.xml
@@ -161,7 +161,6 @@
<param>destination_directory=${temporary_directory}/${wf:id()}-${cassandra_keyspace}</param>
<param>year=${year}</param>
<param>month=${month}</param>
- <param>reducers_number=${cassandra_parallel_loaders}</param>
</hive>
<ok to="load_cassandra"/>
--
To view, visit https://gerrit.wikimedia.org/r/270921
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia3344b8c66a9e01e0258c55f049c7c288660e045
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits