[MediaWiki-commits] [Gerrit] analytics...WDCM[master]: EngineGeo 16 Dec 2017 ed2

2017-12-16 Thread GoranSMilovanovic (Code Review)
GoranSMilovanovic has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/398667 )

Change subject: EngineGeo 16 Dec 2017 ed2
..


EngineGeo 16 Dec 2017 ed2

Change-Id: I0f16705bf73bd0e66a19636321fd438b386f6139
---
M WDCM_EngineGeo_goransm.R
M WDCM_Engine_goransm.R
2 files changed, 122 insertions(+), 14 deletions(-)

Approvals:
  GoranSMilovanovic: Verified; Looks good to me, approved



diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R
index 25e8a40..9d9c8df 100644
--- a/WDCM_EngineGeo_goransm.R
+++ b/WDCM_EngineGeo_goransm.R
@@ -54,6 +54,7 @@
 library(httr)
 library(XML)
 # - wrangling:
+library(dplyr)
 library(stringr)
 library(readr)
 library(data.table)
@@ -62,11 +63,11 @@
 
 ### --- Directories
 # - fPath: where the scripts is run from?
-fPath <- '/home/goransm/Work/___DataKolektiv/Projects/WikimediaDEU/_WMDE_Projects/WDCM_Dev/WDCM/'
+fPath <- '/home/goransm/RScripts/WDCM_R'
 # - form paths:
-ontologyDir <- paste(fPath, 'WDCM_Ontology', sep = "")
-logDir <- paste(fPath, 'WDCM_Logs', sep = "")
-itemsDir <- paste(fPath, 'WDCM_CollectedGeoItems', sep = "")
+ontologyDir <- paste(fPath, '/WDCM_Ontology', sep = "")
+logDir <- paste(fPath, '/WDCM_Logs', sep = "")
+itemsDir <- paste(fPath, '/WDCM_CollectedGeoItems', sep = "")
 # - stat1005 published-datasets, maps onto 
 # - https://analytics.wikimedia.org/datasets/wdcm/
 dataDir <- '/srv/published-datasets/wdcm'
@@ -75,9 +76,9 @@
 print(paste("--- UPDATE RUN STARTED ON:", Sys.time(), sep = " "))
 
 ### --- Set proxy
-# Sys.setenv(
-#   http_proxy = "http://webproxy.eqiad.wmnet:8080",
-#   https_proxy = "http://webproxy.eqiad.wmnet:8080")
+Sys.setenv(
+  http_proxy = "http://webproxy.eqiad.wmnet:8080",
+  https_proxy = "http://webproxy.eqiad.wmnet:8080")
 
 ### --- Read WDCM_GeoItems
 # - to runtime Log:
@@ -114,8 +115,6 @@
   strsplit(wdcmGeoItems$item[i],
split = ",", fixed = T)[[1]],
   "both")
