Milimetric has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/303339

Change subject: [WIP] Oozify sqoop import of mediawiki tables
......................................................................

[WIP] Oozify sqoop import of mediawiki tables

Sqoop a list of tables from a list of MediaWiki databases into HDFS.

TODO: only the sqoop shell script is written for now; the Oozie workflow still needs to be written.
I'm going to run this over the weekend just to see how long it takes.

Bug: T141476
Change-Id: Id088fca8a046ee91816559bbb3e8d239d9b500e0
---
M oozie/mediawiki/README.md
A oozie/mediawiki/import/README.md
A oozie/mediawiki/import/create_avro_table.sql
A oozie/mediawiki/import/sqoop_mediawiki_dbs.sh
A oozie/mediawiki/import/wiki_db.list
A oozie/mediawiki/import/wiki_db_table.list
A oozie/mediawiki/load/README.md
7 files changed, 193 insertions(+), 12 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery refs/changes/39/303339/1

diff --git a/oozie/mediawiki/README.md b/oozie/mediawiki/README.md
index 0f93466..01dfb93 100644
--- a/oozie/mediawiki/README.md
+++ b/oozie/mediawiki/README.md
@@ -1,12 +1,3 @@
-This directory contains the dataset definition and coordinators that launch
-jobs specific to data loaded from MediaWiki's Avro+Kafka data pipeline.
-
-If you are producing a new Avro dataset via Mediawiki Monolog and Kafka,
-you should use these Oozie configs to import your data and automatically add Hive partitions to it.  Most things needed to do this are abstracted here via the 'channel' property that is distinct for each coordinator launched by bundle.xml.
-
-Steps to add a new coordinator:
-
-- Add a CREATE TABLE hive file in hive/mediawiki and create your table in Hive.
-- Add a new coordinator declaration in bundle.xml and set $channel
-  and $raw_data_directory appropriately.
-- Relaunch the bundle.
+General collection of MediaWiki-related jobs:
+- import: loads data from MediaWiki databases into HDFS
+- load: adds Hive partitions to data from MediaWiki's Avro/Kafka data pipeline
diff --git a/oozie/mediawiki/import/README.md b/oozie/mediawiki/import/README.md
new file mode 100644
index 0000000..8ace467
--- /dev/null
+++ b/oozie/mediawiki/import/README.md
@@ -0,0 +1 @@
+Scripts, workflows, and coordinators for sqooping a list of tables from a list of dbs into HDFS.  The queries for each table are pre-written in order to control what data types we get.
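
For illustration, the expanded command the script builds for a single table
would look roughly like this (a sketch only: credentials come from the sqoop
options file in the real script, and the column list is shortened here):

    sqoop import \
        --connect "jdbc:mysql://analytics-store.eqiad.wmnet/enwiki" \
        --query "select rev_id, rev_page, cast(rev_timestamp as char) as rev_timestamp from revision where \$CONDITIONS" \
        --num-mappers 1 \
        --target-dir /wmf/raw/mediawiki/enwiki/revision \
        --as-avrodatafile
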
diff --git a/oozie/mediawiki/import/create_avro_table.sql b/oozie/mediawiki/import/create_avro_table.sql
new file mode 100644
index 0000000..d1415bf
--- /dev/null
+++ b/oozie/mediawiki/import/create_avro_table.sql
@@ -0,0 +1,29 @@
+-- example table create syntax on top of avro data
+-- not sure we need this if we'll just read it in spark (do people want it anyway?)
+
+CREATE EXTERNAL TABLE `milimetric.enwiki_revision`(
+  `rev_id` bigint COMMENT '', 
+  `rev_page` bigint COMMENT '', 
+  `rev_text_id` bigint COMMENT '', 
+  `rev_comment` binary COMMENT '', 
+  `rev_user` bigint COMMENT '', 
+  `rev_user_text` binary COMMENT '', 
+  `rev_timestamp` binary COMMENT '', 
+  `rev_minor_edit` boolean COMMENT '', 
+  `rev_deleted` boolean COMMENT '', 
+  `rev_len` bigint COMMENT '', 
+  `rev_parent_id` bigint COMMENT '', 
+  `rev_sha1` binary COMMENT '', 
+  `rev_content_model` binary COMMENT '', 
+  `rev_content_format` binary COMMENT '')
+ROW FORMAT SERDE 
+  'org.apache.hadoop.hive.serde2.avro.AvroSerDe' 
+STORED AS INPUTFORMAT 
+  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' 
+OUTPUTFORMAT 
+  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+LOCATION
+  'hdfs://analytics-hadoop/user/milimetric/mediawiki/enwiki/revision'
+TBLPROPERTIES (
+  'avro.schema.url'='hdfs:///user/milimetric/mediawiki/schemas/revision.avsc'
+)
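
If people do want the Hive table, creating it from this DDL and sanity-checking
the data would be something like the following (a sketch; it assumes the Avro
files and schema already sit at the paths referenced above):

    hive -f create_avro_table.sql
    hive -e "select rev_id, rev_timestamp from milimetric.enwiki_revision limit 10;"
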
diff --git a/oozie/mediawiki/import/sqoop_mediawiki_dbs.sh b/oozie/mediawiki/import/sqoop_mediawiki_dbs.sh
new file mode 100755
index 0000000..7fa3872
--- /dev/null
+++ b/oozie/mediawiki/import/sqoop_mediawiki_dbs.sh
@@ -0,0 +1,141 @@
+###
+# For each table in each database specified, sqoop the table in
+#   Example usage:
+#   sqoop_mediawiki_dbs.sh wiki_db.list wiki_db_table.list analytics-store.eqiad.wmnet /wmf/raw/mediawiki/
+#
+#       will load
+#           from analytics-store.eqiad.wmnet, for
+#           each table TABLE in wiki_db_table.list, from
+#           each db WIKI in wiki_db.list, into
+#           /wmf/raw/mediawiki/WIKI/TABLE
+#           as an avro data file
+###
+
+DBS=$1
+TABLES=$2
+DB_HOST=$3
+TARGET_DIR=$4
+
+QUERY_REVISION="
+ select rev_id,
+        rev_page,
+        rev_text_id,
+        cast(rev_comment as char)           as rev_comment,
+        rev_user,
+        cast(rev_user_text as char)         as rev_user_text,
+        cast(rev_timestamp as char)         as rev_timestamp,
+        rev_minor_edit,
+        rev_deleted,
+        rev_len,
+        rev_parent_id,
+        cast(rev_sha1 as char)              as rev_sha1,
+        cast(rev_content_model as char)     as rev_content_model,
+        cast(rev_content_format as char)    as rev_content_format
+
+   from revision
+  where \$CONDITIONS
+"
+
+QUERY_ARCHIVE="
+ select ar_id,
+        ar_namespace,
+        cast(ar_title as char)          as ar_title,
+        cast(ar_text as char)           as ar_text,
+        cast(ar_comment as char)        as ar_comment,
+        ar_user,
+        cast(ar_user_text as char)      as ar_user_text,
+        cast(ar_timestamp as char)      as ar_timestamp,
+        ar_minor_edit,
+        cast(ar_flags as char)          as ar_flags,
+        ar_rev_id,
+        ar_text_id,
+        ar_deleted,
+        ar_len,
+        ar_page_id,
+        ar_parent_id,
+        cast(ar_sha1 as char)           as ar_sha1,
+        cast(ar_content_model as char)  as ar_content_model,
+        cast(ar_content_format as char) as ar_content_format
+
+   from archive
+  where \$CONDITIONS
+"
+
+QUERY_PAGE="
+ select page_id,
+        page_namespace,
+        cast(page_title as char)            as page_title,
+        cast(page_restrictions as char)     as page_restrictions,
+        page_counter,
+        page_is_redirect,
+        page_is_new,
+        page_random,
+        cast(page_touched as char)          as page_touched,
+        cast(page_links_updated as char)    as page_links_updated,
+        page_latest,
+        page_len,
+        cast(page_content_model as char)    as page_content_model
+
+   from page
+  where \$CONDITIONS
+"
+
+QUERY_USER="
+ select user_id,
+        cast(user_name as char)                 as user_name,
+        cast(user_real_name as char)            as user_real_name,
+        cast(user_password as char)             as user_password,
+        cast(user_newpassword as char)          as user_newpassword,
+        cast(user_email as char)                as user_email,
+        cast(user_options as char)              as user_options,
+        cast(user_touched as char)              as user_touched,
+        cast(user_token as char)                as user_token,
+        cast(user_email_authenticated as char)  as user_email_authenticated,
+        cast(user_email_token as char)          as user_email_token,
+        cast(user_email_token_expires as char)  as user_email_token_expires,
+        cast(user_registration as char)         as user_registration,
+        cast(user_newpass_time as char)         as user_newpass_time,
+        user_editcount,
+        cast(user_password_expires as char)     as user_password_expires
+
+   from user
+  where \$CONDITIONS
+"
+
+QUERY_LOGGING="
+ select log_id,
+        cast(log_type as char)      as log_type,
+        cast(log_action as char)    as log_action,
+        cast(log_timestamp as char) as log_timestamp,
+        log_user,
+        log_namespace,
+        cast(log_title as char)     as log_title,
+        cast(log_comment as char)   as log_comment,
+        cast(log_params as char)    as log_params,
+        log_deleted,
+        cast(log_user_text as char) as log_user_text,
+        log_page
+
+   from logging
+  where \$CONDITIONS
+"
+
+while read DB
+do
+    while read TABLE
+    do
+        TABLE_DIR="$TARGET_DIR/$DB/$TABLE"
+        echo "SQOOPING $TABLE_DIR"
+
+        TABLE_UPPER="$(echo "QUERY_$TABLE" | tr '[a-z]' '[A-Z]')"
+        QUERY=${!TABLE_UPPER}
+
+        sqoop --options-file options.research.txt        \
+            --connect "jdbc:mysql://$DB_HOST/$DB"         \
+            --query "$QUERY"                              \
+            --num-mappers 1                               \
+            --target-dir "$TABLE_DIR"                     \
+            --as-avrodatafile
+
+    done < $TABLES
+done < $DBS
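
A rough sketch of the test run planned for the weekend, writing under a
personal directory instead of /wmf/raw (paths here are examples only),
plus a quick check of the resulting layout:

    ./sqoop_mediawiki_dbs.sh wiki_db.list wiki_db_table.list \
        analytics-store.eqiad.wmnet /user/milimetric/mediawiki

    # each (wiki, table) pair should land as Avro files in its own directory
    hdfs dfs -ls /user/milimetric/mediawiki/enwiki/revision
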
diff --git a/oozie/mediawiki/import/wiki_db.list b/oozie/mediawiki/import/wiki_db.list
new file mode 100644
index 0000000..c15ec39
--- /dev/null
+++ b/oozie/mediawiki/import/wiki_db.list
@@ -0,0 +1,2 @@
+enwiki
+rowiki
diff --git a/oozie/mediawiki/import/wiki_db_table.list b/oozie/mediawiki/import/wiki_db_table.list
new file mode 100644
index 0000000..4c97e68
--- /dev/null
+++ b/oozie/mediawiki/import/wiki_db_table.list
@@ -0,0 +1,5 @@
+archive
+logging
+page
+revision
+user
diff --git a/oozie/mediawiki/load/README.md b/oozie/mediawiki/load/README.md
new file mode 100644
index 0000000..0f93466
--- /dev/null
+++ b/oozie/mediawiki/load/README.md
@@ -0,0 +1,12 @@
+This directory contains the dataset definition and coordinators that launch
+jobs specific to data loaded from MediaWiki's Avro+Kafka data pipeline.
+
+If you are producing a new Avro dataset via Mediawiki Monolog and Kafka,
+you should use these Oozie configs to import your data and automatically add Hive partitions to it.  Most things needed to do this are abstracted here via the 'channel' property that is distinct for each coordinator launched by bundle.xml.
+
+Steps to add a new coordinator:
+
+- Add a CREATE TABLE hive file in hive/mediawiki and create your table in Hive.
+- Add a new coordinator declaration in bundle.xml and set $channel
+  and $raw_data_directory appropriately.
+- Relaunch the bundle.
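
Relaunching the bundle is the usual Oozie submit, roughly (the properties file
name and Oozie URL below are placeholders, not part of this change):

    oozie job -oozie http://localhost:11000/oozie \
        -config bundle.properties -run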

-- 
To view, visit https://gerrit.wikimedia.org/r/303339
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id088fca8a046ee91816559bbb3e8d239d9b500e0
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Milimetric <dandree...@wikimedia.org>
