GoranSMilovanovic has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/398667 )
Change subject: EngineGeo 16 Dec 2017 ed2 ...................................................................... EngineGeo 16 Dec 2017 ed2 Change-Id: I0f16705bf73bd0e66a19636321fd438b386f6139 --- M WDCM_EngineGeo_goransm.R M WDCM_Engine_goransm.R 2 files changed, 122 insertions(+), 14 deletions(-) Approvals: GoranSMilovanovic: Verified; Looks good to me, approved diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R index 25e8a40..9d9c8df 100644 --- a/WDCM_EngineGeo_goransm.R +++ b/WDCM_EngineGeo_goransm.R @@ -54,6 +54,7 @@ library(httr) library(XML) # - wrangling: +library(dplyr) library(stringr) library(readr) library(data.table) @@ -62,11 +63,11 @@ ### --- Directories # - fPath: where the scripts is run from? -fPath <- '/home/goransm/Work/___DataKolektiv/Projects/WikimediaDEU/_WMDE_Projects/WDCM_Dev/WDCM/' +fPath <- '/home/goransm/RScripts/WDCM_R' # - form paths: -ontologyDir <- paste(fPath, 'WDCM_Ontology', sep = "") -logDir <- paste(fPath, 'WDCM_Logs', sep = "") -itemsDir <- paste(fPath, 'WDCM_CollectedGeoItems', sep = "") +ontologyDir <- paste(fPath, '/WDCM_Ontology', sep = "") +logDir <- paste(fPath, '/WDCM_Logs', sep = "") +itemsDir <- paste(fPath, '/WDCM_CollectedGeoItems', sep = "") # - stat1005 published-datasets, maps onto # - https://analytics.wikimedia.org/datasets/wdcm/ dataDir <- '/srv/published-datasets/wdcm' @@ -75,9 +76,9 @@ print(paste("--- UPDATE RUN STARTED ON:", Sys.time(), sep = " ")) ### --- Set proxy -# Sys.setenv( -# http_proxy = "http://webproxy.eqiad.wmnet:8080", -# https_proxy = "http://webproxy.eqiad.wmnet:8080") +Sys.setenv( + http_proxy = "http://webproxy.eqiad.wmnet:8080", + https_proxy = "http://webproxy.eqiad.wmnet:8080") ### --- Read WDCM_GeoItems # - to runtime Log: @@ -114,8 +115,6 @@ strsplit(wdcmGeoItems$item[i], split = ",", fixed = T)[[1]], "both") - - itemsOut <- list() # - Construct Query: query <- paste0( @@ -190,8 +189,8 @@ write.csv(qErrors, "WDCM_Collect_GeoItems_SPARQL_Errors.csv") # - write to WDCM main reporting file: lF <- list.files() -if ('WDCM_MainReport.csv' %in% lF) { - mainReport <- read.csv('WDCM_MainReport.csv', +if ('WDCM_GeoReport.csv' %in% lF) { + mainReport <- read.csv('WDCM_GeoReport.csv', header = T, row.names = 1, check.names = F, @@ -200,18 +199,128 @@ Time = as.character(Sys.time()), stringsAsFactors = F) mainReport <- rbind(mainReport, newReport) - write.csv(mainReport, 'WDCM_MainReport.csv') + write.csv(mainReport, 'WDCM_GeoReport.csv') } else { newReport <- data.frame(Step = 'CollectItems', Time = as.character(Sys.time()), stringsAsFactors = F) - write.csv(newReport, 'WDCM_MainReport.csv') + write.csv(newReport, 'WDCM_GeoReport.csv') } ### --------------------------------------------------------------------------- ### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable ### --------------------------------------------------------------------------- +# - to runtime Log: +print("--- ETL: Wikidata usage statistics from wdcm_maintable") + +### --- read item categories: +setwd(itemsDir) + +idFiles <- list.files() +idFiles <- idFiles[grepl(".csv$", idFiles)] +categories <- unname(sapply(idFiles, function(x) { + strsplit(x, split = "_")[[1]][1] +})) + +for (i in 1:length(categories)) { + + # - filename + filename <- paste("wdcm_geoitem_", + gsub(" ", "", categories[i], fixed = T), ".tsv", + sep = "") + + # - geoitems + geoitems <- read.csv(idFiles[i], + header = T, + check.names = F, + stringsAsFactors = F) + searchitems <- geoitems$item + + # - to runtime Log: + print(paste("--- processing: ", i, ": ", filename, sep = "")) + + # - to runtime Log: + print("--- RUNNING HiveQL Query to search for geoitems.") + hiveQLQuery_1 <- "USE goransm; SET hive.mapred.mode=unstrict;" + hiveQLQuery_2 <- paste("SELECT eu_entity_id, SUM(eu_count) AS usage FROM wdcm_maintable WHERE eu_entity_id IN (", + paste("\"", searchitems, "\"", collapse = ", ", sep = ""), + ") GROUP BY eu_entity_id ORDER BY usage DESC LIMIT 1000;", + sep = "") + hiveQLQuery <- paste(hiveQLQuery_1, hiveQLQuery_2, sep = " ") + + # - write hiveQLQuery locally: + setwd(fPath) + write(hiveQLQuery, "hiveQL_geoQuery.hql") + + # - execute HiveQLQuery: + hiveQLQueryCommand <- paste("/usr/local/bin/beeline --silent -f ", + getwd(), "/hiveQL_geoQuery.hql", + " > ", dataDir, "/", filename, + sep = "") + # - [query01Err] + # - to runtime Log: + print("--- Running query [query01Err].") + query01Err <- system(command = hiveQLQueryCommand, wait = TRUE) + if (query01Err != 0) { + # - to runtime Log: + print("--- (!!!) query01Err failed: waiting for 1h before next attempt...") + # - sleep for one hour + Sys.sleep(time = 60*60) + # - re-run query + query01Err <- system(command = hiveQLQueryCommand, wait = TRUE) + # - check errors: + if (query01Err != 0) { + # - to runtime Log: + print("--- (!!!) query01Err failed AGAIN: quiting.") + quit() + } + } + + # - back to itemsDir + setwd(itemsDir) + +} + +### --- join coordinates, items, labels, and usage + + +### --- log ETL step: +# - to runtime Log: +print("--- LOG: ETL from wdcm_maintable step completed.") +# - set log dir: +setwd(logDir) +# - write to WDCM main reporting file: +lF <- list.files() +if ('WDCM_GeoReport.csv' %in% lF) { + mainReport <- read.csv('WDCM_GeoReport.csv', + header = T, + row.names = 1, + check.names = F, + stringsAsFactors = F) + newReport <- data.frame(Step = 'ETL_wdcm_maintable', + Time = as.character(Sys.time()), + stringsAsFactors = F) + mainReport <- rbind(mainReport, newReport) + write.csv(mainReport, 'WDCM_GeoReport.csv') +} else { + newReport <- data.frame(Step = 'CollectItems', + Time = as.character(Sys.time()), + stringsAsFactors = F) + write.csv(newReport, 'WDCM_GeoReport.csv') +} + +### --------------------------------------------------------------------------- +### --- Step 3: toLabsGeoReport +### --------------------------------------------------------------------------- + + + + + + + + diff --git a/WDCM_Engine_goransm.R b/WDCM_Engine_goransm.R index 61cba1b..b75ad53 100644 --- a/WDCM_Engine_goransm.R +++ b/WDCM_Engine_goransm.R @@ -505,7 +505,6 @@ # - [query04BErr] # - to runtime Log: print("--- Running query [query04BErr].") - query04BErr <- system(command = hiveQLQueryCommand, wait = TRUE) if (query04BErr != 0) { # - to runtime Log: -- To view, visit https://gerrit.wikimedia.org/r/398667 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I0f16705bf73bd0e66a19636321fd438b386f6139 Gerrit-PatchSet: 1 Gerrit-Project: analytics/wmde/WDCM Gerrit-Branch: master Gerrit-Owner: GoranSMilovanovic <goran.milovanovic_...@wikimedia.de> Gerrit-Reviewer: GoranSMilovanovic <goran.milovanovic_...@wikimedia.de> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits