Joal has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/270921

Change subject: Add monthly top to cassandra and correct jobs
......................................................................

Add monthly top to cassandra and correct jobs

Add monthly top cassandra load oozie job.
Remove number of reducers parameter constraint in other
cassandra loading jobs.

Change-Id: Ia3344b8c66a9e01e0258c55f049c7c288660e045
---
M oozie/cassandra/bundle.xml
A oozie/cassandra/coord_top_articles_monthly.properties
M oozie/cassandra/daily/pageview_per_article.hql
M oozie/cassandra/daily/pageview_per_project.hql
M oozie/cassandra/daily/pageview_top_articles.hql
M oozie/cassandra/daily/workflow.xml
M oozie/cassandra/hourly/pageview_per_project.hql
M oozie/cassandra/hourly/workflow.xml
M oozie/cassandra/monthly/pageview_per_project.hql
A oozie/cassandra/monthly/pageview_top_articles.hql
M oozie/cassandra/monthly/workflow.xml
11 files changed, 258 insertions(+), 13 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery 
refs/changes/21/270921/1

diff --git a/oozie/cassandra/bundle.xml b/oozie/cassandra/bundle.xml
index 074620b..928791f 100644
--- a/oozie/cassandra/bundle.xml
+++ b/oozie/cassandra/bundle.xml
@@ -513,4 +513,64 @@
         </configuration>
     </coordinator>
 
+    <coordinator name="cassandra_pageview_top_articles_monthly-coord">
+        <app-path>${coordinator_file_monthly}</app-path>
+        <configuration>
+            <property>
+                <name>workflow_file</name>
+                <value>${workflow_file_monthly}</value>
+            </property>
+
+            <!-- Dataset the job depends on -->
+            <property>
+                <name>datasets_file</name>
+                <value>${top_articles_datasets_file}</value>
+            </property>
+            <property>
+                <name>dataset_name</name>
+                <value>${top_articles_dataset_name}</value>
+            </property>
+
+            <!-- Hive data preparation job stuff -->
+            <property>
+                <name>hive_script</name>
+                <value>${top_articles_hive_script}</value>
+            </property>
+            <property>
+                <name>source_table</name>
+                <value>${top_articles_source_table}</value>
+            </property>
+
+            <!-- cassandra loader job stuff -->
+            <property>
+                <name>cassandra_parallel_loaders</name>
+                <value>${per_article_flat_parallel_loaders}</value>
+            </property>
+            <property>
+                <name>cassandra_cql</name>
+                <value>${top_articles_cql}</value>
+            </property>
+            <property>
+                <name>cassandra_keyspace</name>
+                <value>${top_articles_keyspace}</value>
+            </property>
+            <property>
+                <name>hive_fields</name>
+                <value>${top_articles_hive_fields}</value>
+            </property>
+            <property>
+                <name>hive_fields_types</name>
+                <value>${top_articles_hive_fields_types}</value>
+            </property>
+            <property>
+                <name>cassandra_fields</name>
+                <value>${top_articles_cassandra_fields}</value>
+            </property>
+            <property>
+                <name>cassandra_primary_keys</name>
+                <value>${top_articles_cassandra_primary_keys}</value>
+            </property>
+        </configuration>
+    </coordinator>
+
 </bundle-app>
