Milimetric has uploaded a new change for review. https://gerrit.wikimedia.org/r/303339
Change subject: [WIP] Oozify sqoop import of mediawiki tables
......................................................................

[WIP] Oozify sqoop import of mediawiki tables

Sqoop a list of tables from a list of mediawiki databases into hdfs.

TODO: only the sqoop shell script is written for now; the oozie
workflow still needs to be written. I'm going to run this over the
weekend just to see how long it takes.

Bug: T141476
Change-Id: Id088fca8a046ee91816559bbb3e8d239d9b500e0
---
M oozie/mediawiki/README.md
A oozie/mediawiki/import/README.md
A oozie/mediawiki/import/create_avro_table.sql
A oozie/mediawiki/import/sqoop_mediawiki_dbs.sh
A oozie/mediawiki/import/wiki_db.list
A oozie/mediawiki/import/wiki_db_table.list
A oozie/mediawiki/load/README.md
7 files changed, 193 insertions(+), 12 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery refs/changes/39/303339/1

diff --git a/oozie/mediawiki/README.md b/oozie/mediawiki/README.md
index 0f93466..01dfb93 100644
--- a/oozie/mediawiki/README.md
+++ b/oozie/mediawiki/README.md
@@ -1,12 +1,3 @@
-This directory contains the dataset definition and coordinators that launch
-jobs specific to data loaded from MediaWiki's Avro+Kafka data pipeline.
-
-If you are producing a new Avro dataset via Mediawiki Monolog and Kafka,
-you should use these Oozie configs to import your data and automatically add Hive partitions to it. Most things needed to do this are abstracted here via the 'channel' property that is distinct for each coordinator launched by bundle.xml.
-
-Steps to add a new coordinator:
-
-- Add a CREATE TABLE hive file in hive/mediawiki and create your table in Hive.
-- Add a new coordinator declaration in bundle.xml and set $channel
-  and $raw_data_directory appropriately.
-- Relaunch the bundle.
+General collection of mediawiki-related jobs:
+- import: loads data from mediawiki databases into hdfs
+- load: manipulates mediawiki's avro/kafka data pipeline
diff --git a/oozie/mediawiki/import/README.md b/oozie/mediawiki/import/README.md
new file mode 100644
index 0000000..8ace467
--- /dev/null
+++ b/oozie/mediawiki/import/README.md
@@ -0,0 +1 @@
+Scripts, workflows, and coordinators for sqooping a list of tables from a list of dbs into HDFS. The queries for each table are pre-written in order to control what data types we get.
diff --git a/oozie/mediawiki/import/create_avro_table.sql b/oozie/mediawiki/import/create_avro_table.sql
new file mode 100644
index 0000000..d1415bf
--- /dev/null
+++ b/oozie/mediawiki/import/create_avro_table.sql
@@ -0,0 +1,29 @@
+-- example table create syntax on top of avro data
+-- not sure we need this if we'll just read it in spark (do people want it anyway?)
+
+CREATE EXTERNAL TABLE `milimetric.enwiki_revision`(
+  `rev_id` bigint COMMENT '',
+  `rev_page` bigint COMMENT '',
+  `rev_text_id` bigint COMMENT '',
+  `rev_comment` binary COMMENT '',
+  `rev_user` bigint COMMENT '',
+  `rev_user_text` binary COMMENT '',
+  `rev_timestamp` binary COMMENT '',
+  `rev_minor_edit` boolean COMMENT '',
+  `rev_deleted` boolean COMMENT '',
+  `rev_len` bigint COMMENT '',
+  `rev_parent_id` bigint COMMENT '',
+  `rev_sha1` binary COMMENT '',
+  `rev_content_model` binary COMMENT '',
+  `rev_content_format` binary COMMENT '')
+ROW FORMAT SERDE
+  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS INPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+OUTPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+LOCATION
+  'hdfs://analytics-hadoop/user/milimetric/mediawiki/enwiki/revision'
+TBLPROPERTIES (
+  'avro.schema.url'='hdfs:///user/milimetric/mediawiki/schemas/revision.avsc'
+)
diff --git a/oozie/mediawiki/import/sqoop_mediawiki_dbs.sh b/oozie/mediawiki/import/sqoop_mediawiki_dbs.sh
new file mode 100755
index 0000000..7fa3872
--- /dev/null
+++ b/oozie/mediawiki/import/sqoop_mediawiki_dbs.sh
@@ -0,0 +1,141 @@
+###
+# For each table in each database specified, sqoop the table into HDFS
+# Example usage:
+#   sqoop_mediawiki_dbs.sh wiki_db.list wiki_db_table.list analytics-store.eqiad.wmnet /wmf/raw/mediawiki/
+#
+# will load
+#   from analytics-store.eqiad.wmnet, for
+#   each table TABLE in wiki_db_table.list, from
+#   each db WIKI in wiki_db.list, into
+#   /wmf/raw/mediawiki/WIKI/TABLE
+#   as an avro data file
+###
+
+DBS=$1
+TABLES=$2
+DB_HOST=$3
+TARGET_DIR=$4
+
+QUERY_REVISION="
+  select rev_id,
+         rev_page,
+         rev_text_id,
+         cast(rev_comment as char) as rev_comment,
+         rev_user,
+         cast(rev_user_text as char) as rev_user_text,
+         cast(rev_timestamp as char) as rev_timestamp,
+         rev_minor_edit,
+         rev_deleted,
+         rev_len,
+         rev_parent_id,
+         cast(rev_sha1 as char) as rev_sha1,
+         cast(rev_content_model as char) as rev_content_model,
+         cast(rev_content_format as char) as rev_content_format
+
+    from revision
+   where \$CONDITIONS
+"
+
+QUERY_ARCHIVE="
+  select ar_id,
+         ar_namespace,
+         cast(ar_title as char) as ar_title,
+         cast(ar_text as char) as ar_text,
+         cast(ar_comment as char) as ar_comment,
+         ar_user,
+         cast(ar_user_text as char) as ar_user_text,
+         cast(ar_timestamp as char) as ar_timestamp,
+         ar_minor_edit,
+         cast(ar_flags as char) as ar_flags,
+         ar_rev_id,
+         ar_text_id,
+         ar_deleted,
+         ar_len,
+         ar_page_id,
+         ar_parent_id,
+         cast(ar_sha1 as char) as ar_sha1,
+         cast(ar_content_model as char) as ar_content_model,
+         cast(ar_content_format as char) as ar_content_format
+
+    from archive
+   where \$CONDITIONS
+"
+
+QUERY_PAGE="
+  select page_id,
+         page_namespace,
+         cast(page_title as char) as page_title,
+         cast(page_restrictions as char) as page_restrictions,
+         page_counter,
+         page_is_redirect,
+         page_is_new,
+         page_random,
+         cast(page_touched as char) as page_touched,
+         cast(page_links_updated as char) as page_links_updated,
+         page_latest,
+         page_len,
+         cast(page_content_model as char) as page_content_model
+
+    from page
+   where \$CONDITIONS
+"
+
+QUERY_USER="
+  select user_id,
+         cast(user_name as char) as user_name,
+         cast(user_real_name as char) as user_real_name,
+         cast(user_password as char) as user_password,
+         cast(user_newpassword as char) as user_newpassword,
+         cast(user_email as char) as user_email,
+         cast(user_options as char) as user_options,
+         cast(user_touched as char) as user_touched,
+         cast(user_token as char) as user_token,
+         cast(user_email_authenticated as char) as user_email_authenticated,
+         cast(user_email_token as char) as user_email_token,
+         cast(user_email_token_expires as char) as user_email_token_expires,
+         cast(user_registration as char) as user_registration,
+         cast(user_newpass_time as char) as user_newpass_time,
+         user_editcount,
+         cast(user_password_expires as char) as user_password_expires
+
+    from user
+   where \$CONDITIONS
+"
+
+QUERY_LOGGING="
+  select log_id,
+         cast(log_type as char) as log_type,
+         cast(log_action as char) as log_action,
+         cast(log_timestamp as char) as log_timestamp,
+         log_user,
+         log_namespace,
+         cast(log_title as char) as log_title,
+         cast(log_comment as char) as log_comment,
+         cast(log_params as char) as log_params,
+         log_deleted,
+         cast(log_user_text as char) as log_user_text,
+         log_page
+
+    from logging
+   where \$CONDITIONS
+"
+
+while read DB
+do
+    while read TABLE
+    do
+        TABLE_TARGET_DIR="$TARGET_DIR/$DB/$TABLE"
+        echo "SQOOPING $TABLE_TARGET_DIR"
+
+        TABLE_UPPER="$(echo "QUERY_$TABLE" | tr '[:lower:]' '[:upper:]')"
+        QUERY=${!TABLE_UPPER}
+
+        sqoop --options-file options.research.txt \
+            --connect "jdbc:mysql://$DB_HOST/$DB" \
+            --query "$QUERY" \
+            --num-mappers 1 \
+            --target-dir "$TABLE_TARGET_DIR" \
+            --as-avrodatafile
+
+    done < "$TABLES"
+done < "$DBS"
diff --git a/oozie/mediawiki/import/wiki_db.list b/oozie/mediawiki/import/wiki_db.list
new file mode 100644
index 0000000..c15ec39
--- /dev/null
+++ b/oozie/mediawiki/import/wiki_db.list
@@ -0,0 +1,2 @@
+enwiki
+rowiki
diff --git a/oozie/mediawiki/import/wiki_db_table.list b/oozie/mediawiki/import/wiki_db_table.list
new file mode 100644
index 0000000..4c97e68
--- /dev/null
+++ b/oozie/mediawiki/import/wiki_db_table.list
@@ -0,0 +1,5 @@
+archive
+logging
+page
+revision
+user
diff --git a/oozie/mediawiki/load/README.md b/oozie/mediawiki/load/README.md
new file mode 100644
index 0000000..0f93466
--- /dev/null
+++ b/oozie/mediawiki/load/README.md
@@ -0,0 +1,12 @@
+This directory contains the dataset definition and coordinators that launch
+jobs specific to data loaded from MediaWiki's Avro+Kafka data pipeline.
+
+If you are producing a new Avro dataset via Mediawiki Monolog and Kafka,
+you should use these Oozie configs to import your data and automatically add Hive partitions to it. Most things needed to do this are abstracted here via the 'channel' property that is distinct for each coordinator launched by bundle.xml.
+
+Steps to add a new coordinator:
+
+- Add a CREATE TABLE hive file in hive/mediawiki and create your table in Hive.
+- Add a new coordinator declaration in bundle.xml and set $channel
+  and $raw_data_directory appropriately.
+- Relaunch the bundle.

--
To view, visit https://gerrit.wikimedia.org/r/303339
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id088fca8a046ee91816559bbb3e8d239d9b500e0
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Milimetric <dandree...@wikimedia.org>
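For reference, each iteration of the nested loop in sqoop_mediawiki_dbs.sh boils down to one Sqoop free-form-query import per wiki/table pair. Below is a minimal sketch of that call for enwiki/revision, using the example arguments from the script header; the column list is trimmed for readability, and options.research.txt is assumed (its contents are not part of this change) to supply the Sqoop "import" tool name plus the research MySQL credentials.

    # Sketch of a single sqoop_mediawiki_dbs.sh iteration (DB=enwiki, TABLE=revision).
    # Assumes options.research.txt provides the "import" tool name and credentials;
    # the real script passes the full QUERY_REVISION column list shown above.
    DB_HOST="analytics-store.eqiad.wmnet"
    DB="enwiki"
    TABLE_TARGET_DIR="/wmf/raw/mediawiki/enwiki/revision"

    # Sqoop substitutes its own split predicate for $CONDITIONS; with --num-mappers 1
    # the whole table is read by a single map task and written to HDFS as Avro.
    sqoop --options-file options.research.txt \
        --connect "jdbc:mysql://$DB_HOST/$DB" \
        --query "select rev_id, rev_page, cast(rev_timestamp as char) as rev_timestamp from revision where \$CONDITIONS" \
        --num-mappers 1 \
        --target-dir "$TABLE_TARGET_DIR" \
        --as-avrodatafile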