GoranSMilovanovic has submitted this change and it was merged. (https://gerrit.wikimedia.org/r/398667)

Change subject: EngineGeo 16 Dec 2017 ed2
......................................................................


EngineGeo 16 Dec 2017 ed2

Change-Id: I0f16705bf73bd0e66a19636321fd438b386f6139
---
M WDCM_EngineGeo_goransm.R
M WDCM_Engine_goransm.R
2 files changed, 122 insertions(+), 14 deletions(-)

Approvals:
  GoranSMilovanovic: Verified; Looks good to me, approved



diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R
index 25e8a40..9d9c8df 100644
--- a/WDCM_EngineGeo_goransm.R
+++ b/WDCM_EngineGeo_goransm.R
@@ -54,6 +54,7 @@
 library(httr)
 library(XML)
 # - wrangling:
+library(dplyr)
 library(stringr)
 library(readr)
 library(data.table)
@@ -62,11 +63,11 @@
 
 ### --- Directories
 # - fPath: where the scripts is run from?
-fPath <- '/home/goransm/Work/___DataKolektiv/Projects/WikimediaDEU/_WMDE_Projects/WDCM_Dev/WDCM/'
+fPath <- '/home/goransm/RScripts/WDCM_R'
 # - form paths:
-ontologyDir <- paste(fPath, 'WDCM_Ontology', sep = "")
-logDir <- paste(fPath, 'WDCM_Logs', sep = "")
-itemsDir <- paste(fPath, 'WDCM_CollectedGeoItems', sep = "")
+ontologyDir <- paste(fPath, '/WDCM_Ontology', sep = "")
+logDir <- paste(fPath, '/WDCM_Logs', sep = "")
+itemsDir <- paste(fPath, '/WDCM_CollectedGeoItems', sep = "")
 # - stat1005 published-datasets, maps onto 
 # - https://analytics.wikimedia.org/datasets/wdcm/
 dataDir <- '/srv/published-datasets/wdcm'
@@ -75,9 +76,9 @@
 print(paste("--- UPDATE RUN STARTED ON:", Sys.time(), sep = " "))
 
 ### --- Set proxy
-# Sys.setenv(
-#   http_proxy = "http://webproxy.eqiad.wmnet:8080",
-#   https_proxy = "http://webproxy.eqiad.wmnet:8080")
+Sys.setenv(
+  http_proxy = "http://webproxy.eqiad.wmnet:8080",
+  https_proxy = "http://webproxy.eqiad.wmnet:8080")
 
 ### --- Read WDCM_GeoItems
 # - to runtime Log:
@@ -114,8 +115,6 @@
   strsplit(wdcmGeoItems$item[i],
            split = ",", fixed = T)[[1]],
   "both")