diff --git a/oozie/cassandra/coord_top_articles_monthly.properties 
b/oozie/cassandra/coord_top_articles_monthly.properties
new file mode 100644
index 0000000..1af4e9f
--- /dev/null
+++ b/oozie/cassandra/coord_top_articles_monthly.properties
@@ -0,0 +1,96 @@
+# Configures a coordinator to manage loading cassandra for the top_articles 
monthly
+# pageview API. Any of the following properties are overridable with -D.
+# Usage:
+#   oozie job -Duser=$USER -Dstart_time=2015-05-05T00:00Z -submit -config 
oozie/cassandra/coord_top_articles_monthly.properties
+#
+# NOTE:  The $oozie_directory must be synced to HDFS so that all relevant
+#        .xml files exist there when this job is submitted.
+
+
+name_node                         = hdfs://analytics-hadoop
+job_tracker                       = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name                        = default
+
+user                              = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should
+# override this to point directly at a deployed
+# directory name, and not the 'symbolic' 'current' directory.
+# E.g.  /wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory                = ${name_node}/wmf/refinery/current
+
+# HDFS path to the refinery job jar that will be used by this job.
+refinery_cassandra_jar_path       = 
${refinery_directory}/artifacts/org/wikimedia/analytics/refinery/refinery-cassandra-0.0.23.jar
+cassandra_reducer_class           = 
org.wikimedia.analytics.refinery.cassandra.ReducerToCassandra
+cassandra_output_format_class     = 
org.wikimedia.analytics.refinery.cassandra.CqlOutputFormat
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory                   = ${refinery_directory}/oozie
+
+# HDFS path to coordinators to run.
+coordinator_file_hourly           = 
${oozie_directory}/cassandra/hourly/coordinator.xml
+coordinator_file_daily            = 
${oozie_directory}/cassandra/daily/coordinator.xml
+coordinator_file_monthly          = 
${oozie_directory}/cassandra/monthly/coordinator.xml
+
+# HDFS path to workflows to run.
+workflow_file_hourly              = 
${oozie_directory}/cassandra/hourly/workflow.xml
+workflow_file_daily               = 
${oozie_directory}/cassandra/daily/workflow.xml
+workflow_file_monthly             = 
${oozie_directory}/cassandra/monthly/workflow.xml
+
+
+# HDFS path to datasets definitions
+pageview_datasets_file            = ${oozie_directory}/pageview/datasets.xml
+projectview_datasets_file         = ${oozie_directory}/projectview/datasets.xml
+pageview_data_directory           = ${name_node}/wmf/data/wmf/pageview
+projectview_data_directory        = ${name_node}/wmf/data/wmf/projectview
+
+# Initial import time of the webrequest dataset.
+start_time                        = 2015-05-01T00:00Z
+
+# Time to stop running this coordinator.  Year 3000 == never!
+stop_time                         = 3000-01-01T00:00Z
+
+# HDFS path to hive-site.xml file.  This is needed to run hive actions.
+hive_site_xml                     = 
${refinery_directory}/oozie/util/hive/hive-site.xml
+# Temporary directory
+temporary_directory               = ${name_node}/tmp
+
+# Cassandra cluster info
+cassandra_host                    = aqs1001.eqiad.wmnet
+cassandra_port                    = 9042
+cassandra_username                = cassandra
+cassandra_password                = cassandra
+cassandra_nodes                   = 3
+batch_size                        = 1024
+
+# Hive value separator
+hive_value_separator              = \\t
+# Cassandra table to be loaded (not job dependent)
+cassandra_table                   = data
+
+# Constant field names and value to be loaded into cassandra
+constant_output_domain_field      = _domain
+constant_output_domain_value      = analytics.wikimedia.org,text
+constant_output_granularity_field = granularity
+constant_output_tid_field         = _tid
+constant_output_tid_value         = 0,timeuuid
+
+workflow_file                     = ${workflow_file_monthly}
+datasets_file                     = ${pageview_datasets_file}
+dataset_name                      = pageview_hourly
+hive_script                       = pageview_top_articles.hql
+source_table                      = wmf.pageview_hourly
+cassandra_parallel_loaders        = 3
+cassandra_cql                     = UPDATE 
"local_group_default_T_top_pageviews"."data" SET "articlesJSON" = ?
+cassandra_keyspace                = local_group_default_T_top_pageviews
+hive_fields                       = project,access,year,month,day,articlesJSON
+hive_fields_types                 = text,text,text,text,text,text
+cassandra_fields                  = articlesJSON
+cassandra_primary_keys            = _domain,project,access,year,month,day,_tid
+
+# Coordinator to start.
+oozie.coord.application.path      = ${coordinator_file_monthly}
+oozie.use.system.libpath          = true
+oozie.action.external.stats.write = true
diff --git a/oozie/cassandra/daily/pageview_per_article.hql 
b/oozie/cassandra/daily/pageview_per_article.hql
index 60f15f9..97de211 100644
--- a/oozie/cassandra/daily/pageview_per_article.hql
+++ b/oozie/cassandra/daily/pageview_per_article.hql
@@ -14,13 +14,11 @@
 --         -d year=2015                                           \
 --         -d month=5                                             \
 --         -d day=1                                               \
