GoranSMilovanovic has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/398666 )
Change subject: EngineGeo 16 Dec 2017 ...................................................................... EngineGeo 16 Dec 2017 Change-Id: I4750665c1789c201d579c425b9231a1f848be3a0 --- M WDCM_EngineGeo_goransm.R 1 file changed, 1 insertion(+), 900 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/wmde/WDCM refs/changes/66/398666/1 diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R index 7013c3a..25e8a40 100644 --- a/WDCM_EngineGeo_goransm.R +++ b/WDCM_EngineGeo_goransm.R @@ -209,908 +209,9 @@ } ### --------------------------------------------------------------------------- -### --- Script 2: WDCM_Search_Clients.R, WDCM Search Module +### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable ### --------------------------------------------------------------------------- -### --------------------------------------------------------------------------- -### --- WDCM Search Module, v. Beta 0.1 -### --- Script: WDCM_Search_Clients.R, v. Beta 0.1 -### --- Author: Goran S. Milovanovic, Data Analyst, WMDE -### --- Developed under the contract between Goran Milovanovic PR Data Kolektiv -### --- and WMDE. -### --- Contact: goran.milovanovic_...@wikimedia.de -### --------------------------------------------------------------------------- -### --- DESCRIPTION: -### --- WDCM_Search_Clients.R takes a list of -### --- item IDs from Wikidata (the list is delivered by -### --- the WDCM_Collect_Items.R script) and searches for their -### --- usage across the Wikimedia projects in Hadoop: -### --- database: goransm -### --- directory: wdcmsqoop -### --- table: wdcm_clients_wb_entity_usage -### --- from production (currently: stat1005.eqiad.wmnet). -### --- NOTE: wdcm_clients_wb_entity_usage is produced by -### --- WDCM_Sqoop_Clients.R (currently run from: stat1004.eqiad.wmnet) -### --------------------------------------------------------------------------- -### --- INPUT: -### --- the WDCM_Search_Clients_HiveQL.R reads the list of item IDs -### --- to search for from /WDCM_CollectedItems -### --- This folder contains the .csv files that specify the item IDs -### --- to search for; the files are produced by Scrpt 1: WDCM_Collect_Items.R -### --------------------------------------------------------------------------- -### --- OUTPUT: -### --- wdcm_maintable Hive table on hdfs, database: goransm -### --------------------------------------------------------------------------- - -### --- read item categories: -setwd(itemsDir) -idFiles <- list.files() -idFiles <- idFiles[grepl(".csv$", idFiles)] -idFilesSize <- file.size(idFiles)/1024^2 - -### --- Track all categories under processing: -# - check for the existence of the wdcmSearchReport file -# - and delete the old file if it's found: -setwd(logDir) -lF <- list.files() -w <- which(grepl("^wdcmSearchReport", lF)) -if (length(w) == 1) { - file.remove(lF[w]) -} -### --- generate wdcmSearchReport data.frame -wdcmSearchReport <- data.frame(category = idFiles, - fileSize = idFilesSize, - startTime = character(length(idFiles)), - endTime = character(length(idFiles)), - stringsAsFactors = F -) -wdcmSearchReport <- wdcmSearchReport[order(-wdcmSearchReport$fileSize), ] - -### --- PREPARATION: delete goransm.wdcm_maintable if exists, -### --- delete all from EXTERNAL Hive table from /user/goransm/wdcmtables (hdfs path) -### --- make directory for EXTERNAL Hive table /user/goransm/wdcmtables (hdfs path) - -### --- check if goransm.wdcm_maintable exists in Hadoop; if yes, drop it: -# - NOTE: drop wdcm_maintable == erase metastore data: -# - [query01Err] - -# - to runtime Log: -print("Running query [query01Err].") - -query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T) -if (query01Err != 0) { - # - to runtime Log: - print("--- (!!!) query01Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T) - # - check errors: - if (query01Err != 0) { - # - to runtime Log: - print("--- (!!!) query01Err failed AGAIN: quiting.") - quit() - } -} - -### --- delete files for EXTERNAL Hive table from /user/goransm/wdcmtables (hdfs path) -# - [query02Err] -# - to runtime Log: -print("--- Running query [query02Err].") -query02Err <- system(command = 'hdfs dfs -rm -r /user/goransm/wdcmtables', wait = T) -if (query02Err != 0) { - # - to runtime Log: - print("--- (!!!) query02Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query02Err <- system(command = 'hdfs dfs -rm -r /user/goransm/wdcmtables', wait = T) - # - check errors: - if (query02Err != 0) { - # - to runtime Log: - print("--- (!!!) query02Err failed AGAIN: quiting.") - quit() - } -} - -### --- make directory for EXTERNAL Hive table /user/goransm/wdcmtables (hdfs path) -# - [query03Err] -# - to runtime Log: -print("--- Running query [query03Err].") -query03Err <- system(command = 'hdfs dfs -mkdir /user/goransm/wdcmtables', wait = T) -if (query03Err != 0) { - # - to runtime Log: - print("--- (!!!) query03Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query03Err <- system(command = 'hdfs dfs -mkdir /user/goransm/wdcmtables', wait = T) - # - check errors: - if (query03Err != 0) { - # - to runtime Log: - print("--- (!!!) query03Err failed AGAIN: quiting.") - quit() - } -} - -#### --- loop over item categories: - -# - back to itemsDir for the loop: -setwd(itemsDir) - -# - to runtime Log: -print("--- LOOP: Producing wdcm_maintable from wdcm_clients_wb_entity_usage now.") - -for (i in 1:length(wdcmSearchReport$category)) { - - # - start time for this category: - wdcmSearchReport$startTime[i] <- as.character(Sys.time()) - - ### --- read item IDs: - wFile <- which(grepl(wdcmSearchReport$category[i], idFiles, fixed = T)) - qIDs <- read.csv(idFiles[wFile], - header = T, - check.names = F, - stringsAsFactors = F) - qIDs <- qIDs$item - qIDs <- qIDs[grepl("^Q[[:digit:]]+", qIDs)] - - ### --- cut into batches (if necessary) - # - cut into batches (5MB max. batch size) - batchNum <- ceiling(wdcmSearchReport$fileSize[i]/10) - batchSize <- round(length(qIDs)/batchNum) - startBatchIx <- c(1:batchNum) * batchSize - batchSize + 1 - stopBatchIx <- c(1:batchNum) * batchSize - stopBatchIx[batchNum] <- length(qIDs) - - for (batch in 1:batchNum) { - - # - to runtime Log: - print(paste("------------- Processing category: ", i, ": ", wdcmSearchReport$category[i], sep = "")) - print(paste("------------- Processing batch: ", batch, " out of ", batchNum, sep = "")) - - # - create goransm.wdcm_maintable Hive table if this is the first entry: - # - (create wdcm_maintable Hive Table on (hdfs path): /user/goransm/wdcmtables) - - if ((i == 1) & (batch == 1)) { - - hiveCommand <- "\"USE goransm; CREATE EXTERNAL TABLE \\\`goransm.wdcm_maintable\\\`( - \\\`eu_entity_id\\\` string COMMENT '', - \\\`eu_project\\\` string COMMENT '', - \\\`eu_count\\\` bigint COMMENT '' - ) - COMMENT - '' - PARTITIONED BY ( - \\\`category\\\` string COMMENT 'The item category') - ROW FORMAT SERDE - 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' - STORED AS INPUTFORMAT - 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' - OUTPUTFORMAT - 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' - LOCATION - 'hdfs://analytics-hadoop/user/goransm/wdcmtables';\"" - hiveCommand <- paste("/usr/local/bin/beeline --silent -e ", hiveCommand, sep = "") - # - [query04AErr] - # - to runtime Log: - print("--- Running query [query04AErr].") - query04AErr <- system(command = hiveCommand, wait = TRUE) - if (query04AErr != 0) { - # - to runtime Log: - print("--- (!!!) query04AErr failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query04AErr <- system(command = hiveCommand, wait = TRUE) - # - check errors: - if (query04AErr != 0) { - # - to runtime Log: - print("--- (!!!) query04AErr failed AGAIN: quiting.") - quit() - } - } - } - - # - construct HiveQL query to search for category i items - # - across all wiki_db: - - # - to runtime Log: - print("--- RUNNING HiveQL Query to search for category items.") - hiveQLQuery_1 <- "USE goransm; SET hive.mapred.mode=unstrict;" - hiveQLQuery_2 <- paste("INSERT INTO TABLE wdcm_maintable - PARTITION (category='", - strsplit(wdcmSearchReport$category[i], split = "_", fixed = T)[[1]][1], - "') ", - "SELECT eu_entity_id, wiki_db AS eu_project, COUNT(*) AS eu_count FROM ( - SELECT DISTINCT eu_entity_id, eu_page_id, wiki_db FROM wdcm_clients_wb_entity_usage - WHERE eu_entity_id IN (", - paste("\"", qIDs[startBatchIx[batch]:stopBatchIx[batch]], "\"", collapse = ", ", sep = ""), - ")) AS t - GROUP BY eu_entity_id, wiki_db;", - sep = "") - hiveQLQuery <- paste(hiveQLQuery_1, hiveQLQuery_2, sep = " ") - # - write hiveQLQuery locally: - setwd(fPath) - write(hiveQLQuery, "hiveQLQuery.hql") - # - execute HiveQLQuery: - hiveQLQueryCommand <- paste("/usr/local/bin/beeline --silent -f ", getwd(), "/hiveQLQuery.hql", sep = "") - # - [query04BErr] - # - to runtime Log: - print("--- Running query [query04BErr].") - - query04BErr <- system(command = hiveQLQueryCommand, wait = TRUE) - if (query04BErr != 0) { - # - to runtime Log: - print("--- (!!!) query04BErr failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query04BErr <- system(command = hiveQLQueryCommand, wait = TRUE) - # - check errors: - if (query04BErr != 0) { - # - to runtime Log: - print("--- (!!!) query04BErr failed AGAIN: quiting.") - quit() - } - } - - # - to runtime Log: - print("--- REPAIR TABLE.") - - # - repair partitions: - # - query04CErr - # - to runtime Log: - print("Running query [query04CErr].") - query04CErr <- system(command = - '/usr/local/bin/beeline --silent -e "USE goransm; SET hive.mapred.mode = nonstrict; MSCK REPAIR TABLE wdcm_maintable;"', - wait = TRUE) - if (query04CErr != 0) { - # - to runtime Log: - print("--- (!!!) query04CErr failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query04CErr <- system(command = - '/usr/local/bin/beeline --silent -e "USE goransm; SET hive.mapred.mode = nonstrict; MSCK REPAIR TABLE wdcm_maintable;"', - wait = TRUE) - # - check errors: - if (query04CErr != 0) { - # - to runtime Log: - print("--- (!!!) query04CErr failed AGAIN: quiting.") - quit() - } - } - - # - end time for this category: - wdcmSearchReport$endTime[i] <- as.character(Sys.time()) - - # - back to item categories: - setwd(itemsDir) - - } - -} - -### --- store report: - -# - to runtime Log: -print("--- LOG Search Phase completed.") - -# - to wdcmSearchReport: -setwd(logDir) -write.csv(wdcmSearchReport, - paste("wdcmSearchReport_", - strsplit(as.character(Sys.time()), - split = " ")[[1]][1], - ".csv", - sep = "")) - -# - write to WDCM main reporting file: -mainReport <- read.csv('WDCM_MainReport.csv', - header = T, - row.names = 1, - check.names = F, - stringsAsFactors = F) -newReport <- data.frame(Step = 'SearchItems', - Time = as.character(Sys.time()), - stringsAsFactors = F) -mainReport <- rbind(mainReport, newReport) -write.csv(mainReport, 'WDCM_MainReport.csv') - -### --------------------------------------------------------------------------- -### --- Script 3: WDCM_Pre-Process.R, WDCM Process Module -### --------------------------------------------------------------------------- - -### --------------------------------------------------------------------------- -### --- WDCM Process Module, v. Beta 0.1 -### --- Script: WDCM_Pre-Process.R, v. Beta 0.1 -### --- Author: Goran S. Milovanovic, Data Analyst, WMDE -### --- Developed under the contract between Goran Milovanovic PR Data Kolektiv -### --- and WMDE. -### --- Contact: goran.milovanovic_...@wikimedia.de -### --------------------------------------------------------------------------- -### --- DESCRIPTION: -### --- WDCM_Pre-Process.R works with the -### --- wdcm_maintable Hive table on hdfs, database: goransm -### --- to produce the .tsv files that migrate to -### --- the wikidataconcepts.wmflabs.org Cloud VPS instance -### --- from production (currently: stat1005.eqiad.wmnet). -### --------------------------------------------------------------------------- -### --- INPUT: -### --- wdcm_maintable Hive table on hdfs, database: goransm -### --------------------------------------------------------------------------- -### --- OUTPUT: -### --- Results are stored locally as .tsv files on production - -### --- - on stat1005.eqiad.wmnet - in: -### --- /WDCM_dataOut -### --- These output .tsv files migrate to Labs: -### --- wikidataconcepts.wmflabs.org Cloud VPS instance -### --- where they are then processed by the WDCM Process Module. -### --------------------------------------------------------------------------- - -# - to runtime Log: -print("--- START: PRE-PROCESS") - -### --- functions - -# - projectType() to determine project type -projectType <- function(projectName) { - unname(sapply(projectName, function(x) { - if (grepl("commons", x, fixed = T)) {"Commons" - } else if (grepl("mediawiki|meta|species|wikidata", x)) {"Other" - } else if (grepl("wiki$", x)) {"Wikipedia" - } else if (grepl("quote$", x)) {"Wikiquote" - } else if (grepl("voyage$", x)) {"Wikivoyage" - } else if (grepl("news$", x)) {"Wikinews" - } else if (grepl("source$", x)) {"Wikisource" - } else if (grepl("wiktionary$", x)) {"Wiktionary" - } else if (grepl("versity$", x)) {"Wikiversity" - } else if (grepl("books$", x)) {"Wikibooks" - } else {"Other"} - })) -} - -### --- produce wdcm_item.tsv from wdcm_maintable (hdfs, database: goransm) -### --- NOTE: one .tsv file per category (~14M rows, causes Java gc overflow from hive...) - -# - to runtime Log: -print("--- STEP: produce wdcm_item.tsv from wdcm_maintable") - -### --- read item categories: -setwd(itemsDir) - -idFiles <- list.files() -idFiles <- idFiles[grepl(".csv$", idFiles)] -categories <- unname(sapply(idFiles, function(x) { - strsplit(x, split = "_")[[1]][1] -})) - -for (i in 1:length(categories)) { - - filename <- paste("wdcm_item_", - gsub(" ", "", categories[i], fixed = T), ".tsv", - sep = "") - - # - to runtime Log: - print("--- processing: ", filename, sep = "") - - hiveQLquery <- paste( - 'USE goransm; SELECT eu_entity_id, SUM(eu_count) AS eu_count FROM wdcm_maintable WHERE category=\\"', - categories[i], - '\\" GROUP BY eu_entity_id ORDER BY eu_count DESC LIMIT 100000;', - sep = "") - - # - query05Err - # - to runtime Log: - print("--- Running query [query05Err].") - query05Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', dataDir, - "/", filename, - sep = ""), - wait = TRUE) - if (query05Err != 0) { - # - to runtime Log: - print("--- (!!!) query05Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query05Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', dataDir, - "/", filename, - sep = ""), - wait = TRUE) - # - check errors: - if (query05Err != 0) { - # - to runtime Log: - print("--- (!!!) query05Err failed AGAIN: quiting.") - quit() - } - } - -} - -### --- ETL Phase - -# - to runtime Log: -print("--- STEP: ETL PHASE") - -### --- to dataDir (and EVERYTHING ELSE goes to dataDir) -setwd(dataDir) - -# - to runtime Log: -print("--- STEP: produce wdcm_project_category.tsv") - -### --- produce wdcm_project_category.tsv from wdcm_maintable (hdfs, database: goransm) -hiveQLquery <- 'USE goransm; - SET hive.mapred.mode=unstrict; - SELECT eu_project, category, SUM(eu_count) AS eu_count - FROM wdcm_maintable - GROUP BY eu_project, category ORDER BY eu_count DESC LIMIT 10000000;' -# - [query06Err] -# - to runtime Log: -print("--- Running query [query06Err].") - -query06Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/wdcm_project_category.tsv', - sep = ""), - wait = TRUE) -if (query06Err != 0) { - # - to runtime Log: - print("--- (!!!) query06Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query06Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/wdcm_project_category.tsv', - sep = ""), - wait = TRUE) - # - check errors: - if (query06Err != 0) { - # - to runtime Log: - print("--- (!!!) query06Err failed AGAIN: quiting.") - quit() - } -} -# - add projecttype to wdcm_project_category.tsv -wdcm_project_category <- as.data.frame(fread('wdcm_project_category.tsv')) -wdcm_project_category$projectype <- projectType(wdcm_project_category$eu_project) -write.csv(wdcm_project_category, "wdcm_project_category.csv") - -# - to runtime Log: -print("--- STEP: produce wdcm_project.tsv") - -### --- produce wdcm_project.tsv from wdcm_maintable (hdfs, database: goransm) -hiveQLquery <- 'USE goransm; - SET hive.mapred.mode=unstrict; - SELECT eu_project, SUM(eu_count) AS eu_count - FROM wdcm_maintable - GROUP BY eu_project ORDER BY eu_count DESC LIMIT 10000000;' -# - [query07Err] -# - to runtime Log: -print("Running query [query07Err].") -query07Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/wdcm_project.tsv', - sep = ""), - wait = TRUE) -if (query07Err != 0) { - # - to runtime Log: - print("query07Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query07Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/wdcm_project.tsv', - sep = ""), - wait = TRUE) - # - check errors: - if (query07Err != 0) { - print("query07Err failed AGAIN: quiting.") - quit() - } -} - -# - add projecttype to wdcm_project.tsv -wdcm_project <- as.data.frame(fread('wdcm_project.tsv')) -wdcm_project$projectype <- projectType(wdcm_project$eu_project) -write.csv(wdcm_project, "wdcm_project.csv") - -# - to runtime Log: -print("STEP: wdcm_category.tsv") - -### --- produce wdcm_category.tsv from wdcm_maintable (hdfs, database: goransm) -hiveQLquery <- 'USE goransm; - SET hive.mapred.mode=unstrict; - SELECT category, SUM(eu_count) AS eu_count - FROM wdcm_maintable - GROUP BY category ORDER BY eu_count DESC LIMIT 10000000;' -# - [query08Err] -# - to runtime Log: -print("Running query [query08Err].") -query08Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/wdcm_category.tsv', - sep = ""), - wait = TRUE) -if (query08Err != 0) { - # - to runtime Log: - print("query08Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query08Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/wdcm_category.tsv', - sep = ""), - wait = TRUE) - # - check errors: - if (query08Err != 0) { - # - to runtime Log: - print("query08Err failed AGAIN: quiting.") - quit() - } -} -# - save wdcm_category.tsv as .csv -wdcm_category <- as.data.frame(fread('wdcm_category.tsv')) -write.csv(wdcm_category, "wdcm_category.csv") - -# - to runtime Log: -print("STEP: produce wdcm_project_category_item100.tsv") - -### --- produce wdcm_project_category_item100.tsv from wdcm_maintable (hdfs, database: goransm) -hiveQLquery <- 'USE goransm; - SET hive.mapred.mode=unstrict; - SELECT * FROM ( - SELECT eu_project, category, eu_entity_id, eu_count, ROW_NUMBER() OVER (PARTITION BY eu_project, category ORDER BY eu_count DESC) AS row - FROM wdcm_maintable) t - WHERE row <= 100;' -# - [query09Err] -# - to runtime Log: -print("Running query [query09Err].") -query09Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/wdcm_project_category_item100.tsv', - sep = ""), - wait = TRUE) -if (query09Err != 0) { - # - to runtime Log: - print("query09Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query09Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/wdcm_project_category_item100.tsv', - sep = ""), - wait = TRUE) - # - check errors: - if (query09Err != 0) { - # - to runtime Log: - print("query09Err failed AGAIN: quiting.") - quit() - } -} - -# - add projecttype to wdcm_project_category_item100.tsv -wdcm_project_category_item100 <- as.data.frame(fread('wdcm_project_category_item100.tsv')) -wdcm_project_category_item100$projectype <- projectType(wdcm_project_category_item100$t.eu_project) -write.csv(wdcm_project_category_item100, "wdcm_project_category_item100.csv") - -# - to runtime Log: -print("STEP: produce wdcm_project_item100.tsv") - -### --- produce wdcm_project_item100.tsv from wdcm_maintable (hdfs, database: goransm) -hiveQLquery <- 'USE goransm; - SET hive.mapred.mode=unstrict; - SELECT * FROM ( - SELECT eu_project, eu_entity_id, eu_count, ROW_NUMBER() OVER (PARTITION BY eu_project ORDER BY eu_count DESC) AS row - FROM wdcm_maintable) t - WHERE row <= 100;' -# - [query10Err] -# - to runtime Log: -print("Running query [query10Err].") -query10Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/wdcm_project_item100.tsv', - sep = ""), - wait = TRUE) -if (query10Err != 0) { - # - to runtime Log: - print("query10Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query10Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/wdcm_project_item100.tsv', - sep = ""), - wait = TRUE) - # - check errors: - if (query10Err != 0) { - # - to runtime Log: - print("query10Err failed AGAIN: quiting.") - quit() - } -} - -# - add projecttype to wdcm_project_item100.tsv -wdcm_project_item100 <- as.data.frame(fread('wdcm_project_item100.tsv')) -wdcm_project_item100$projectype <- projectType(wdcm_project_item100$t.eu_project) -write.csv(wdcm_project_item100, "wdcm_project_item100.csv") - - -### --- Semantic Modeling Phase -### --- produce project-item matrices for semantic topic modeling - -# - to runtime Log: -print("STEP: Semantic Modeling Phase: TDF MATRICES") -itemFiles <- list.files() -itemFiles <- itemFiles[grepl("^wdcm_item", itemFiles)] -for (i in 1:length(itemFiles)) { - # - to runtime Log: - print(paste("----------------------- TDF matrix formation: category ", i, ".", sep = "")) - # - load categoryFile[i].tsv as data.frame - categoryName <- strsplit(itemFiles[i], ".", fixed = T)[[1]][1] - categoryName <- strsplit(categoryName, "_", fixed = T)[[1]][3] - categoryName <- gsub("([[:lower:]])([[:upper:]])", "\\1 \\2", categoryName) - # - load items - # - NOTE: AN ARBITRARY DECISION TO MODEL TOP 5000 MOST FREQUENTLY USED ITEMS: - # - nrows = 5000 - categoryFile <- fread(itemFiles[i], nrows = 5000) - # - list of items to fetch - itemList <- categoryFile$eu_entity_id - # - hiveQL: - hiveQLquery <- paste('USE goransm; SELECT eu_project, eu_entity_id, eu_count FROM wdcm_maintable WHERE eu_entity_id IN (', - paste0("'", itemList, "'", collapse = ", ", sep = ""), - ') AND category = \\"', - categoryName, - '\\";', - sep = "") - fileName <- gsub(" ", "", categoryName, fixed = T) - fileName <- paste("tfMatrix_", fileName, ".tsv", sep = "") - # - [query11Err] - # - to runtime Log: - print("Running query [query11Err].") - query11Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/', - fileName, - sep = ""), - wait = TRUE) - if (query11Err != 0) { - # - to runtime Log: - print("query11Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query11Err <- system(command = paste('/usr/local/bin/beeline --silent -e "', - hiveQLquery, - '" > ', getwd(), '/', - fileName, - sep = ""), - wait = TRUE) - # - check errors: - if (query11Err != 0) { - # - to runtime Log: - print("query11Err failed AGAIN: quiting.") - quit() - } - } -} - -### --- reshape project-item matrices for semantic topic modeling - -# - to runtime Log: -print("STEP: Semantic Modeling Phase: RESHAPING TDF MATRICES") -itemFiles <- list.files() -itemFiles <- itemFiles[grepl("^tfMatrix_", itemFiles)] -itemFiles <- itemFiles[grepl(".tsv", itemFiles, fixed = T)] -for (i in 1:length(itemFiles)) { - # - to runtime Log: - print(paste("----------------------- Reshaping TDF matrix: category ", i, ".", sep = "")) - # - load categoryFile[i].tsv as data.frame - categoryFile <- fread(itemFiles[i]) - categoryFile <- spread(categoryFile, - key = eu_entity_id, - value = eu_count, - fill = 0) - rownames(categoryFile) <- categoryFile$eu_project - categoryFile$eu_project <- NULL - w <- which(colSums(categoryFile) == 0) - if (length(w) > 0) { - categoryFile <- categoryFile[, -w] - } - w <- which(rowSums(categoryFile) == 0) - if (length(w) > 0) { - categoryFile <- categoryFile[-w, ] - } - fileName <- paste(strsplit(itemFiles[i], split = ".", fixed = T)[[1]][1], ".csv", sep = "") - write.csv(categoryFile, fileName) -} - -### --- semantic topic models for each category - -### --- to nohup.out -# - to runtime Log: -print("STEP: Semantic Modeling Phase: LDA estimation") -itemFiles <- list.files()[grepl(".csv", x = list.files(), fixed = T)] -itemFiles <- itemFiles[grepl("^tfMatrix_", itemFiles)] -for (i in 1:length(itemFiles)) { - - categoryName <- strsplit(itemFiles[i], split = ".", fixed = T)[[1]][1] - categoryName <- strsplit(categoryName, split = "_", fixed = T)[[1]][2] - - # - topic modeling: - itemCat <- read.csv(itemFiles[i], - header = T, - check.names = F, - row.names = 1, - stringsAsFactors = F) - itemCat <- as.simple_triplet_matrix(itemCat) - - ## -- run on K = seq(2,20) semantic topics - - # - to runtime Log: - print(paste("----------------------- LDA model: category ", i, ".", sep = "")) - topicModel <- list() - numTopics <- seq(2, 10, by = 1) - for (k in 1:length(numTopics)) { - topicModel[[k]] <- maptpx::topics(counts = itemCat, - K = numTopics[k], - shape = NULL, - initopics = NULL, - tol = 0.1, - bf = T, - kill = 0, - ord = TRUE, - verb = 2) - } - # - clear: - rm(itemCat); gc() - # - determine model from Bayes Factor against Null: - wModel <- which.max(sapply(topicModel, function(x) {x$BF})) - topicModel <- topicModel[[wModel]] - - # - collect matrices: - wdcm_itemtopic <- as.data.frame(topicModel$theta) - colnames(wdcm_itemtopic) <- paste("topic", seq(1, dim(wdcm_itemtopic)[2]), sep = "") - itemTopicFileName <- paste('wdcm2_itemtopic', - paste(categoryName, ".csv", sep = ""), - sep = "_") - write.csv(wdcm_itemtopic, itemTopicFileName) - - wdcm_projecttopic <- as.data.frame(topicModel$omega) - colnames(wdcm_projecttopic) <- paste("topic", seq(1, dim(wdcm_projecttopic)[2]), sep = "") - wdcm_projecttopic$project <- rownames(wdcm_projecttopic) - wdcm_projecttopic$projecttype <- projectType(wdcm_projecttopic$project) - projectTopicFileName <- paste('wdcm2_projecttopic', - paste(categoryName, ".csv", sep = ""), - sep = "_") - write.csv(wdcm_projecttopic, projectTopicFileName) - - # - clear: - rm(topicModel); rm(wdcm_projecttopic); rm(wdcm_itemtopic); gc() - -} - -### --- t-SNE 2D maps from wdcm2_projectttopic files: projects similarity structure -# - to runtime Log: -print("STEP: Semantic Modeling Phase: t-SNE 2D MAPS") -projectFiles <- list.files() -projectFiles <- projectFiles[grepl("^wdcm2_projecttopic", projectFiles)] -for (i in 1:length(projectFiles)) { - # filename: - fileName <- strsplit(projectFiles[i], split = ".", fixed = T)[[1]][1] - fileName <- strsplit(fileName, split = "_", fixed = T)[[1]][3] - fileName <- paste("wdcm2_tsne2D_project_", fileName, ".csv", sep = "") - # load: - projectTopics <- read.csv(projectFiles[i], - header = T, - check.names = F, - row.names = 1, - stringsAsFactors = F) - projectTopics$project <- NULL - projectTopics$projecttype <- NULL - # - Distance space, metric: Hellinger - projectDist <- as.matrix(dist(projectTopics, method = "Hellinger", by_rows = T)) - # - t-SNE 2D map - tsneProject <- Rtsne(projectDist, - theta = .5, - is_distance = T, - perplexity = 10) - # - store: - tsneProject <- as.data.frame(tsneProject$Y) - colnames(tsneProject) <- paste("D", seq(1:dim(tsneProject)[2]), sep = "") - tsneProject$project <- rownames(projectTopics) - tsneProject$projecttype <- projectType(tsneProject$project) - write.csv(tsneProject, fileName) - # - clear: - rm(projectTopics); rm(projectDist); rm(tsneProject) -} - -### --- {visNetwork} graphs from wdcm2_projectttopic files: projects similarity structure - -# - to runtime Log: -print("STEP: {visNetwork} graphs from wdcm2_projectttopic files") -projectFiles <- list.files() -projectFiles <- projectFiles[grepl("^wdcm2_projecttopic", projectFiles)] -for (i in 1:length(projectFiles)) { - # - load: - projectTopics <- read.csv(projectFiles[i], - header = T, - check.names = F, - row.names = 1, - stringsAsFactors = F) - projectTopics$project <- NULL - projectTopics$projecttype <- NULL - # - Distance space, metric: Hellinger - projectDist <- as.matrix(dist(projectTopics, method = "Hellinger", by_rows = T)) - # - {visNetwork} nodes data.frame: - indexMinDist <- sapply(rownames(projectDist), function(x) { - w <- which(rownames(projectDist) %in% x) - y <- sort(projectDist[w, -w], decreasing = T) - names(y)[length(y)] - }) - id <- 1:length(colnames(projectDist)) - label <- colnames(projectDist) - nodes <- data.frame(id = id, - label = label, - stringsAsFactors = F) - # - {visNetwork} edges data.frame: - edges <- data.frame(from = names(indexMinDist), - to = unname(indexMinDist), - stringsAsFactors = F) - edges$from <- sapply(edges$from, function(x) { - nodes$id[which(nodes$label %in% x)] - }) - edges$to <- sapply(edges$to, function(x) { - nodes$id[which(nodes$label %in% x)] - }) - edges$arrows <- rep("to", length(edges$to)) - # filenames: - fileName <- strsplit(projectFiles[i], split = ".", fixed = T)[[1]][1] - fileName <- strsplit(fileName, split = "_", fixed = T)[[1]][3] - nodesFileName <- paste("wdcm2_visNetworkNodes_project_", fileName, ".csv", sep = "") - edgesFileName <- paste("wdcm2_visNetworkEdges_project_", fileName, ".csv", sep = "") - # store: - write.csv(nodes, nodesFileName) - write.csv(edges, edgesFileName) - # - clear: - rm(projectTopics); rm(projectDist); rm(nodes); rm(edges); gc() -} - -# - write to WDCM main reporting file: -setwd(logDir) -mainReport <- read.csv('WDCM_MainReport.csv', - header = T, - row.names = 1, - check.names = F, - stringsAsFactors = F) -newReport <- data.frame(Step = 'Pre-Process', - Time = as.character(Sys.time()), - stringsAsFactors = F) -mainReport <- rbind(mainReport, newReport) -write.csv(mainReport, 'WDCM_MainReport.csv') - -### --- toLabsReport -toLabsReport <- data.frame(timeStamp = as.character(Sys.time()), - statbox = "stat1005", - sqoopbox = "stat1004", - stringsAsFactors = F) -write.csv(toLabsReport, "toLabsReport.csv") -# - to runtime Log: -print(paste("--- UPDATE RUN COMPLETED ON:", Sys.time(), sep = " ")) -### --- copy reports to /srv/published-datasets/wdcm: -# - WDCM_MainReport -system(command = 'cp /home/goransm/RScripts/WDCM_R/WDCM_Logs/WDCM_MainReport.csv /srv/published-datasets/wdcm/', wait = T) -# - toLabsReport -system(command = 'cp /home/goransm/RScripts/WDCM_R/WDCM_Logs/toLabsReport.csv /srv/published-datasets/wdcm/', wait = T) -- To view, visit https://gerrit.wikimedia.org/r/398666 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I4750665c1789c201d579c425b9231a1f848be3a0 Gerrit-PatchSet: 1 Gerrit-Project: analytics/wmde/WDCM Gerrit-Branch: master Gerrit-Owner: GoranSMilovanovic <goran.milovanovic_...@wikimedia.de> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits