Nuria has submitted this change and it was merged.

Change subject: Add monthly top to cassandra and correct jobs
......................................................................


Add monthly top to cassandra and correct jobs

Add monthly top cassandra load oozie job.
Remove number of reducers parameter constraint in every
cassandra loading job. This parameter was used to
facilitate cassandra data loading by generating
a predefined number of files but it actually reduces
computation power too much in comparison to the
small win made at loading time.
Force oozie to let hive choose the number of reducers.

Bug: T120113
Change-Id: Ia3344b8c66a9e01e0258c55f049c7c288660e045
---
M oozie/cassandra/bundle.xml
A oozie/cassandra/coord_top_articles_monthly.properties
M oozie/cassandra/daily/pageview_per_article.hql
M oozie/cassandra/daily/pageview_per_project.hql
M oozie/cassandra/daily/pageview_top_articles.hql
M oozie/cassandra/daily/workflow.xml
M oozie/cassandra/hourly/pageview_per_project.hql
M oozie/cassandra/hourly/workflow.xml
M oozie/cassandra/monthly/pageview_per_project.hql
A oozie/cassandra/monthly/pageview_top_articles.hql
M oozie/cassandra/monthly/workflow.xml
11 files changed, 273 insertions(+), 13 deletions(-)

Approvals:
  Nuria: Verified; Looks good to me, approved



diff --git a/oozie/cassandra/bundle.xml b/oozie/cassandra/bundle.xml
index 074620b..928791f 100644
--- a/oozie/cassandra/bundle.xml
+++ b/oozie/cassandra/bundle.xml
@@ -513,4 +513,64 @@
         </configuration>
     </coordinator>
 
+    <coordinator name="cassandra_pageview_top_articles_monthly-coord">
+        <app-path>${coordinator_file_monthly}</app-path>
+        <configuration>
+            <property>
+                <name>workflow_file</name>
+                <value>${workflow_file_monthly}</value>
+            </property>
+
+            <!-- Dataset the job depends on -->
+            <property>
+                <name>datasets_file</name>
+                <value>${top_articles_datasets_file}</value>
+            </property>
+            <property>
+                <name>dataset_name</name>
+                <value>${top_articles_dataset_name}</value>
+            </property>
+
+            <!-- Hive data preparation job stuff -->
+            <property>
+                <name>hive_script</name>
+                <value>${top_articles_hive_script}</value>
+            </property>
+            <property>
+                <name>source_table</name>
+                <value>${top_articles_source_table}</value>
+            </property>
+
+            <!-- cassandra loader job stuff -->
+            <property>
+                <name>cassandra_parallel_loaders</name>
+                <value>${per_article_flat_parallel_loaders}</value>
+            </property>
+            <property>
+                <name>cassandra_cql</name>
+                <value>${top_articles_cql}</value>
+            </property>
+            <property>
+                <name>cassandra_keyspace</name>
+                <value>${top_articles_keyspace}</value>
+            </property>
+            <property>
+                <name>hive_fields</name>
+                <value>${top_articles_hive_fields}</value>
+            </property>
+            <property>
+                <name>hive_fields_types</name>
+                <value>${top_articles_hive_fields_types}</value>
+            </property>
+            <property>
+                <name>cassandra_fields</name>
+                <value>${top_articles_cassandra_fields}</value>
+            </property>
+            <property>
+                <name>cassandra_primary_keys</name>
+                <value>${top_articles_cassandra_primary_keys}</value>
+            </property>
+        </configuration>
+    </coordinator>
+
 </bundle-app>