---         -d reducers_number=6                                   \
 --
 
 
 SET hive.exec.compress.output=true;
 SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
 
 
 INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/daily/pageview_per_project.hql 
b/oozie/cassandra/daily/pageview_per_project.hql
index be08d2d..317fb31 100644
--- a/oozie/cassandra/daily/pageview_per_project.hql
+++ b/oozie/cassandra/daily/pageview_per_project.hql
@@ -14,13 +14,11 @@
 --         -d year=2015                                           \
 --         -d month=5                                             \
 --         -d day=1                                               \
---         -d reducers_number=1                                   \
 --
 
 
 SET hive.exec.compress.output=true;
 SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
 
 
 INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/daily/pageview_top_articles.hql 
b/oozie/cassandra/daily/pageview_top_articles.hql
index 72eefc4..a4d15b9 100644
--- a/oozie/cassandra/daily/pageview_top_articles.hql
+++ b/oozie/cassandra/daily/pageview_top_articles.hql
@@ -14,13 +14,11 @@
 --         -d year=2015                                           \
 --         -d month=5                                             \
 --         -d day=1                                               \
---         -d reducers_number=6                                   \
 --
 
 
 SET hive.exec.compress.output=true;
 SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
 
 
 WITH ranked AS (
diff --git a/oozie/cassandra/daily/workflow.xml 
b/oozie/cassandra/daily/workflow.xml
index adbad53..16a02cd 100644
--- a/oozie/cassandra/daily/workflow.xml
+++ b/oozie/cassandra/daily/workflow.xml
@@ -166,7 +166,6 @@
             <param>year=${year}</param>
             <param>month=${month}</param>
             <param>day=${day}</param>
-            <param>reducers_number=${cassandra_parallel_loaders}</param>
         </hive>
 
         <ok to="load_cassandra"/>
diff --git a/oozie/cassandra/hourly/pageview_per_project.hql 
b/oozie/cassandra/hourly/pageview_per_project.hql
index 8b84ca8..1a2e4d6 100644
--- a/oozie/cassandra/hourly/pageview_per_project.hql
+++ b/oozie/cassandra/hourly/pageview_per_project.hql
@@ -16,13 +16,11 @@
 --         -d month=5                                             \
 --         -d day=1                                               \
 --         -d hour=0                                              \
---         -d reducers_number=1                                   \
 --
 
 
 SET hive.exec.compress.output=true;
 SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
 
 
 INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/hourly/workflow.xml 
b/oozie/cassandra/hourly/workflow.xml
index 66c9c44..083580f 100644
--- a/oozie/cassandra/hourly/workflow.xml
+++ b/oozie/cassandra/hourly/workflow.xml
@@ -179,7 +179,6 @@
             <param>month=${month}</param>
             <param>day=${day}</param>
             <param>hour=${hour}</param>
-            <param>reducers_number=${cassandra_parallel_loaders}</param>
         </hive>
 
         <ok to="load_cassandra"/>
diff --git a/oozie/cassandra/monthly/pageview_per_project.hql 
b/oozie/cassandra/monthly/pageview_per_project.hql
index b345264..c849db9 100644
--- a/oozie/cassandra/monthly/pageview_per_project.hql
+++ b/oozie/cassandra/monthly/pageview_per_project.hql
@@ -12,13 +12,11 @@
 --         -d separator=\t                                        \
 --         -d year=2015                                           \
 --         -d month=5                                             \
---         -d reducers_number=1                                   \
 --
 
 
 SET hive.exec.compress.output=true;
 SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
 
 
 INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/monthly/pageview_top_articles.hql 
b/oozie/cassandra/monthly/pageview_top_articles.hql
new file mode 100644
index 0000000..b81906d
--- /dev/null
+++ b/oozie/cassandra/monthly/pageview_top_articles.hql
@@ -0,0 +1,102 @@
+-- Parameters:
+--     destination_directory -- HDFS path to write output files
+--     source_table          -- Fully qualified table name to compute from.
+--     separator             -- Separator for values
+--     year                  -- year of partition to compute from.
+--     month                 -- month of partition to compute from.
+--
+-- Usage:
+--     hive -f pageview_top_articles.hql                          \
+--         -d destination_directory=/tmp/pageview_top_articles    \
+--         -d source_table=wmf.pageview_hourly                    \
+--         -d separator=\t                                        \
+--         -d year=2015                                           \
+--         -d month=5                                             \
+--
+
+
+SET hive.exec.compress.output=true;
+SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+
+
+WITH ranked AS (
+    SELECT
+        project,
+        page_title,
+        access,
+        year,
+        month,
+        views,
+        rank() OVER (PARTITION BY project, access, year, month ORDER BY views 
DESC) as rank,
+        row_number() OVER (PARTITION BY project, access, year, month ORDER BY 
views DESC) as rn
+    FROM (
+        SELECT
+            project,
+            reflect("org.json.simple.JSONObject", "escape", 
regexp_replace(page_title, '${separator}', '')) AS page_title,
+            COALESCE(regexp_replace(access_method, ' ', '-'), 'all-access') AS 
access,
+            LPAD(year, 4, "0") as year,
+            LPAD(month, 2, "0") as month,
+            SUM(view_count) as views
+        FROM ${source_table}
+        WHERE
+            year = ${year}
+            AND month = ${month}
+            AND agent_type = 'user'
+        GROUP BY project, regexp_replace(page_title, '${separator}', ''), 
access_method, year, month
+        GROUPING SETS (
+            (project, regexp_replace(page_title, '${separator}', ''), 
access_method, year, month),
+            (project, regexp_replace(page_title, '${separator}', ''), year, 
month)
+        )
+    ) raw
+),
+max_rank AS (
+    SELECT
+        project,
+        access,
+        year,
+        month,
+        rank as max_rank
+    FROM ranked
+    WHERE
+        rn = 1001
+    GROUP BY
+        project,
+        access,
+        year,
+        month,
+        rank
+)
+INSERT OVERWRITE DIRECTORY "${destination_directory}"
+-- Since "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '" only
+-- works for exports to local directories (see HIVE-5672), we have to
+-- prepare the lines by hand through concatenation :-(
+SELECT
+    CONCAT_WS("${separator}",
+        ranked.project,
+        ranked.access,
+        ranked.year,
+        ranked.month,
+        'all-days',
+        CONCAT('[',
+            CONCAT_WS(',', collect_set(
+                CONCAT('{"article":"', ranked.page_title,
+                    '","views":', CAST(ranked.views AS STRING),
+                    ',"rank":', CAST(ranked.rank AS STRING), '}'))
+            ),']')
+    )
+FROM ranked
+LEFT JOIN max_rank ON (
+    ranked.project = max_rank.project
+    AND ranked.access = max_rank.access
+    AND ranked.year = max_rank.year
+    AND ranked.month = max_rank.month
+)
+WHERE ranked.rank < COALESCE(max_rank.max_rank, 1001)
+GROUP BY
+    ranked.project,
+    ranked.access,
+    ranked.year,
+    ranked.month
+;
\ No newline at end of file
diff --git a/oozie/cassandra/monthly/workflow.xml 
b/oozie/cassandra/monthly/workflow.xml
index cbf93eb..31d8b9d 100644
--- a/oozie/cassandra/monthly/workflow.xml
+++ b/oozie/cassandra/monthly/workflow.xml
@@ -161,7 +161,6 @@
             
<param>destination_directory=${temporary_directory}/${wf:id()}-${cassandra_keyspace}</param>
             <param>year=${year}</param>
             <param>month=${month}</param>
-            <param>reducers_number=${cassandra_parallel_loaders}</param>
         </hive>
 
         <ok to="load_cassandra"/>

-- 
To view, visit https://gerrit.wikimedia.org/r/270921
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia3344b8c66a9e01e0258c55f049c7c288660e045
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to