-  
-  itemsOut <- list()
   
   # - Construct Query:
   query <- paste0(
@@ -190,8 +189,8 @@
 write.csv(qErrors, "WDCM_Collect_GeoItems_SPARQL_Errors.csv")
 # - write to WDCM main reporting file:
 lF <- list.files()
-if ('WDCM_MainReport.csv' %in% lF) {
-  mainReport <- read.csv('WDCM_MainReport.csv',
+if ('WDCM_GeoReport.csv' %in% lF) {
+  mainReport <- read.csv('WDCM_GeoReport.csv',
  header = T,
  row.names = 1,
  check.names = F,
@@ -200,18 +199,128 @@
   Time = as.character(Sys.time()),
   stringsAsFactors = F)
   mainReport <- rbind(mainReport, newReport)
-  write.csv(mainReport, 'WDCM_MainReport.csv')
+  write.csv(mainReport, 'WDCM_GeoReport.csv')
 } else {
   newReport <- data.frame(Step = 'CollectItems',
   Time = as.character(Sys.time()),
   stringsAsFactors = F)
-  write.csv(newReport, 'WDCM_MainReport.csv')
+  write.csv(newReport, 'WDCM_GeoReport.csv')
 }
 
 ### ---
 ### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable
 ### ---
 
+# - to runtime Log:
+print("--- ETL: Wikidata usage statistics from wdcm_maintable")
+
+### --- read item categories:
+setwd(itemsDir)
+
+idFiles <- list.files()
+idFiles <- idFiles[grepl(".csv$", idFiles)]
+categories <- unname(sapply(idFiles, function(x) {
+  strsplit(x, split = "_")[[1]][1]
+}))
+
+for (i in 1:length(categories)) {
+  
+  # - filename
+  filename <- paste("wdcm_geoitem_",
+gsub(" ", "", categories[i], fixed = T), ".tsv",
+sep = "")
+  
+  # - geoitems
+  geoitems <- read.csv(idFiles[i],
+   header = T,
+   check.names = F,
+   stringsAsFactors = F)
+  searchitems <- geoitems$item
+  
+  # - to runtime Log:
+  print(paste("--- processing: ", i, ": ", filename, sep = ""))
+  
+  # - to runtime Log:
+  print("--- RUNNING HiveQL Query to search for geoitems.")
+  hiveQLQuery_1 <- "USE goransm; SET hive.mapred.mode=unstrict;"
+  hiveQLQuery_2 <- paste("SELECT eu_entity_id, SUM(eu_count) AS usage FROM wdcm_maintable WHERE eu_entity_id IN (",
+ paste("\"", searchitems, "\"", collapse = ", ", sep = ""),
+ ") GROUP BY eu_entity_id ORDER BY usage DESC LIMIT 1000;",
+ sep = "")
+  hiveQLQuery <- paste(hiveQLQuery_1, hiveQLQuery_2, sep = " ")
+  
+  # - write hiveQLQuery locally:
+  setwd(fPath)
+  write(hiveQLQuery, "hiveQL_geoQuery.hql")
+  
+  # - execute HiveQLQuery:
+  hiveQLQueryCommand <- paste("/usr/local/bin/beeline --silent -f ", 
+  getwd(), "/hiveQL_geoQuery.hql", 
+  " > ", dataDir, "/", filename, 
+  sep = "")
+  

[MediaWiki-commits] [Gerrit] analytics...WDCM[master]: EngineGeo 16 Dec 2017 ed2

2017-12-16 Thread GoranSMilovanovic (Code Review)
GoranSMilovanovic has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/398667 )

Change subject: EngineGeo 16 Dec 2017 ed2
..

EngineGeo 16 Dec 2017 ed2

Change-Id: I0f16705bf73bd0e66a19636321fd438b386f6139
---
M WDCM_EngineGeo_goransm.R
M WDCM_Engine_goransm.R
2 files changed, 122 insertions(+), 14 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/wmde/WDCM 
refs/changes/67/398667/1

diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R
index 25e8a40..9d9c8df 100644
--- a/WDCM_EngineGeo_goransm.R
+++ b/WDCM_EngineGeo_goransm.R
@@ -54,6 +54,7 @@
 library(httr)
 library(XML)
 # - wrangling:
+library(dplyr)
 library(stringr)
 library(readr)
 library(data.table)
@@ -62,11 +63,11 @@
 
 ### --- Directories
 # - fPath: where the scripts is run from?
-fPath <- '/home/goransm/Work/___DataKolektiv/Projects/WikimediaDEU/_WMDE_Projects/WDCM_Dev/WDCM/'
+fPath <- '/home/goransm/RScripts/WDCM_R'
 # - form paths:
-ontologyDir <- paste(fPath, 'WDCM_Ontology', sep = "")
-logDir <- paste(fPath, 'WDCM_Logs', sep = "")
-itemsDir <- paste(fPath, 'WDCM_CollectedGeoItems', sep = "")
+ontologyDir <- paste(fPath, '/WDCM_Ontology', sep = "")
+logDir <- paste(fPath, '/WDCM_Logs', sep = "")
+itemsDir <- paste(fPath, '/WDCM_CollectedGeoItems', sep = "")
 # - stat1005 published-datasets, maps onto 
 # - https://analytics.wikimedia.org/datasets/wdcm/
 dataDir <- '/srv/published-datasets/wdcm'
@@ -75,9 +76,9 @@
 print(paste("--- UPDATE RUN STARTED ON:", Sys.time(), sep = " "))
 
 ### --- Set proxy
-# Sys.setenv(
-#   http_proxy = "http://webproxy.eqiad.wmnet:8080",
-#   https_proxy = "http://webproxy.eqiad.wmnet:8080")
+Sys.setenv(
+  http_proxy = "http://webproxy.eqiad.wmnet:8080",
+  https_proxy = "http://webproxy.eqiad.wmnet:8080")
 
 ### --- Read WDCM_GeoItems
 # - to runtime Log:
@@ -114,8 +115,6 @@
   strsplit(wdcmGeoItems$item[i],
split = ",", fixed = T)[[1]],
   "both")
-  
-  itemsOut <- list()
   
   # - Construct Query:
   query <- paste0(
@@ -190,8 +189,8 @@
 write.csv(qErrors, "WDCM_Collect_GeoItems_SPARQL_Errors.csv")
 # - write to WDCM main reporting file:
 lF <- list.files()
-if ('WDCM_MainReport.csv' %in% lF) {
-  mainReport <- read.csv('WDCM_MainReport.csv',
+if ('WDCM_GeoReport.csv' %in% lF) {
+  mainReport <- read.csv('WDCM_GeoReport.csv',
  header = T,
  row.names = 1,
  check.names = F,
@@ -200,18 +199,128 @@
   Time = as.character(Sys.time()),
   stringsAsFactors = F)
   mainReport <- rbind(mainReport, newReport)
-  write.csv(mainReport, 'WDCM_MainReport.csv')
+  write.csv(mainReport, 'WDCM_GeoReport.csv')
 } else {
   newReport <- data.frame(Step = 'CollectItems',
   Time = as.character(Sys.time()),
   stringsAsFactors = F)
-  write.csv(newReport, 'WDCM_MainReport.csv')
+  write.csv(newReport, 'WDCM_GeoReport.csv')
 }
 
 ### ---
 ### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable
 ### ---
 
+# - to runtime Log:
+print("--- ETL: Wikidata usage statistics from wdcm_maintable")
+
+### --- read item categories:
+setwd(itemsDir)
+
+idFiles <- list.files()
+idFiles <- idFiles[grepl(".csv$", idFiles)]
+categories <- unname(sapply(idFiles, function(x) {
+  strsplit(x, split = "_")[[1]][1]
+}))
+
+for (i in 1:length(categories)) {
+  
+  # - filename
+  filename <- paste("wdcm_geoitem_",
+gsub(" ", "", categories[i], fixed = T), ".tsv",
+sep = "")
+  
+  # - geoitems
+  geoitems <- read.csv(idFiles[i],
+   header = T,
+   check.names = F,
+   stringsAsFactors = F)
+  searchitems <- geoitems$item
+  
+  # - to runtime Log:
+  print(paste("--- processing: ", i, ": ", filename, sep = ""))
+  
+  # - to runtime Log:
+  print("--- RUNNING HiveQL Query to search for geoitems.")
+  hiveQLQuery_1 <- "USE goransm; SET hive.mapred.mode=unstrict;"
+  hiveQLQuery_2 <- paste("SELECT eu_entity_id, SUM(eu_count) AS usage FROM wdcm_maintable WHERE eu_entity_id IN (",
+ paste("\"", searchitems, "\"", collapse = ", ", sep = ""),
+ ") GROUP BY eu_entity_id ORDER BY usage DESC LIMIT 1000;",
+ sep = "")
+  hiveQLQuery <- paste(hiveQLQuery_1, hiveQLQuery_2, sep = " ")
+  
+  # - write hiveQLQuery locally:
+  setwd(fPath)
+  write(hiveQLQuery, "hiveQL_geoQuery.hql")
+  
+  # - execute HiveQLQuery:
+  hiveQLQueryCommand <- paste("/usr/local/bin/beeline --silent -f ", 
+  getwd(), "/hiveQL_geoQuery.hql", 
+  " > ", dataDir, "/", filename, 
+  

[MediaWiki-commits] [Gerrit] analytics...WDCM[master]: EngineGeo 16 Dec 2017

2017-12-16 Thread GoranSMilovanovic (Code Review)
GoranSMilovanovic has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/398666 )

Change subject: EngineGeo 16 Dec 2017
..


EngineGeo 16 Dec 2017

Change-Id: I4750665c1789c201d579c425b9231a1f848be3a0
---
M WDCM_EngineGeo_goransm.R
1 file changed, 1 insertion(+), 900 deletions(-)

Approvals:
  GoranSMilovanovic: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R
index 7013c3a..25e8a40 100644
--- a/WDCM_EngineGeo_goransm.R
+++ b/WDCM_EngineGeo_goransm.R
@@ -209,908 +209,9 @@
 }
 
 ### ---
-### --- Script 2: WDCM_Search_Clients.R, WDCM Search Module
+### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable
 ### ---
 
-### ---
-### --- WDCM Search Module, v. Beta 0.1
-### --- Script: WDCM_Search_Clients.R, v. Beta 0.1
-### --- Author: Goran S. Milovanovic, Data Analyst, WMDE
-### --- Developed under the contract between Goran Milovanovic PR Data Kolektiv
-### --- and WMDE.
-### --- Contact: goran.milovanovic_...@wikimedia.de
-### ---
-### --- DESCRIPTION:
-### --- WDCM_Search_Clients.R takes a list of
-### --- item IDs from Wikidata (the list is delivered by
-### --- the WDCM_Collect_Items.R script) and searches for their
-### --- usage across the Wikimedia projects in Hadoop:
-### --- database: goransm
-### --- directory: wdcmsqoop
-### --- table: wdcm_clients_wb_entity_usage
-### --- from production (currently: stat1005.eqiad.wmnet).
-### --- NOTE: wdcm_clients_wb_entity_usage is produced by
-### --- WDCM_Sqoop_Clients.R (currently run from: stat1004.eqiad.wmnet)
-### ---
-### --- INPUT:
-### --- the WDCM_Search_Clients_HiveQL.R reads the list of item IDs
-### --- to search for from /WDCM_CollectedItems
-### --- This folder contains the .csv files that specify the item IDs
-### --- to search for; the files are produced by Scrpt 1: WDCM_Collect_Items.R
-### ---
-### --- OUTPUT:
-### --- wdcm_maintable Hive table on hdfs, database: goransm
-### ---
-
-### --- read item categories:
-setwd(itemsDir)
-idFiles <- list.files()
-idFiles <- idFiles[grepl(".csv$", idFiles)]
-idFilesSize <- file.size(idFiles)/1024^2
-
-### --- Track all categories under processing:
-# - check for the existence of the wdcmSearchReport file
-# - and delete the old file if it's found:
-setwd(logDir)
-lF <- list.files()
-w <- which(grepl("^wdcmSearchReport", lF))
-if (length(w) == 1) {
-  file.remove(lF[w])
-}
-### --- generate wdcmSearchReport data.frame
-wdcmSearchReport <- data.frame(category = idFiles,
-   fileSize = idFilesSize,
-   startTime = character(length(idFiles)),
-   endTime = character(length(idFiles)),
-   stringsAsFactors = F
-)
-wdcmSearchReport <- wdcmSearchReport[order(-wdcmSearchReport$fileSize), ]
-
-### --- PREPARATION: delete goransm.wdcm_maintable if exists,
-### --- delete all from EXTERNAL Hive table from /user/goransm/wdcmtables 
(hdfs path)
-### --- make directory for EXTERNAL Hive table /user/goransm/wdcmtables (hdfs 
path)
-
-### --- check if goransm.wdcm_maintable exists in Hadoop; if yes, drop it:
-# - NOTE: drop wdcm_maintable == erase metastore data:
-# - [query01Err]
-
-# - to runtime Log:
-print("Running query [query01Err].")
-
-query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE 
goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T)
-if (query01Err != 0) {
-  # - to runtime Log:
-  print("--- (!!!) query01Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE 
goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T)
-  # - check errors:
-  if (query01Err != 0) {
-# - to runtime Log:
-print("--- (!!!) query01Err failed AGAIN: quiting.")
-quit()
-  }
-}
-
-### --- delete files for EXTERNAL Hive table from /user/goransm/wdcmtables 
(hdfs path)
-# - [query02Err]
-# - to runtime Log:
-print("--- Running query [query02Err].")
-query02Err <- system(command = 'hdfs dfs -rm -r /user/goransm/wdcmtables', 
wait = T)
-if (query02Err != 0) {
-  # - to runtime Log:
-  print("--- (!!!) query02Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query02Err <- system(command = 'hdfs dfs -rm 

[MediaWiki-commits] [Gerrit] analytics...WDCM[master]: EngineGeo 16 Dec 2017

2017-12-16 Thread GoranSMilovanovic (Code Review)
GoranSMilovanovic has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/398666 )

Change subject: EngineGeo 16 Dec 2017
..

EngineGeo 16 Dec 2017

Change-Id: I4750665c1789c201d579c425b9231a1f848be3a0
---
M WDCM_EngineGeo_goransm.R
1 file changed, 1 insertion(+), 900 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/wmde/WDCM 
refs/changes/66/398666/1

diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R
index 7013c3a..25e8a40 100644
--- a/WDCM_EngineGeo_goransm.R
+++ b/WDCM_EngineGeo_goransm.R
@@ -209,908 +209,9 @@
 }
 
 ### ---
-### --- Script 2: WDCM_Search_Clients.R, WDCM Search Module
+### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable
 ### ---
 
-### ---
-### --- WDCM Search Module, v. Beta 0.1
-### --- Script: WDCM_Search_Clients.R, v. Beta 0.1
-### --- Author: Goran S. Milovanovic, Data Analyst, WMDE
-### --- Developed under the contract between Goran Milovanovic PR Data Kolektiv
-### --- and WMDE.
-### --- Contact: goran.milovanovic_...@wikimedia.de
-### ---
-### --- DESCRIPTION:
-### --- WDCM_Search_Clients.R takes a list of
-### --- item IDs from Wikidata (the list is delivered by
-### --- the WDCM_Collect_Items.R script) and searches for their
-### --- usage across the Wikimedia projects in Hadoop:
-### --- database: goransm
-### --- directory: wdcmsqoop
-### --- table: wdcm_clients_wb_entity_usage
-### --- from production (currently: stat1005.eqiad.wmnet).
-### --- NOTE: wdcm_clients_wb_entity_usage is produced by
-### --- WDCM_Sqoop_Clients.R (currently run from: stat1004.eqiad.wmnet)
-### ---
-### --- INPUT:
-### --- the WDCM_Search_Clients_HiveQL.R reads the list of item IDs
-### --- to search for from /WDCM_CollectedItems
-### --- This folder contains the .csv files that specify the item IDs
-### --- to search for; the files are produced by Scrpt 1: WDCM_Collect_Items.R
-### ---
-### --- OUTPUT:
-### --- wdcm_maintable Hive table on hdfs, database: goransm
-### ---
-
-### --- read item categories:
-setwd(itemsDir)
-idFiles <- list.files()
-idFiles <- idFiles[grepl(".csv$", idFiles)]
-idFilesSize <- file.size(idFiles)/1024^2
-
-### --- Track all categories under processing:
-# - check for the existence of the wdcmSearchReport file
-# - and delete the old file if it's found:
-setwd(logDir)
-lF <- list.files()
-w <- which(grepl("^wdcmSearchReport", lF))
-if (length(w) == 1) {
-  file.remove(lF[w])
-}
-### --- generate wdcmSearchReport data.frame
-wdcmSearchReport <- data.frame(category = idFiles,
-   fileSize = idFilesSize,
-   startTime = character(length(idFiles)),
-   endTime = character(length(idFiles)),
-   stringsAsFactors = F
-)
-wdcmSearchReport <- wdcmSearchReport[order(-wdcmSearchReport$fileSize), ]
-
-### --- PREPARATION: delete goransm.wdcm_maintable if exists,
-### --- delete all from EXTERNAL Hive table from /user/goransm/wdcmtables 
(hdfs path)
-### --- make directory for EXTERNAL Hive table /user/goransm/wdcmtables (hdfs 
path)
-
-### --- check if goransm.wdcm_maintable exists in Hadoop; if yes, drop it:
-# - NOTE: drop wdcm_maintable == erase metastore data:
-# - [query01Err]
-
-# - to runtime Log:
-print("Running query [query01Err].")
-
-query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE 
goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T)
-if (query01Err != 0) {
-  # - to runtime Log:
-  print("--- (!!!) query01Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE 
goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T)
-  # - check errors:
-  if (query01Err != 0) {
-# - to runtime Log:
-print("--- (!!!) query01Err failed AGAIN: quiting.")
-quit()
-  }
-}
-
-### --- delete files for EXTERNAL Hive table from /user/goransm/wdcmtables 
(hdfs path)
-# - [query02Err]
-# - to runtime Log:
-print("--- Running query [query02Err].")
-query02Err <- system(command = 'hdfs dfs -rm -r /user/goransm/wdcmtables', 
wait = T)
-if (query02Err != 0) {
-  # - to runtime Log:
-  print("--- (!!!) query02Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query02Err <- system(command = 'hdfs dfs -rm -r