[MediaWiki-commits] [Gerrit] analytics...WDCM[master]: EngineGeo 16 Dec 2017 ed2
GoranSMilovanovic has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/398667 ) Change subject: EngineGeo 16 Dec 2017 ed2 .. EngineGeo 16 Dec 2017 ed2 Change-Id: I0f16705bf73bd0e66a19636321fd438b386f6139 --- M WDCM_EngineGeo_goransm.R M WDCM_Engine_goransm.R 2 files changed, 122 insertions(+), 14 deletions(-) Approvals: GoranSMilovanovic: Verified; Looks good to me, approved diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R index 25e8a40..9d9c8df 100644 --- a/WDCM_EngineGeo_goransm.R +++ b/WDCM_EngineGeo_goransm.R @@ -54,6 +54,7 @@ library(httr) library(XML) # - wrangling: +library(dplyr) library(stringr) library(readr) library(data.table) @@ -62,11 +63,11 @@ ### --- Directories # - fPath: where the scripts is run from? -fPath <- '/home/goransm/Work/___DataKolektiv/Projects/WikimediaDEU/_WMDE_Projects/WDCM_Dev/WDCM/' +fPath <- '/home/goransm/RScripts/WDCM_R' # - form paths: -ontologyDir <- paste(fPath, 'WDCM_Ontology', sep = "") -logDir <- paste(fPath, 'WDCM_Logs', sep = "") -itemsDir <- paste(fPath, 'WDCM_CollectedGeoItems', sep = "") +ontologyDir <- paste(fPath, '/WDCM_Ontology', sep = "") +logDir <- paste(fPath, '/WDCM_Logs', sep = "") +itemsDir <- paste(fPath, '/WDCM_CollectedGeoItems', sep = "") # - stat1005 published-datasets, maps onto # - https://analytics.wikimedia.org/datasets/wdcm/ dataDir <- '/srv/published-datasets/wdcm' @@ -75,9 +76,9 @@ print(paste("--- UPDATE RUN STARTED ON:", Sys.time(), sep = " ")) ### --- Set proxy -# Sys.setenv( -# http_proxy = "http://webproxy.eqiad.wmnet:8080", -# https_proxy = "http://webproxy.eqiad.wmnet:8080") +Sys.setenv( + http_proxy = "http://webproxy.eqiad.wmnet:8080", + https_proxy = "http://webproxy.eqiad.wmnet:8080") ### --- Read WDCM_GeoItems # - to runtime Log: @@ -114,8 +115,6 @@ strsplit(wdcmGeoItems$item[i], split = ",", fixed = T)[[1]], "both") - - itemsOut <- list() # - Construct Query: query <- paste0( @@ -190,8 +189,8 @@ write.csv(qErrors, 
"WDCM_Collect_GeoItems_SPARQL_Errors.csv") # - write to WDCM main reporting file: lF <- list.files() -if ('WDCM_MainReport.csv' %in% lF) { - mainReport <- read.csv('WDCM_MainReport.csv', +if ('WDCM_GeoReport.csv' %in% lF) { + mainReport <- read.csv('WDCM_GeoReport.csv', header = T, row.names = 1, check.names = F, @@ -200,18 +199,128 @@ Time = as.character(Sys.time()), stringsAsFactors = F) mainReport <- rbind(mainReport, newReport) - write.csv(mainReport, 'WDCM_MainReport.csv') + write.csv(mainReport, 'WDCM_GeoReport.csv') } else { newReport <- data.frame(Step = 'CollectItems', Time = as.character(Sys.time()), stringsAsFactors = F) - write.csv(newReport, 'WDCM_MainReport.csv') + write.csv(newReport, 'WDCM_GeoReport.csv') } ### --- ### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable ### --- +# - to runtime Log: +print("--- ETL: Wikidata usage statistics from wdcm_maintable") + +### --- read item categories: +setwd(itemsDir) + +idFiles <- list.files() +idFiles <- idFiles[grepl(".csv$", idFiles)] +categories <- unname(sapply(idFiles, function(x) { + strsplit(x, split = "_")[[1]][1] +})) + +for (i in 1:length(categories)) { + + # - filename + filename <- paste("wdcm_geoitem_", +gsub(" ", "", categories[i], fixed = T), ".tsv", +sep = "") + + # - geoitems + geoitems <- read.csv(idFiles[i], + header = T, + check.names = F, + stringsAsFactors = F) + searchitems <- geoitems$item + + # - to runtime Log: + print(paste("--- processing: ", i, ": ", filename, sep = "")) + + # - to runtime Log: + print("--- RUNNING HiveQL Query to search for geoitems.") + hiveQLQuery_1 <- "USE goransm; SET hive.mapred.mode=unstrict;" + hiveQLQuery_2 <- paste("SELECT eu_entity_id, SUM(eu_count) AS usage FROM wdcm_maintable WHERE eu_entity_id IN (", + paste("\"", searchitems, "\"", collapse = ", ", sep = ""), + ") GROUP BY eu_entity_id ORDER BY usage DESC LIMIT 1000;", + sep = "") + hiveQLQuery <- paste(hiveQLQuery_1, hiveQLQuery_2, sep = " ") + + # - write hiveQLQuery locally: + 
setwd(fPath) + write(hiveQLQuery, "hiveQL_geoQuery.hql") + + # - execute HiveQLQuery: + hiveQLQueryCommand <- paste("/usr/local/bin/beeline --silent -f ", + getwd(), "/hiveQL_geoQuery.hql", + " > ", dataDir, "/", filename, + sep = "") +
[MediaWiki-commits] [Gerrit] analytics...WDCM[master]: EngineGeo 16 Dec 2017 ed2
GoranSMilovanovic has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/398667 ) Change subject: EngineGeo 16 Dec 2017 ed2 .. EngineGeo 16 Dec 2017 ed2 Change-Id: I0f16705bf73bd0e66a19636321fd438b386f6139 --- M WDCM_EngineGeo_goransm.R M WDCM_Engine_goransm.R 2 files changed, 122 insertions(+), 14 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/wmde/WDCM refs/changes/67/398667/1 diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R index 25e8a40..9d9c8df 100644 --- a/WDCM_EngineGeo_goransm.R +++ b/WDCM_EngineGeo_goransm.R @@ -54,6 +54,7 @@ library(httr) library(XML) # - wrangling: +library(dplyr) library(stringr) library(readr) library(data.table) @@ -62,11 +63,11 @@ ### --- Directories # - fPath: where the scripts is run from? -fPath <- '/home/goransm/Work/___DataKolektiv/Projects/WikimediaDEU/_WMDE_Projects/WDCM_Dev/WDCM/' +fPath <- '/home/goransm/RScripts/WDCM_R' # - form paths: -ontologyDir <- paste(fPath, 'WDCM_Ontology', sep = "") -logDir <- paste(fPath, 'WDCM_Logs', sep = "") -itemsDir <- paste(fPath, 'WDCM_CollectedGeoItems', sep = "") +ontologyDir <- paste(fPath, '/WDCM_Ontology', sep = "") +logDir <- paste(fPath, '/WDCM_Logs', sep = "") +itemsDir <- paste(fPath, '/WDCM_CollectedGeoItems', sep = "") # - stat1005 published-datasets, maps onto # - https://analytics.wikimedia.org/datasets/wdcm/ dataDir <- '/srv/published-datasets/wdcm' @@ -75,9 +76,9 @@ print(paste("--- UPDATE RUN STARTED ON:", Sys.time(), sep = " ")) ### --- Set proxy -# Sys.setenv( -# http_proxy = "http://webproxy.eqiad.wmnet:8080", -# https_proxy = "http://webproxy.eqiad.wmnet:8080") +Sys.setenv( + http_proxy = "http://webproxy.eqiad.wmnet:8080", + https_proxy = "http://webproxy.eqiad.wmnet:8080") ### --- Read WDCM_GeoItems # - to runtime Log: @@ -114,8 +115,6 @@ strsplit(wdcmGeoItems$item[i], split = ",", fixed = T)[[1]], "both") - - itemsOut <- list() # - Construct Query: query <- paste0( @@ -190,8 +189,8 @@ write.csv(qErrors, 
"WDCM_Collect_GeoItems_SPARQL_Errors.csv") # - write to WDCM main reporting file: lF <- list.files() -if ('WDCM_MainReport.csv' %in% lF) { - mainReport <- read.csv('WDCM_MainReport.csv', +if ('WDCM_GeoReport.csv' %in% lF) { + mainReport <- read.csv('WDCM_GeoReport.csv', header = T, row.names = 1, check.names = F, @@ -200,18 +199,128 @@ Time = as.character(Sys.time()), stringsAsFactors = F) mainReport <- rbind(mainReport, newReport) - write.csv(mainReport, 'WDCM_MainReport.csv') + write.csv(mainReport, 'WDCM_GeoReport.csv') } else { newReport <- data.frame(Step = 'CollectItems', Time = as.character(Sys.time()), stringsAsFactors = F) - write.csv(newReport, 'WDCM_MainReport.csv') + write.csv(newReport, 'WDCM_GeoReport.csv') } ### --- ### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable ### --- +# - to runtime Log: +print("--- ETL: Wikidata usage statistics from wdcm_maintable") + +### --- read item categories: +setwd(itemsDir) + +idFiles <- list.files() +idFiles <- idFiles[grepl(".csv$", idFiles)] +categories <- unname(sapply(idFiles, function(x) { + strsplit(x, split = "_")[[1]][1] +})) + +for (i in 1:length(categories)) { + + # - filename + filename <- paste("wdcm_geoitem_", +gsub(" ", "", categories[i], fixed = T), ".tsv", +sep = "") + + # - geoitems + geoitems <- read.csv(idFiles[i], + header = T, + check.names = F, + stringsAsFactors = F) + searchitems <- geoitems$item + + # - to runtime Log: + print(paste("--- processing: ", i, ": ", filename, sep = "")) + + # - to runtime Log: + print("--- RUNNING HiveQL Query to search for geoitems.") + hiveQLQuery_1 <- "USE goransm; SET hive.mapred.mode=unstrict;" + hiveQLQuery_2 <- paste("SELECT eu_entity_id, SUM(eu_count) AS usage FROM wdcm_maintable WHERE eu_entity_id IN (", + paste("\"", searchitems, "\"", collapse = ", ", sep = ""), + ") GROUP BY eu_entity_id ORDER BY usage DESC LIMIT 1000;", + sep = "") + hiveQLQuery <- paste(hiveQLQuery_1, hiveQLQuery_2, sep = " ") + + # - write hiveQLQuery locally: + 
setwd(fPath) + write(hiveQLQuery, "hiveQL_geoQuery.hql") + + # - execute HiveQLQuery: + hiveQLQueryCommand <- paste("/usr/local/bin/beeline --silent -f ", + getwd(), "/hiveQL_geoQuery.hql", + " > ", dataDir, "/", filename, +
[MediaWiki-commits] [Gerrit] analytics...WDCM[master]: EngineGeo 16 Dec 2017
GoranSMilovanovic has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/398666 ) Change subject: EngineGeo 16 Dec 2017 .. EngineGeo 16 Dec 2017 Change-Id: I4750665c1789c201d579c425b9231a1f848be3a0 --- M WDCM_EngineGeo_goransm.R 1 file changed, 1 insertion(+), 900 deletions(-) Approvals: GoranSMilovanovic: Looks good to me, approved jenkins-bot: Verified diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R index 7013c3a..25e8a40 100644 --- a/WDCM_EngineGeo_goransm.R +++ b/WDCM_EngineGeo_goransm.R @@ -209,908 +209,9 @@ } ### --- -### --- Script 2: WDCM_Search_Clients.R, WDCM Search Module +### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable ### --- -### --- -### --- WDCM Search Module, v. Beta 0.1 -### --- Script: WDCM_Search_Clients.R, v. Beta 0.1 -### --- Author: Goran S. Milovanovic, Data Analyst, WMDE -### --- Developed under the contract between Goran Milovanovic PR Data Kolektiv -### --- and WMDE. -### --- Contact: goran.milovanovic_...@wikimedia.de -### --- -### --- DESCRIPTION: -### --- WDCM_Search_Clients.R takes a list of -### --- item IDs from Wikidata (the list is delivered by -### --- the WDCM_Collect_Items.R script) and searches for their -### --- usage across the Wikimedia projects in Hadoop: -### --- database: goransm -### --- directory: wdcmsqoop -### --- table: wdcm_clients_wb_entity_usage -### --- from production (currently: stat1005.eqiad.wmnet). 
-### --- NOTE: wdcm_clients_wb_entity_usage is produced by -### --- WDCM_Sqoop_Clients.R (currently run from: stat1004.eqiad.wmnet) -### --- -### --- INPUT: -### --- the WDCM_Search_Clients_HiveQL.R reads the list of item IDs -### --- to search for from /WDCM_CollectedItems -### --- This folder contains the .csv files that specify the item IDs -### --- to search for; the files are produced by Scrpt 1: WDCM_Collect_Items.R -### --- -### --- OUTPUT: -### --- wdcm_maintable Hive table on hdfs, database: goransm -### --- - -### --- read item categories: -setwd(itemsDir) -idFiles <- list.files() -idFiles <- idFiles[grepl(".csv$", idFiles)] -idFilesSize <- file.size(idFiles)/1024^2 - -### --- Track all categories under processing: -# - check for the existence of the wdcmSearchReport file -# - and delete the old file if it's found: -setwd(logDir) -lF <- list.files() -w <- which(grepl("^wdcmSearchReport", lF)) -if (length(w) == 1) { - file.remove(lF[w]) -} -### --- generate wdcmSearchReport data.frame -wdcmSearchReport <- data.frame(category = idFiles, - fileSize = idFilesSize, - startTime = character(length(idFiles)), - endTime = character(length(idFiles)), - stringsAsFactors = F -) -wdcmSearchReport <- wdcmSearchReport[order(-wdcmSearchReport$fileSize), ] - -### --- PREPARATION: delete goransm.wdcm_maintable if exists, -### --- delete all from EXTERNAL Hive table from /user/goransm/wdcmtables (hdfs path) -### --- make directory for EXTERNAL Hive table /user/goransm/wdcmtables (hdfs path) - -### --- check if goransm.wdcm_maintable exists in Hadoop; if yes, drop it: -# - NOTE: drop wdcm_maintable == erase metastore data: -# - [query01Err] - -# - to runtime Log: -print("Running query [query01Err].") - -query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T) -if (query01Err != 0) { - # - to runtime Log: - print("--- (!!!) 
query01Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T) - # - check errors: - if (query01Err != 0) { -# - to runtime Log: -print("--- (!!!) query01Err failed AGAIN: quiting.") -quit() - } -} - -### --- delete files for EXTERNAL Hive table from /user/goransm/wdcmtables (hdfs path) -# - [query02Err] -# - to runtime Log: -print("--- Running query [query02Err].") -query02Err <- system(command = 'hdfs dfs -rm -r /user/goransm/wdcmtables', wait = T) -if (query02Err != 0) { - # - to runtime Log: - print("--- (!!!) query02Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query02Err <- system(command = 'hdfs dfs -rm
[MediaWiki-commits] [Gerrit] analytics...WDCM[master]: EngineGeo 16 Dec 2017
GoranSMilovanovic has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/398666 ) Change subject: EngineGeo 16 Dec 2017 .. EngineGeo 16 Dec 2017 Change-Id: I4750665c1789c201d579c425b9231a1f848be3a0 --- M WDCM_EngineGeo_goransm.R 1 file changed, 1 insertion(+), 900 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/wmde/WDCM refs/changes/66/398666/1 diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R index 7013c3a..25e8a40 100644 --- a/WDCM_EngineGeo_goransm.R +++ b/WDCM_EngineGeo_goransm.R @@ -209,908 +209,9 @@ } ### --- -### --- Script 2: WDCM_Search_Clients.R, WDCM Search Module +### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable ### --- -### --- -### --- WDCM Search Module, v. Beta 0.1 -### --- Script: WDCM_Search_Clients.R, v. Beta 0.1 -### --- Author: Goran S. Milovanovic, Data Analyst, WMDE -### --- Developed under the contract between Goran Milovanovic PR Data Kolektiv -### --- and WMDE. -### --- Contact: goran.milovanovic_...@wikimedia.de -### --- -### --- DESCRIPTION: -### --- WDCM_Search_Clients.R takes a list of -### --- item IDs from Wikidata (the list is delivered by -### --- the WDCM_Collect_Items.R script) and searches for their -### --- usage across the Wikimedia projects in Hadoop: -### --- database: goransm -### --- directory: wdcmsqoop -### --- table: wdcm_clients_wb_entity_usage -### --- from production (currently: stat1005.eqiad.wmnet). 
-### --- NOTE: wdcm_clients_wb_entity_usage is produced by -### --- WDCM_Sqoop_Clients.R (currently run from: stat1004.eqiad.wmnet) -### --- -### --- INPUT: -### --- the WDCM_Search_Clients_HiveQL.R reads the list of item IDs -### --- to search for from /WDCM_CollectedItems -### --- This folder contains the .csv files that specify the item IDs -### --- to search for; the files are produced by Scrpt 1: WDCM_Collect_Items.R -### --- -### --- OUTPUT: -### --- wdcm_maintable Hive table on hdfs, database: goransm -### --- - -### --- read item categories: -setwd(itemsDir) -idFiles <- list.files() -idFiles <- idFiles[grepl(".csv$", idFiles)] -idFilesSize <- file.size(idFiles)/1024^2 - -### --- Track all categories under processing: -# - check for the existence of the wdcmSearchReport file -# - and delete the old file if it's found: -setwd(logDir) -lF <- list.files() -w <- which(grepl("^wdcmSearchReport", lF)) -if (length(w) == 1) { - file.remove(lF[w]) -} -### --- generate wdcmSearchReport data.frame -wdcmSearchReport <- data.frame(category = idFiles, - fileSize = idFilesSize, - startTime = character(length(idFiles)), - endTime = character(length(idFiles)), - stringsAsFactors = F -) -wdcmSearchReport <- wdcmSearchReport[order(-wdcmSearchReport$fileSize), ] - -### --- PREPARATION: delete goransm.wdcm_maintable if exists, -### --- delete all from EXTERNAL Hive table from /user/goransm/wdcmtables (hdfs path) -### --- make directory for EXTERNAL Hive table /user/goransm/wdcmtables (hdfs path) - -### --- check if goransm.wdcm_maintable exists in Hadoop; if yes, drop it: -# - NOTE: drop wdcm_maintable == erase metastore data: -# - [query01Err] - -# - to runtime Log: -print("Running query [query01Err].") - -query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T) -if (query01Err != 0) { - # - to runtime Log: - print("--- (!!!) 
query01Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T) - # - check errors: - if (query01Err != 0) { -# - to runtime Log: -print("--- (!!!) query01Err failed AGAIN: quiting.") -quit() - } -} - -### --- delete files for EXTERNAL Hive table from /user/goransm/wdcmtables (hdfs path) -# - [query02Err] -# - to runtime Log: -print("--- Running query [query02Err].") -query02Err <- system(command = 'hdfs dfs -rm -r /user/goransm/wdcmtables', wait = T) -if (query02Err != 0) { - # - to runtime Log: - print("--- (!!!) query02Err failed: waiting for 1h before next attempt...") - # - sleep for one hour - Sys.sleep(time = 60*60) - # - re-run query - query02Err <- system(command = 'hdfs dfs -rm -r