diff --git a/oozie/cassandra/coord_top_articles_monthly.properties 
b/oozie/cassandra/coord_top_articles_monthly.properties
new file mode 100644
index 0000000..2926656
--- /dev/null
+++ b/oozie/cassandra/coord_top_articles_monthly.properties
@@ -0,0 +1,96 @@
+# Configures a coordinator to manage loading cassandra for the top_articles 
monthly
+# pageview API.Any of the following properties are overidable with -D.
+# Usage:
+#   oozie job -Duser=$USER -Dstart_time=2015-05-05T00:00Z -submit -config 
oozie/cassandra/coord_top_articles_monthly.properties
+#
+# NOTE:  The $oozie_directory must be synced to HDFS so that all relevant
+#        .xml files exist there when this job is submitted.
+
+
+name_node                         = hdfs://analytics-hadoop
+job_tracker                       = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name                        = default
+
+user                              = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should
+# override this to point directly at a deployed
+# directory name, and not the 'symbolic' 'current' directory.
+# E.g.  /wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory                = ${name_node}/wmf/refinery/current
+
+# HDFS path to the refinery job jar that will be used by this job.
+refinery_cassandra_jar_path       = 
${refinery_directory}/artifacts/org/wikimedia/analytics/refinery/refinery-cassandra-0.0.23.jar
+cassandra_reducer_class           = 
org.wikimedia.analytics.refinery.cassandra.ReducerToCassandra
+cassandra_output_format_class     = 
org.wikimedia.analytics.refinery.cassandra.CqlOutputFormat
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory                   = ${refinery_directory}/oozie
+
+# HDFS path to coordinators to run.
+coordinator_file_hourly           = 
${oozie_directory}/cassandra/hourly/coordinator.xml
+coordinator_file_daily            = 
${oozie_directory}/cassandra/daily/coordinator.xml
+coordinator_file_monthly          = 
${oozie_directory}/cassandra/monthly/coordinator.xml
+
+# HDFS path to workflows to run.
+workflow_file_hourly              = 
${oozie_directory}/cassandra/hourly/workflow.xml
+workflow_file_daily               = 
${oozie_directory}/cassandra/daily/workflow.xml
+workflow_file_monthly             = 
${oozie_directory}/cassandra/monthly/workflow.xml
+
+
+# HDFS path to datasets definitions
+pageview_datasets_file            = ${oozie_directory}/pageview/datasets.xml
+projectview_datasets_file         = ${oozie_directory}/projectview/datasets.xml
+pageview_data_directory           = ${name_node}/wmf/data/wmf/pageview
+projectview_data_directory        = ${name_node}/wmf/data/wmf/projectview
+
+# Initial import time of the webrequest dataset.
+start_time                        = 2015-05-01T00:00Z
+
+# Time to stop running this coordinator.  Year 3000 == never!
+stop_time                         = 3000-01-01T00:00Z
+
+# HDFS path to hive-site.xml file.  This is needed to run hive actions.
+hive_site_xml                     = 
${refinery_directory}/oozie/util/hive/hive-site.xml
+# Temporary directory
+temporary_directory               = ${name_node}/tmp
+
+# Cassandra cluster info
+cassandra_host                    = aqs1001.eqiad.wmnet
+cassandra_port                    = 9042
+cassandra_username                = cassandra
+cassandra_password                = cassandra
+cassandra_nodes                   = 3
+batch_size                        = 1024
+
+# Hive value separator
+hive_value_separator              = \\t
+# Cassandra table to be loaded (not job dependant)
+cassandra_table                   = data
+
+# Constant field names and value to be loaded into cassandra
+constant_output_domain_field      = _domain
+constant_output_domain_value      = analytics.wikimedia.org,text
+constant_output_granularity_field = granularity
+constant_output_tid_field         = _tid
+constant_output_tid_value         = 0,timeuuid
+
+workflow_file                     = ${workflow_file_monthly}
+datasets_file                     = ${pageview_datasets_file}
+dataset_name                      = pageview_hourly
+hive_script                       = pageview_top_articles.hql
+source_table                      = wmf.pageview_hourly
+cassandra_parallel_loaders        = 3
+cassandra_cql                     = UPDATE 
"local_group_default_T_top_pageviews"."data" SET "articlesJSON" = ?
+cassandra_keyspace                = local_group_default_T_top_pageviews
+hive_fields                       = project,access,year,month,day,articlesJSON
+hive_fields_types                 = text,text,text,text,text,text
+cassandra_fields                  = articlesJSON
+cassandra_primary_keys            = _domain,project,access,year,month,day,_tid
+
+# Coordintator to start.
+oozie.coord.application.path      = ${coordinator_file_monthly}
+oozie.use.system.libpath          = true
+oozie.action.external.stats.write = true
diff --git a/oozie/cassandra/daily/pageview_per_article.hql 
b/oozie/cassandra/daily/pageview_per_article.hql
index 60f15f9..97de211 100644
--- a/oozie/cassandra/daily/pageview_per_article.hql
+++ b/oozie/cassandra/daily/pageview_per_article.hql
@@ -14,13 +14,11 @@
 --         -d year=2015                                           \
 --         -d month=5                                             \
 --         -d day=1                                               \
---         -d reducers_number=6                                   \
 --
 
 
 SET hive.exec.compress.output=true;
 SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
 
 
 INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/daily/pageview_per_project.hql 
b/oozie/cassandra/daily/pageview_per_project.hql
index be08d2d..317fb31 100644
--- a/oozie/cassandra/daily/pageview_per_project.hql
+++ b/oozie/cassandra/daily/pageview_per_project.hql
@@ -14,13 +14,11 @@
 --         -d year=2015                                           \
 --         -d month=5                                             \
 --         -d day=1                                               \
---         -d reducers_number=1                                   \
 --
 
 
 SET hive.exec.compress.output=true;
 SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
 
 
 INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/daily/pageview_top_articles.hql 
b/oozie/cassandra/daily/pageview_top_articles.hql
index 72eefc4..a4d15b9 100644
--- a/oozie/cassandra/daily/pageview_top_articles.hql
+++ b/oozie/cassandra/daily/pageview_top_articles.hql
@@ -14,13 +14,11 @@
 --         -d year=2015                                           \
 --         -d month=5                                             \
 --         -d day=1                                               \
---         -d reducers_number=6                                   \
 --
 
 
 SET hive.exec.compress.output=true;
 SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
 
 
 WITH ranked AS (
diff --git a/oozie/cassandra/daily/workflow.xml 
b/oozie/cassandra/daily/workflow.xml
index adbad53..fe2a8db 100644
--- a/oozie/cassandra/daily/workflow.xml
+++ b/oozie/cassandra/daily/workflow.xml
@@ -153,6 +153,11 @@
                     <name>oozie.launcher.mapreduce.map.memory.mb</name>
                     <value>${oozie_launcher_memory}</value>
                 </property>
+                <!--Let hive decide on the number of reducers -->
+                <property>
+                    <name>mapred.reduce.tasks</name>
+                    <value>-1</value>
+                </property>
                 <property>
                     <name>hive.exec.scratchdir</name>
                     <value>/tmp/hive-${user}</value>
@@ -166,7 +171,6 @@
             <param>year=${year}</param>
             <param>month=${month}</param>
             <param>day=${day}</param>
-            <param>reducers_number=${cassandra_parallel_loaders}</param>
         </hive>
 
         <ok to="load_cassandra"/>
diff --git a/oozie/cassandra/hourly/pageview_per_project.hql 
b/oozie/cassandra/hourly/pageview_per_project.hql
index 8b84ca8..1a2e4d6 100644
--- a/oozie/cassandra/hourly/pageview_per_project.hql
+++ b/oozie/cassandra/hourly/pageview_per_project.hql
@@ -16,13 +16,11 @@
 --         -d month=5                                             \
 --         -d day=1                                               \
 --         -d hour=0                                              \
---         -d reducers_number=1                                   \
 --
 
 
 SET hive.exec.compress.output=true;
 SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
 
 
 INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/hourly/workflow.xml 
b/oozie/cassandra/hourly/workflow.xml
index 66c9c44..590b1c9 100644
--- a/oozie/cassandra/hourly/workflow.xml
+++ b/oozie/cassandra/hourly/workflow.xml
@@ -165,6 +165,11 @@
                     <name>oozie.launcher.mapreduce.map.memory.mb</name>
                     <value>${oozie_launcher_memory}</value>
                 </property>
+                <!--Let hive decide on the number of reducers -->
+                <property>
+                    <name>mapred.reduce.tasks</name>
+                    <value>-1</value>
+                </property>
                 <property>
                     <name>hive.exec.scratchdir</name>
                     <value>/tmp/hive-${user}</value>
@@ -179,7 +184,6 @@
             <param>month=${month}</param>
             <param>day=${day}</param>
             <param>hour=${hour}</param>
-            <param>reducers_number=${cassandra_parallel_loaders}</param>
         </hive>
 
         <ok to="load_cassandra"/>
diff --git a/oozie/cassandra/monthly/pageview_per_project.hql 
b/oozie/cassandra/monthly/pageview_per_project.hql
index b345264..c849db9 100644
--- a/oozie/cassandra/monthly/pageview_per_project.hql
+++ b/oozie/cassandra/monthly/pageview_per_project.hql
@@ -12,13 +12,11 @@
 --         -d separator=\t                                        \
 --         -d year=2015                                           \
 --         -d month=5                                             \
---         -d reducers_number=1                                   \
 --
 
 
 SET hive.exec.compress.output=true;
 SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
-SET mapred.reduce.tasks=${reducers_number};
 
 
 INSERT OVERWRITE DIRECTORY "${destination_directory}"
diff --git a/oozie/cassandra/monthly/pageview_top_articles.hql 
b/oozie/cassandra/monthly/pageview_top_articles.hql
new file mode 100644
index 0000000..b81906d
--- /dev/null
+++ b/oozie/cassandra/monthly/pageview_top_articles.hql
@@ -0,0 +1,102 @@
+-- Parameters:
+--     destination_directory -- HDFS path to write output files
+--     source_table          -- Fully qualified table name to compute from.
+--     separator             -- Separator for values
+--     year                  -- year of partition to compute from.
+--     month                 -- month of partition to compute from.
+--     day                   -- day of partition to compute from.
+--
+-- Usage:
+--     hive -f pageview_top_articles.hql                          \
+--         -d destination_directory=/tmp/pageview_top_articles    \
+--         -d source_table=wmf.pageview_hourly                    \
+--         -d separator=\t                                        \
+--         -d year=2015                                           \
+--         -d month=5                                             \
+--         -d day=1                                               \
+--
+
+
+SET hive.exec.compress.output=true;
+SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+
+
+WITH ranked AS (
+    SELECT
+        project,
+        page_title,
+        access,
+        year,
+        month,
+        views,
+        rank() OVER (PARTITION BY project, access, year, month ORDER BY views 
DESC) as rank,
+        row_number() OVER (PARTITION BY project, access, year, month ORDER BY 
views DESC) as rn
+    FROM (
+        SELECT
+            project,
+            reflect("org.json.simple.JSONObject", "escape", 
regexp_replace(page_title, '${separator}', '')) AS page_title,
+            COALESCE(regexp_replace(access_method, ' ', '-'), 'all-access') AS 
access,
+            LPAD(year, 4, "0") as year,
+            LPAD(month, 2, "0") as month,
+            SUM(view_count) as views
+        FROM ${source_table}
+        WHERE
+            year = ${year}
+            AND month = ${month}
+            AND agent_type = 'user'
+        GROUP BY project, regexp_replace(page_title, '${separator}', ''), 
access_method, year, month
+        GROUPING SETS (
+            (project, regexp_replace(page_title, '${separator}', ''), 
access_method, year, month),
+            (project, regexp_replace(page_title, '${separator}', ''), year, 
month)
+        )
+    ) raw
+),
+max_rank AS (
+    SELECT
+        project,
+        access,
+        year,
+        month,
+        rank as max_rank
+    FROM ranked
+    WHERE
+        rn = 1001
+    GROUP BY
+        project,
+        access,
+        year,
+        month,
+        rank
+)
+INSERT OVERWRITE DIRECTORY "${destination_directory}"
+-- Since "ROW FORMAT DELIMITED DELIMITED FIELDS TERMINATED BY ' '" only
+-- works for exports to local directories (see HIVE-5672), we have to
+-- prepare the lines by hand through concatenation :-(
+SELECT
+    CONCAT_WS("${separator}",
+        ranked.project,
+        ranked.access,
+        ranked.year,
+        ranked.month,
+        'all-days',
+        CONCAT('[',
+            CONCAT_WS(',', collect_set(
+                CONCAT('{"article":"', ranked.page_title,
+                    '","views":', CAST(ranked.views AS STRING),
+                    ',"rank":', CAST(ranked.rank AS STRING), '}'))
+            ),']')
+    )
+FROM ranked
+LEFT JOIN max_rank ON (
+    ranked.project = max_rank.project
+    AND ranked.access = max_rank.access
+    AND ranked.year = max_rank.year
+    AND ranked.month = max_rank.month
+)
+WHERE ranked.rank < COALESCE(max_rank.max_rank, 1001)
+GROUP BY
+    ranked.project,
+    ranked.access,
+    ranked.year,
+    ranked.month
+;
\ No newline at end of file
diff --git a/oozie/cassandra/monthly/workflow.xml 
b/oozie/cassandra/monthly/workflow.xml
index cbf93eb..00a7aac 100644
--- a/oozie/cassandra/monthly/workflow.xml
+++ b/oozie/cassandra/monthly/workflow.xml
@@ -149,6 +149,11 @@
                     <name>oozie.launcher.mapreduce.map.memory.mb</name>
                     <value>${oozie_launcher_memory}</value>
                 </property>
+                <!--Let hive decide on the number of reducers -->
+                <property>
+                    <name>mapred.reduce.tasks</name>
+                    <value>-1</value>
+                </property>
                 <property>
                     <name>hive.exec.scratchdir</name>
                     <value>/tmp/hive-${user}</value>
@@ -161,7 +166,6 @@
             
<param>destination_directory=${temporary_directory}/${wf:id()}-${cassandra_keyspace}</param>
             <param>year=${year}</param>
             <param>month=${month}</param>
-            <param>reducers_number=${cassandra_parallel_loaders}</param>
         </hive>
 
         <ok to="load_cassandra"/>

-- 
To view, visit https://gerrit.wikimedia.org/r/270921
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ia3344b8c66a9e01e0258c55f049c7c288660e045
Gerrit-PatchSet: 3
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
Gerrit-Reviewer: Elukey <[email protected]>
Gerrit-Reviewer: Madhuvishy <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to