-  
-  itemsOut <- list()
   
   # - Construct Query:
   query <- paste0(
@@ -190,8 +189,8 @@
 write.csv(qErrors, "WDCM_Collect_GeoItems_SPARQL_Errors.csv")
 # - write to WDCM main reporting file:
 lF <- list.files()
-if ('WDCM_MainReport.csv' %in% lF) {
-  mainReport <- read.csv('WDCM_MainReport.csv',
+if ('WDCM_GeoReport.csv' %in% lF) {
+  mainReport <- read.csv('WDCM_GeoReport.csv',
                          header = T,
                          row.names = 1,
                          check.names = F,
@@ -200,18 +199,128 @@
                           Time = as.character(Sys.time()),
                           stringsAsFactors = F)
   mainReport <- rbind(mainReport, newReport)
-  write.csv(mainReport, 'WDCM_MainReport.csv')
+  write.csv(mainReport, 'WDCM_GeoReport.csv')
 } else {
   newReport <- data.frame(Step = 'CollectItems',
                           Time = as.character(Sys.time()),
                           stringsAsFactors = F)
-  write.csv(newReport, 'WDCM_MainReport.csv')
+  write.csv(newReport, 'WDCM_GeoReport.csv')
 }
 
 ### ---------------------------------------------------------------------------
 ### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable
 ### ---------------------------------------------------------------------------
 
+# - to runtime Log:
+print("--- ETL: Wikidata usage statistics from wdcm_maintable")
+
+### --- read item categories:
+setwd(itemsDir)
+
+idFiles <- list.files()
+idFiles <- idFiles[grepl(".csv$", idFiles)]
+categories <- unname(sapply(idFiles, function(x) {
+  strsplit(x, split = "_")[[1]][1]
+}))
+
+for (i in 1:length(categories)) {
+  
+  # - filename
+  filename <- paste("wdcm_geoitem_",
+                    gsub(" ", "", categories[i], fixed = T), ".tsv",
+                    sep = "")
+  
+  # - geoitems
+  geoitems <- read.csv(idFiles[i],
+                       header = T,
+                       check.names = F,
+                       stringsAsFactors = F)
+  searchitems <- geoitems$item
+  
+  # - to runtime Log:
+  print(paste("--- processing: ", i, ": ", filename, sep = ""))
+  
+  # - to runtime Log:
+  print("--- RUNNING HiveQL Query to search for geoitems.")
+  hiveQLQuery_1 <- "USE goransm; SET hive.mapred.mode=unstrict;"
+  hiveQLQuery_2 <- paste("SELECT eu_entity_id, SUM(eu_count) AS usage FROM wdcm_maintable WHERE eu_entity_id IN (",
+                         paste("\"", searchitems, "\"", collapse = ", ", sep = ""),
+                         ") GROUP BY eu_entity_id ORDER BY usage DESC LIMIT 1000;",
+                         sep = "")
+  hiveQLQuery <- paste(hiveQLQuery_1, hiveQLQuery_2, sep = " ")
+  
+  # - write hiveQLQuery locally:
+  setwd(fPath)
+  write(hiveQLQuery, "hiveQL_geoQuery.hql")
+  
+  # - execute HiveQLQuery:
+  hiveQLQueryCommand <- paste("/usr/local/bin/beeline --silent -f ", 
+                              getwd(), "/hiveQL_geoQuery.hql", 
+                              " > ", dataDir, "/", filename, 
+                              sep = "")
+  # - [query01Err]
+  # - to runtime Log:
+  print("--- Running query [query01Err].")
+  query01Err <- system(command = hiveQLQueryCommand, wait = TRUE)
+  if (query01Err != 0) {
+    # - to runtime Log:
+    print("--- (!!!) query01Err failed: waiting for 1h before next attempt...")
+    # - sleep for one hour
+    Sys.sleep(time = 60*60)
+    # - re-run query
+    query01Err <- system(command = hiveQLQueryCommand, wait = TRUE)
+    # - check errors:
+    if (query01Err != 0) {
+      # - to runtime Log:
+      print("--- (!!!) query01Err failed AGAIN: quiting.")
+      quit()
+    }
+  }
+  
+  # - back to itemsDir
+  setwd(itemsDir)
+  
+}
+
+### --- join coordinates, items, labels, and usage
+
+
+### --- log ETL step:
+# - to runtime Log:
+print("--- LOG: ETL from wdcm_maintable step completed.")
+# - set log dir:
+setwd(logDir)
+# - write to WDCM main reporting file:
+lF <- list.files()
+if ('WDCM_GeoReport.csv' %in% lF) {
+  mainReport <- read.csv('WDCM_GeoReport.csv',
+                         header = T,
+                         row.names = 1,
+                         check.names = F,
+                         stringsAsFactors = F)
+  newReport <- data.frame(Step = 'ETL_wdcm_maintable',
+                          Time = as.character(Sys.time()),
+                          stringsAsFactors = F)
+  mainReport <- rbind(mainReport, newReport)
+  write.csv(mainReport, 'WDCM_GeoReport.csv')
+} else {
+  newReport <- data.frame(Step = 'CollectItems',
+                          Time = as.character(Sys.time()),
+                          stringsAsFactors = F)
+  write.csv(newReport, 'WDCM_GeoReport.csv')
+}
+
+### ---------------------------------------------------------------------------
+### --- Step 3: toLabsGeoReport
+### ---------------------------------------------------------------------------
+
+
+
+
+
+
+
+
 
 
 
diff --git a/WDCM_Engine_goransm.R b/WDCM_Engine_goransm.R
index 61cba1b..b75ad53 100644
--- a/WDCM_Engine_goransm.R
+++ b/WDCM_Engine_goransm.R
@@ -505,7 +505,6 @@
     # - [query04BErr]
     # - to runtime Log:
     print("--- Running query [query04BErr].")
-
     query04BErr <- system(command = hiveQLQueryCommand, wait = TRUE)
     if (query04BErr != 0) {
       # - to runtime Log:

-- 
To view, visit https://gerrit.wikimedia.org/r/398667
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I0f16705bf73bd0e66a19636321fd438b386f6139
Gerrit-PatchSet: 1
Gerrit-Project: analytics/wmde/WDCM
Gerrit-Branch: master
Gerrit-Owner: GoranSMilovanovic <goran.milovanovic_...@wikimedia.de>
Gerrit-Reviewer: GoranSMilovanovic <goran.milovanovic_...@wikimedia.de>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to