GoranSMilovanovic has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/394737 )

Change subject: Non-productionized runs 02 Dec 2017

Non-productionized runs 02 Dec 2017

Change-Id: I5cac7822b41c93d5ca60e5bee63e84f053f39758
A WDCM_Engine_goransm.R
M WDCM_Sqoop_Clients.R
2 files changed, 885 insertions(+), 4 deletions(-)

  GoranSMilovanovic: Verified; Looks good to me, approved

diff --git a/WDCM_Engine_goransm.R b/WDCM_Engine_goransm.R
new file mode 100644
index 0000000..d7a2528
--- /dev/null
+++ b/WDCM_Engine_goransm.R
@@ -0,0 +1,874 @@
+#!/usr/bin/env Rscript
+### ---------------------------------------------------------------------------
+### --- WDCM Engine, v. Beta 0.1
+### --- Script: WDCM_Engine.R, v. Beta 0.1
+### --- Author: Goran S. Milovanovic, Data Analyst, WMDE
+### --- Developed under the contract between Goran Milovanovic PR Data Kolektiv
+### --- and WMDE.
+### --- Contact: goran.milovanovic_...@wikimedia.de
+### ---------------------------------------------------------------------------
+### --- WDCM_Engine_goransm.R unifies the previous
+### --- three WDCM Engine scripts:
+### --- WDCM_Collect_Items.R
+### --- WDCM_Search_Clients.R
+### --- WDCM_Pre-Process.R
+### --- NOTE: the execution of this WDCM script is always dependent upon the
+### --- previous WDCM_Sqoop_Clients.R run from stat1004 (currently).
+### --- Each section in WDCM_Engine.R provides additional explanation.
+### --- NOTE: WDCM_Engine.R is the only WDCM R script
+### --- that is run from the statboxes (stat1005 currently)
+### --- to produce the WDCM update.
+### ---------------------------------------------------------------------------
+### --- RUN FROM: /home/goransm/RScripts/WDCM_R
+### --- nohup Rscript WDCM_Engine_goransm.R &
+### ---------------------------------------------------------------------------
+### ---------------------------------------------------------------------------
+### --- LICENSE:
+### ---------------------------------------------------------------------------
+### --- GPL v2
+### --- This file is part of Wikidata Concepts Monitor (WDCM)
+### ---
+### --- WDCM is free software: you can redistribute it and/or modify
+### --- it under the terms of the GNU General Public License as published by
+### --- the Free Software Foundation, either version 2 of the License, or
+### --- (at your option) any later version.
+### ---
+### --- WDCM is distributed in the hope that it will be useful,
+### --- but WITHOUT ANY WARRANTY; without even the implied warranty of
+### --- GNU General Public License for more details.
+### ---
+### --- You should have received a copy of the GNU General Public License
+### --- along with WDCM. If not, see <http://www.gnu.org/licenses/>.
+### ---------------------------------------------------------------------------
+### ---------------------------------------------------------------------------
+### --- Script 1: WDCM_Collect_Items.R, WDCM Search Module
+### ---------------------------------------------------------------------------
+### ---------------------------------------------------------------------------
+### --- WDCM Search Module, v. Beta 0.1
+### --- Script: WDCM_Collect_Items.R, v. Beta 0.1
+### --- Author: Goran S. Milovanovic, Data Analyst, WMDE
+### --- Developed under the contract between Goran Milovanovic PR Data Kolektiv
+### --- and WMDE.
+### --- Contact: goran.milovanovic_...@wikimedia.de
+### ---------------------------------------------------------------------------
+### --- WDCM_Collect_Items.R takes a list of concepts (categories)
+### --- defined by a given WDCM Ontology (human input) and then
+### --- contacts the Wikidata Query Service to fetch all relevant item IDs.
+### ---------------------------------------------------------------------------
+### --- INPUT:
+### --- the WDCM_Collect_Items.R reads the WDCM Ontology file (csv)
+### --- from /WDCM_Ontology
+### --- on stat1005
+### --- ACTIVE WDCM TAXONOMY: WDCM_Ontology_Berlin_05032017.csv
+### ---------------------------------------------------------------------------
+### --- OUTPUT:
+### --- Results are stored locally as .csv files on stat1005:
+### --- in: /WDCM_CollectedItems
+### ---------------------------------------------------------------------------
+# - to nohup.out
+print(paste("--- UPDATE RUN STARTED ON:", Sys.time(), sep = " "))
+### --- Setup
+# - contact:
+# - wrangling:
+# - modeling:
+### --- Directories
+ontologyDir <- '/WDCM_Ontology' # - NOTE: starting dir, not '..' 
+logDir <- '../WDCM_Logs'
+itemsDir <- '../WDCM_CollectedItems/'
+dataDir <- '../WDCM_dataOut'
+### --- Set proxy
+  http_proxy = "http://webproxy.eqiad.wmnet:8080";,
+  https_proxy = "http://webproxy.eqiad.wmnet:8080";)
+### --- Read WDCM_Ontology
+wDir <- paste(getwd(), ontologyDir, sep = "")
+wdcmOntology <- read.csv("WDCM_Ontology_Berlin_05032017.csv",
+                         header = T,
+                         check.names = F,
+                         stringsAsFactors = F)
+### --- Select all instances accross all sub-classes of searchItems:
+# - endPoint:
+endPointURL <- 
+# - track the number of items fetched:
+totalN <- numeric()
+# - set itemsDir:
+# - clear output dir:
+lF <- list.files()
+rmF <- file.remove(lF)
+# - track uncompleted queries:
+qErrors <- character()
+# - startTime (WDCM Main Report)
+startTime <- as.character(Sys.time())
+for (i in 1:length(wdcmOntology$CategoryItems)) {
+  # - to nohup.out
+  print(paste("SPARQL category:", i, sep = ""))
+  searchItems <- str_trim(
+    strsplit(wdcmOntology$CategoryItems[i],
+             split = ",", fixed = T)[[1]],
+    "both")
+  itemsOut <- list()
+  for (k in 1:length(searchItems)) {
+    # - Construct Query:
+    query <- paste0(
+      'PREFIX wd: <http://www.wikidata.org/entity/> ',
+      'PREFIX wdt: <http://www.wikidata.org/prop/direct/> ',
+      'SELECT ?item WHERE {?item (wdt:P31|(wdt:P31/wdt:P279*)) wd:',
+      searchItems[k],
+      '.  }'
+    )
+    # Run Query:
+    res <- GET(paste0(endPointURL, URLencode(query)))
+    # If res$status_code == 200, store:
+    if (res$status_code == 200) {
+      # XML:
+      rc <- rawToChar(res$content)
+      rc <- htmlParse(rc)
+      # clear:
+      rm(res); gc()
+      # extract:
+      items <- xpathSApply(rc, "//uri", xmlValue)
+      items <- unname(sapply(items, function(x) {
+        strsplit(x, split = "/", fixed = T)[[1]][length(strsplit(x, split = 
"/", fixed = T)[[1]])]
+      }))
+      # - as.data.frame:
+      items <- data.frame(item = items,
+                          stringsAsFactors = F)
+      # - to itemsOut:
+      itemsOut[[k]] <- items
+      # - clear:
+      rm(items); gc()
+    } else {
+      qErrors <- append(qErrors, searchItems[k])
+    }
+  }
+  # - itemsOut as data.frame:
+  itemsOut <- rbindlist(itemsOut)
+  # - keep only unique items:
+  w <- which(!(duplicated(itemsOut$item)))
+  itemsOut <- itemsOut[w]
+  # store as CSV
+  write_csv(itemsOut, path = paste0(wdcmOntology$Category[i],"_ItemIDs.csv"))
+  # total numeber of concepts ++:
+  totalN <- append(totalN, length(itemsOut$item))
+  # clear:
+  rm(itemsOut); gc()
+### --- Fix WDCM_Ontology (Phab T174896#3762820)
+# - remove Geographical Object from Organization:
+organizationItems <- read.csv('Organization_ItemIDs.csv',
+                              header = T,
+                              check.names = F,
+                              stringsAsFactors = F)
+geoObjItems <- read.csv('Geographical Object_ItemIDs.csv',
+                              header = T,
+                              check.names = F,
+                              stringsAsFactors = F)
+w <- which(organizationItems$item %in% geoObjItems$item)
+if (length(w) > 0) {
+  organizationItems <- organizationItems[-w, ]
+organizationItems <- data.frame(item = organizationItems,
+                                stringsAsFactors = F)
+# - store:
+write.csv(organizationItems, 'Organization_ItemIDs.csv')
+# - clear:
+rm(organizationItems); rm(geoObjItems); gc()
+# - remove Book from Work of Art:
+bookItems <- read.csv('Book_ItemIDs.csv',
+                      header = T,
+                      check.names = F,
+                      stringsAsFactors = F)
+WorkOfArtItems <- read.csv('Work Of Art_ItemIDs.csv',
+                           header = T,
+                           check.names = F,
+                           stringsAsFactors = F)
+w <- which(WorkOfArtItems$item %in% bookItems$item)
+if (length(w) > 0) {
+  WorkOfArtItems <- WorkOfArtItems[-w, ]
+WorkOfArtItems <- data.frame(item = WorkOfArtItems,
+                             stringsAsFactors = F)
+# - store:
+write.csv(WorkOfArtItems, 'Work Of Art_ItemIDs.csv')
+# - clear:
+rm(WorkOfArtItems); rm(bookItems); gc()
+# - remove Architectural Structure from Geographical Object:
+architectureItems <- read.csv('Architectural Structure_ItemIDs.csv',
+                              header = T,
+                              check.names = F,
+                              stringsAsFactors = F)
+geoObjItems <- read.csv('Geographical Object_ItemIDs.csv',
+                        header = T,
+                        check.names = F,
+                        stringsAsFactors = F)
+w <- which(geoObjItems$item %in% architectureItems$item)
+if (length(w) > 0) {
+  geoObjItems <- geoObjItems[-w, ]
+geoObjItems <- data.frame(item = geoObjItems,
+                          stringsAsFactors = F)
+# - store:
+write.csv(geoObjItems, 'Geographical Object_ItemIDs.csv')
+# - clear:
+rm(geoObjItems); rm(architectureItems); gc()
+# - log uncompleted queries;
+# - set log dir:
+write.csv(qErrors, "WDCM_CollectItems_SPARQL_Errors.csv")
+# - write to WDCM main reporting file:
+# - check whether WDCM_MainReport.csv exists:
+lF <- list.files()
+if ('WDCM_MainReport.csv' %in% lF) {
+  mainReport <- read.csv('WDCM_MainReport.csv',
+                         header = T,
+                         row.names = 1,
+                         check.names = F,
+                         stringsAsFactors = F)
+  newReport <- data.frame(Step = 'CollectItems',
+                          Time = as.character(Sys.time()),
+                          stringsAsFactors = F)
+  mainReport <- rbind(mainReport, newReport)
+  write.csv(mainReport, 'WDCM_MainReport.csv')
+} else {
+  newReport <- data.frame(Step = 'CollectItems',
+                          Time = as.character(Sys.time()),
+                          stringsAsFactors = F)
+  write.csv(newReport, 'WDCM_MainReport.csv')
+### ---------------------------------------------------------------------------
+### --- Script 2: WDCM_Search_Clients.R, WDCM Search Module
+### ---------------------------------------------------------------------------
+### ---------------------------------------------------------------------------
+### --- WDCM Search Module, v. Beta 0.1
+### --- Script: WDCM_Search_Clients.R, v. Beta 0.1
+### --- Author: Goran S. Milovanovic, Data Analyst, WMDE
+### --- Developed under the contract between Goran Milovanovic PR Data 
+### --- and WMDE.
+### --- Contact: goran.milovanovic_...@wikimedia.de
+### ---------------------------------------------------------------------------
+### --- WDCM_Search_Clients.R takes a list of
+### --- item IDs from Wikidata (the list is delivered by
+### --- the WDCM_Collect_Items.R script) and searches for their
+### --- usage across the Wikimedia projects in Hadoop:
+### --- database: goransm
+### --- directory: wdcmsqoop
+### --- table: wdcm_clients_wb_entity_usage
+### --- from production (currently: stat1005.eqiad.wmnet).
+### --- NOTE: wdcm_clients_wb_entity_usage is produced by
+### --- WDCM_Sqoop_Clients.R (currently run from: stat1004.eqiad.wmnet)
+### ---------------------------------------------------------------------------
+### --- INPUT: 
+### --- the WDCM_Search_Clients_HiveQL.R reads the list of item IDs
+### --- to search for from /WDCM_CollectedItems
+### --- This folder contains the .csv files that specify the item IDs
+### --- to search for; the files are produced by Scrpt 1: WDCM_Collect_Items.R 
+### ---------------------------------------------------------------------------
+### --- OUTPUT: 
+### --- wdcm_maintable Hive table on hdfs, database: goransm
+### ---------------------------------------------------------------------------
+### --- Setup
+scriptDir <- '../'
+# - read item categories:
+idFiles <- list.files()
+idFiles <- idFiles[grepl(".csv$", idFiles)]
+idFilesSize <- file.size(idFiles)/1024^2
+# - Track all categories under processing:
+wdcmSearchReport <- data.frame(category = idFiles,
+                               fileSize = idFilesSize,
+                               startTime = character(length(idFiles)),
+                               endTime = character(length(idFiles)),
+                               stringsAsFactors = F
+wdcmSearchReport <- wdcmSearchReport[order(-wdcmSearchReport$fileSize), ]
+# - check if goransm.wdcm_maintable exists in Hadoop; if yes, drop it:
+# - beeline drop wdcm_maintable (erase metastore data):
+system(command = 'beeline --silent -e "USE goransm; DROP TABLE IF EXISTS 
wdcm_maintable;"', wait = T)
+# - delete files for EXTERNAL Hive table from /user/goransm/wdcmtables (hdfs 
+system(command = 'hdfs dfs -rm -r /user/goransm/wdcmtables', wait = T)
+# - make directory for EXTERNAL Hive table /user/goransm/wdcmtables (hdfs path)
+system(command = 'hdfs dfs -mkdir /user/goransm/wdcmtables', wait = T)
+# - loop over item categories:
+for (i in 1:length(wdcmSearchReport$category)) {
+  # - start time for this category:
+  wdcmSearchReport$startTime[i] <- as.character(Sys.time())
+  ### --- read item IDs:
+  wFile <- which(grepl(wdcmSearchReport$category[i], idFiles, fixed = T))
+  qIDs <- read.csv(idFiles[wFile],
+                   header = T,
+                   check.names = F,
+                   stringsAsFactors = F)
+  qIDs <- qIDs$item
+  qIDs <- qIDs[grepl("^Q[[:digit:]]+", qIDs)]
+  ### --- cut into batches (if necessary)
+  # - cut into batches (5MB max. batch size)
+  batchNum <- ceiling(wdcmSearchReport$fileSize[i]/10)
+  batchSize <- round(length(qIDs)/batchNum)
+  startBatchIx <- c(1:batchNum) * batchSize - batchSize + 1
+  stopBatchIx <- c(1:batchNum) * batchSize
+  stopBatchIx[batchNum] <- length(qIDs)
+  for (batch in 1:batchNum) {
+    # - short report:
+    print("--------------------------------------------------------")
+    print(paste("------------- Processing category: ", i, ": ", 
wdcmSearchReport$category[i], sep = ""))
+    print("--------------------------------------------------------")
+    print(paste("------------- Processing batch: ", batch, " out of ", 
batchNum, sep = ""))
+    print("--------------------------------------------------------")
+    # - create goransm.wdcm_maintable Hive table if this is the first entry:
+    # - (create wdcm_maintable Hive Table on (hdfs path): 
+    if ((i == 1) & (batch == 1)) {
+      print("--------------------------------------------------------")
+      print("------------- CREATE wdcm_maintable TABLE --------------")
+      print("--------------------------------------------------------")
+      hiveCommand <- "\"USE goransm; CREATE EXTERNAL TABLE 
+      \\\`eu_entity_id\\\`        string      COMMENT '',
+      \\\`eu_project\\\`           string      COMMENT '',
+      \\\`eu_count\\\`          bigint      COMMENT ''
+      )
+      COMMENT
+      ''
+      \\\`category\\\` string COMMENT 'The item category')
+      'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+      'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+      'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+      'hdfs://analytics-hadoop/user/goransm/wdcmtables';\""
+      hiveCommand <- paste("beeline --silent -e ", hiveCommand, sep = "")
+      system(command = hiveCommand, wait = TRUE)
+    }
+    # - construct HiveQL query to search for category i items
+    # - across all wiki_db:
+    print("--------------------------------------------------------")
+    print("------------- RUNNING HiveQL Query ---------------------")
+    print("------------- to search for category items -------------")
+    print("--------------------------------------------------------")
+    hiveQLQuery_1 <- "USE goransm; SET hive.mapred.mode=unstrict;"
+    hiveQLQuery_2 <- paste("INSERT INTO TABLE wdcm_maintable
+                           PARTITION (category='",
+                           strsplit(wdcmSearchReport$category[i], split = "_", 
fixed = T)[[1]][1],
+                           "') ",
+                           "SELECT eu_entity_id, wiki_db AS eu_project, 
COUNT(*) AS eu_count FROM (
+                           SELECT DISTINCT eu_entity_id, eu_page_id, wiki_db 
FROM wdcm_clients_wb_entity_usage
+                           WHERE eu_entity_id IN (",
+                           paste("\"", 
qIDs[startBatchIx[batch]:stopBatchIx[batch]], "\"", collapse = ", ", sep = ""),
+                           ")) AS t
+                           GROUP BY eu_entity_id, wiki_db;",
+                           sep = "")
+    hiveQLQuery <- paste(hiveQLQuery_1, hiveQLQuery_2, sep = " ")
+    # - write hiveQLQuery locally:
+    setwd(scriptDir)
+    write(hiveQLQuery, "hiveQLQuery.hql")
+    # - execute HiveQLQuery:
+    hiveQLQueryCommand <- "beeline --silent -f 
+    system(command = hiveQLQueryCommand, wait = TRUE)
+    print("--------------------------------------------------------")
+    print("------------- REPAIR TABLE -----------------------------")
+    print("--------------------------------------------------------")
+    # - repair partitions:
+    system(command =
+             'beeline --silent -e "USE goransm; SET hive.mapred.mode = 
nonstrict; MSCK REPAIR TABLE wdcm_maintable;"',
+           wait = TRUE)
+    # - end time for this category:
+    wdcmSearchReport$endTime[i] <- as.character(Sys.time())
+    # - back to item categories:
+    setwd(paste(getwd(), gsub("..", "", itemsDir, fixed = T), sep = ""))
+  }
+# - store report:
+          paste("wdcmSearchReport_", 
+                strsplit(as.character(Sys.time()), 
+                         split = " ")[[1]][1], 
+                ".csv", 
+                sep = ""))
+# - write to WDCM main reporting file:
+mainReport <- read.csv('WDCM_MainReport.csv',
+                       header = T,
+                       row.names = 1,
+                       check.names = F,
+                       stringsAsFactors = F)
+newReport <- data.frame(Step = 'SearchItems',
+                        Time = as.character(Sys.time()),
+                        stringsAsFactors = F)
+mainReport <- rbind(mainReport, newReport)
+write.csv(mainReport, 'WDCM_MainReport.csv')
+### ---------------------------------------------------------------------------
+### --- Script 3: WDCM_Pre-Process.R, WDCM Process Module
+### ---------------------------------------------------------------------------
+### ---------------------------------------------------------------------------
+### --- WDCM Process Module, v. Beta 0.1
+### --- Script: WDCM_Pre-Process.R, v. Beta 0.1
+### --- Author: Goran S. Milovanovic, Data Analyst, WMDE
+### --- Developed under the contract between Goran Milovanovic PR Data 
+### --- and WMDE.
+### --- Contact: goran.milovanovic_...@wikimedia.de
+### ---------------------------------------------------------------------------
+### --- WDCM_Pre-Process.R works with the
+### --- wdcm_maintable Hive table on hdfs, database: goransm
+### --- to produce the .tsv files that migrate to
+### --- the wikidataconcepts.wmflabs.org Cloud VPS instance
+### --- from production (currently: stat1005.eqiad.wmnet).
+### ---------------------------------------------------------------------------
+### --- INPUT: 
+### --- wdcm_maintable Hive table on hdfs, database: goransm
+### ---------------------------------------------------------------------------
+### --- OUTPUT: 
+### --- Results are stored locally as .tsv files on production -
+### --- - on stat1005.eqiad.wmnet - in:
+### --- /WDCM_dataOut
+### --- These output .tsv files migrate to Labs:
+### --- wikidataconcepts.wmflabs.org Cloud VPS instance
+### --- where they are then processed by the WDCM Process Module.
+### ---------------------------------------------------------------------------
+### --- functions
+# - projectType() to determine project type
+projectType <- function(projectName) {
+  unname(sapply(projectName, function(x) {
+    if (grepl("commons", x, fixed = T)) {"Commons"
+    } else if (grepl("mediawiki|meta|species|wikidata", x)) {"Other"
+    } else if (grepl("wiki$", x)) {"Wikipedia"
+    } else if (grepl("quote$", x)) {"Wikiquote"
+    } else if (grepl("voyage$", x)) {"Wikivoyage"
+    } else if (grepl("news$", x)) {"Wikinews"
+    } else if (grepl("source$", x)) {"Wikisource"
+    } else if (grepl("wiktionary$", x)) {"Wiktionary"
+    } else if (grepl("versity$", x)) {"Wikiversity"
+    } else if (grepl("books$", x)) {"Wikibooks"
+    } else {"Other"}
+  }))
+### --- produce wdcm_item.tsv from wdcm_maintable (hdfs, database: goransm)
+### --- NOTE: one .tsv file per category (~14M rows, causes Java gc overflow 
from beeline...)
+# - read item categories:
+idFiles <- list.files()
+idFiles <- idFiles[grepl(".csv$", idFiles)]
+categories <- unname(sapply(idFiles, function(x) {
+  strsplit(x, split = "_")[[1]][1]
+for (i in 1:length(categories)) {
+  filename <- paste("wdcm_item_", 
+                    gsub(" ", "", categories[i], fixed = T), ".tsv", 
+                    sep = "")
+  hiveQLquery <- paste(
+    'USE goransm; SELECT eu_entity_id, SUM(eu_count) AS eu_count FROM 
wdcm_maintable WHERE category=\\"',
+    categories[i],
+    '\\" GROUP BY eu_entity_id ORDER BY eu_count DESC LIMIT 10000000;',
+    sep = "")
+  system(command = paste('beeline --silent -e "',
+                         hiveQLquery,
+                         '" > /home/goransm/RScripts/WDCM_R/WDCM_dataOut/',
+                         filename,
+                         sep = ""),
+         wait = TRUE)
+### --- to dataDir:
+# - clear dataDir:
+lF <- list.files()
+rmF <- file.remove(lF)
+### --- ETL Phase
+### --- produce wdcm_project_category.tsv from wdcm_maintable (hdfs, database: 
+hiveQLquery <- 'USE goransm; 
+                SET hive.mapred.mode=unstrict; 
+                SELECT eu_project, category, SUM(eu_count) AS eu_count 
+                FROM wdcm_maintable 
+                GROUP BY eu_project, category ORDER BY eu_count DESC LIMIT 
+system(command = paste('beeline --silent -e "',
+                       hiveQLquery,
+                       '" > 
+                       sep = ""),
+       wait = TRUE)
+# - add projecttype to wdcm_project_category.tsv
+wdcm_project_category <- as.data.frame(fread('wdcm_project_category.tsv'))
+wdcm_project_category$projectype <- 
+write.csv(wdcm_project_category, "wdcm_project_category.csv")
+### --- produce wdcm_project.tsv from wdcm_maintable (hdfs, database: goransm)
+hiveQLquery <- 'USE goransm; 
+                SET hive.mapred.mode=unstrict; 
+                SELECT eu_project, SUM(eu_count) AS eu_count 
+                FROM wdcm_maintable 
+                GROUP BY eu_project ORDER BY eu_count DESC LIMIT 10000000;'
+system(command = paste('beeline --silent -e "',
+                       hiveQLquery,
+                       '" > 
+                       sep = ""),
+       wait = TRUE)
+# - add projecttype to wdcm_project.tsv
+wdcm_project <- as.data.frame(fread('wdcm_project.tsv'))
+wdcm_project$projectype <- projectType(wdcm_project$eu_project)
+write.csv(wdcm_project, "wdcm_project.csv")
+### --- produce wdcm_category.tsv from wdcm_maintable (hdfs, database: goransm)
+hiveQLquery <- 'USE goransm; 
+                SET hive.mapred.mode=unstrict; 
+                SELECT category, SUM(eu_count) AS eu_count 
+                FROM wdcm_maintable 
+                GROUP BY category ORDER BY eu_count DESC LIMIT 10000000;'
+system(command = paste('beeline --silent -e "',
+                       hiveQLquery,
+                       '" > 
+                       sep = ""),
+       wait = TRUE)
+# - save wdcm_category.tsv as .csv
+wdcm_category <- as.data.frame(fread('wdcm_category.tsv'))
+write.csv(wdcm_category, "wdcm_category.csv")
+### --- produce wdcm_project_category_item100.tsv from wdcm_maintable (hdfs, 
database: goransm)
+hiveQLquery <- 'USE goransm; 
+                SET hive.mapred.mode=unstrict; 
+                SELECT * FROM (
+                  SELECT eu_project, category, eu_entity_id, eu_count, 
ROW_NUMBER() OVER (PARTITION BY eu_project, category ORDER BY eu_count DESC) AS 
+                    FROM wdcm_maintable) t 
+                WHERE row <= 100;'
+system(command = paste('beeline --silent -e "',
+                       hiveQLquery,
+                       '" > 
+                       sep = ""),
+       wait = TRUE)
+# - add projecttype to wdcm_project_category_item100.tsv
+wdcm_project_category_item100 <- 
+wdcm_project_category_item100$projectype <- 
+write.csv(wdcm_project_category_item100, "wdcm_project_category_item100.csv")
+### --- produce wdcm_project_item100.tsv from wdcm_maintable (hdfs, database: 
+hiveQLquery <- 'USE goransm; 
+                SET hive.mapred.mode=unstrict; 
+                SELECT * FROM (
+                  SELECT eu_project, eu_entity_id, eu_count, ROW_NUMBER() OVER 
(PARTITION BY eu_project ORDER BY eu_count DESC) AS row 
+                FROM wdcm_maintable) t 
+                WHERE row <= 100;'
+system(command = paste('beeline --silent -e "',
+                       hiveQLquery,
+                       '" > 
+                       sep = ""),
+       wait = TRUE)
+# - add projecttype to wdcm_project_item100.tsv
+wdcm_project_item100 <- as.data.frame(fread('wdcm_project_item100.tsv'))
+wdcm_project_item100$projectype <- 
+write.csv(wdcm_project_item100, "wdcm_project_item100.csv")
+### --- Semantic Modeling Phase
+### --- produce project-item matrices for semantic topic modeling
+print("Semantic Modeling Phase: TDF MATRICES")
+itemFiles <- list.files()
+itemFiles <- itemFiles[grepl("^wdcm_item", itemFiles)]
+for (i in 1:length(itemFiles)) {
+  # - load categoryFile[i].tsv as data.frame
+  categoryName <- strsplit(itemFiles[i], ".", fixed = T)[[1]][1]
+  categoryName <- strsplit(categoryName, "_", fixed = T)[[1]][3]
+  categoryName <- gsub("([[:lower:]])([[:upper:]])", "\\1 \\2", categoryName)
+  # - load items
+  # - nrows = 5000
+  categoryFile <- fread(itemFiles[i], nrows = 5000)
+  # - list of items to fetch
+  itemList <- categoryFile$eu_entity_id
+  # - hiveQL:
+  hiveQLquery <- paste('USE goransm; SELECT eu_project, eu_entity_id, eu_count 
FROM wdcm_maintable WHERE eu_entity_id IN (',
+                       paste0("'", itemList, "'", collapse = ", ", sep = ""),
+                       ') AND category = \\"',
+                       categoryName,
+                       '\\";',
+                       sep = "")
+  fileName <- gsub(" ", "", categoryName, fixed = T)
+  fileName <- paste("tfMatrix_", fileName, ".tsv", sep = "")
+  system(command = paste('beeline --silent -e "',
+                         hiveQLquery,
+                         '" > /home/goransm/RScripts/WDCM_R/WDCM_dataOut/',
+                         fileName,
+                         sep = ""),
+         wait = TRUE)
+### --- reshape project-item matrices for semantic topic modeling
+print("Semantic Modeling Phase: RESHAPING TDF MATRICES")
+itemFiles <- list.files()
+itemFiles <- itemFiles[grepl("^tfMatrix_", itemFiles)]
+itemFiles <- itemFiles[grepl(".tsv", itemFiles, fixed = T)]
+for (i in 1:length(itemFiles)) {
+  # - load categoryFile[i].tsv as data.frame
+  categoryFile <- fread(itemFiles[i])
+  categoryFile <- spread(categoryFile,
+                         key = eu_entity_id,
+                         value = eu_count,
+                         fill = 0)
+  rownames(categoryFile) <- categoryFile$eu_project
+  categoryFile$eu_project <- NULL
+  w <- which(colSums(categoryFile) == 0)
+  if (length(w) > 0) {
+    categoryFile <- categoryFile[, -w]
+  }
+  w <- which(rowSums(categoryFile) == 0)
+  if (length(w) > 0) {
+    categoryFile <- categoryFile[-w, ]
+  }
+  fileName <- paste(strsplit(itemFiles[i], split = ".", fixed = T)[[1]][1], 
".csv", sep = "")
+  write.csv(categoryFile, fileName)
+### --- semantic topic models for each category
+print("Semantic Modeling Phase: LDA estimation")
+itemFiles <- list.files()[grepl(".csv", x = list.files(), fixed = T)]
+itemFiles <- itemFiles[grepl("^tfMatrix_", itemFiles)]
+for (i in 1:length(itemFiles)) {
+  categoryName <- strsplit(itemFiles[i], split = ".", fixed = T)[[1]][1]
+  categoryName <- strsplit(categoryName, split = "_", fixed = T)[[1]][2]
+  # - topic modeling:
+  itemCat <- read.csv(itemFiles[i],
+                      header = T,
+                      check.names = F,
+                      row.names = 1,
+                      stringsAsFactors = F)
+  itemCat <- as.simple_triplet_matrix(itemCat)
+  # - run on K = seq(2,20) semantic topics
+  topicModel <- list()
+  numTopics <- seq(2, 10, by = 1)
+  for (k in 1:length(numTopics)) {
+    topicModel[[k]] <- maptpx::topics(counts = itemCat,
+                                      K = numTopics[k],
+                                      shape = NULL,
+                                      initopics = NULL,
+                                      tol = 0.1,
+                                      bf = T,
+                                      kill = 0,
+                                      ord = TRUE,
+                                      verb = 2)
+  }
+  # - clear:
+  rm(itemCat); gc()
+  # - determine model from Bayes Factor against Null:
+  wModel <- which.max(sapply(topicModel, function(x) {x$BF}))
+  topicModel <- topicModel[[wModel]]
+  # - collect matrices:
+  wdcm_itemtopic <- as.data.frame(topicModel$theta)
+  colnames(wdcm_itemtopic) <- paste("topic", seq(1, dim(wdcm_itemtopic)[2]), 
sep = "")
+  itemTopicFileName <- paste('wdcm2_itemtopic',
+                             paste(categoryName, ".csv", sep = ""),
+                             sep = "_")
+  write.csv(wdcm_itemtopic, itemTopicFileName)
+  wdcm_projecttopic <- as.data.frame(topicModel$omega)
+  colnames(wdcm_projecttopic) <- paste("topic", seq(1, 
dim(wdcm_projecttopic)[2]), sep = "")
+  wdcm_projecttopic$project <- rownames(wdcm_projecttopic)
+  wdcm_projecttopic$projecttype <- projectType(wdcm_projecttopic$project)
+  projectTopicFileName <- paste('wdcm2_projecttopic',
+                                paste(categoryName, ".csv", sep = ""),
+                                sep = "_")
+  write.csv(wdcm_projecttopic, projectTopicFileName)
+  # - clear:
+  rm(topicModel); rm(wdcm_projecttopic); rm(wdcm_itemtopic); gc()
+### --- t-SNE 2D maps from wdcm2_projectttopic files: projects similarity 
+print("Semantic Modeling Phase: t-SNE 2D MAPS")
+projectFiles <- list.files()
+projectFiles <- projectFiles[grepl("^wdcm2_projecttopic", projectFiles)]
+for (i in 1:length(projectFiles)) {
+  # filename:
+  fileName <- strsplit(projectFiles[i], split = ".", fixed = T)[[1]][1]
+  fileName <- strsplit(fileName, split = "_", fixed = T)[[1]][3]
+  fileName <- paste("wdcm2_tsne2D_project_", fileName, ".csv", sep = "")
+  # load:
+  projectTopics <- read.csv(projectFiles[i], 
+                            header = T,
+                            check.names = F,
+                            row.names = 1,
+                            stringsAsFactors = F)
+  projectTopics$project <- NULL
+  projectTopics$projecttype <- NULL
+  # - Distance space, metric: Hellinger
+  projectDist <- as.matrix(dist(projectTopics, method = "Hellinger", by_rows = 
+  # - t-SNE 2D map
+  tsneProject <- Rtsne(projectDist, 
+                       theta = .5, 
+                       is_distance = T,
+                       perplexity = 10)
+  # - store:
+  tsneProject <- as.data.frame(tsneProject$Y)
+  colnames(tsneProject) <- paste("D", seq(1:dim(tsneProject)[2]), sep = "")
+  tsneProject$project <- rownames(projectTopics)
+  tsneProject$projecttype <- projectType(tsneProject$project)
+  write.csv(tsneProject, fileName)
+  # - clear:
+  rm(projectTopics); rm(projectDist); rm(tsneProject)
+### --- {visNetwork} graphs from wdcm2_projectttopic files: projects 
similarity structure
+projectFiles <- list.files()
+projectFiles <- projectFiles[grepl("^wdcm2_projecttopic", projectFiles)]
+for (i in 1:length(projectFiles)) {
+  # - load:
+  projectTopics <- read.csv(projectFiles[i], 
+                            header = T,
+                            check.names = F,
+                            row.names = 1,
+                            stringsAsFactors = F)
+  projectTopics$project <- NULL
+  projectTopics$projecttype <- NULL
+  # - Distance space, metric: Hellinger
+  projectDist <- as.matrix(dist(projectTopics, method = "Hellinger", by_rows = 
+  # - {visNetwork} nodes data.frame:
+  indexMinDist <- sapply(rownames(projectDist), function(x) {
+    w <- which(rownames(projectDist) %in% x)
+    y <- sort(projectDist[w, -w], decreasing = T)
+    names(y)[length(y)]
+  })
+  id <- 1:length(colnames(projectDist))
+  label <- colnames(projectDist)
+  nodes <- data.frame(id = id,
+                      label = label,
+                      stringsAsFactors = F)
+  # - {visNetwork} edges data.frame:
+  edges <- data.frame(from = names(indexMinDist),
+                      to = unname(indexMinDist),
+                      stringsAsFactors = F)
+  edges$from <- sapply(edges$from, function(x) {
+    nodes$id[which(nodes$label %in% x)]
+  })
+  edges$to <- sapply(edges$to, function(x) {
+    nodes$id[which(nodes$label %in% x)]
+  })
+  edges$arrows <- rep("to", length(edges$to))
+  # filenames:
+  fileName <- strsplit(projectFiles[i], split = ".", fixed = T)[[1]][1]
+  fileName <- strsplit(fileName, split = "_", fixed = T)[[1]][3]
+  nodesFileName <- paste("wdcm2_visNetworkNodes_project_", fileName, ".csv", 
sep = "")
+  edgesFileName <- paste("wdcm2_visNetworkEdges_project_", fileName, ".csv", 
sep = "")
+  # store:
+  write.csv(nodes, nodesFileName)
+  write.csv(edges, edgesFileName)
+  # - clear:
+  rm(projectTopics); rm(projectDist); rm(nodes); rm(edges); gc()
+# - write to WDCM main reporting file:
+mainReport <- read.csv('WDCM_MainReport.csv',
+                       header = T,
+                       row.names = 1,
+                       check.names = F,
+                       stringsAsFactors = F)
+newReport <- data.frame(Step = 'Pre-Process',
+                        Time = as.character(Sys.time()),
+                        stringsAsFactors = F)
+mainReport <- rbind(mainReport, newReport)
+write.csv(mainReport, 'WDCM_MainReport.csv')
+### --- toLabsReport
+toLabsReport <- data.frame(timeStamp = as.character(Sys.time()),
+                           statbox = "stat1005",
+                           sqoopbox = "stat1004",
+                           stringsAsFactors = F)
+write.csv(toLabsReport, "toLabsReport.csv")
+print(paste("--- UPDATE RUN COMPLETED ON:", Sys.time(), sep = " "))
diff --git a/WDCM_Sqoop_Clients.R b/WDCM_Sqoop_Clients.R
index 000c4a7..9f33b92 100644
--- a/WDCM_Sqoop_Clients.R
+++ b/WDCM_Sqoop_Clients.R
@@ -1,3 +1,4 @@
+#!/usr/bin/env Rscript
 ### ---------------------------------------------------------------------------
 ### --- WDCM Search Module, v. Beta 0.1
@@ -42,9 +43,6 @@
 # along with WDCM. If not, see <http://www.gnu.org/licenses/>.
 ### ---------------------------------------------------------------------------
-### --- Setup
-rm(list = ls())
 ### --- Collect all client projects that maintain wbc_entitiy_usage
 # - show all databases
 mySqlArgs <- 
@@ -59,6 +57,9 @@
 # - select client projects
 wClients <- 
 clients <- clients$Database[wClients]
+# - remove test wikis:
+wTest <- grepl("^test", clients)
+if (length(wTest) > 0) {clients <- clients[-wTest]}
 # - look-up for wbc_entity_usage tables
 projectsTracking <- character()
 for (i in 1:length(clients)) {
@@ -72,7 +73,7 @@
   mySqlCommand <- paste0("mysql ", mySqlArgs, " -e ", mySqlInput, collapse = 
   system(command = mySqlCommand, wait = TRUE)
   tables <- read.table('clienttables.tsv', header = T, check.names = F, 
stringsAsFactors = F, sep = "\t")
-  if("wbc_entity_usage" %in% tables[, 1]) {
+  if ("wbc_entity_usage" %in% tables[, 1]) {
     projectsTracking <- append(projectsTracking, clients[i]) 
@@ -86,6 +87,12 @@
                               stringsAsFactors = F
 for (i in 1:length(projectsTracking)) {
+  # - drop wdcm_clients_wb_entity_usage if this is the first entry
+  if (i == 1) {
+    hiveCommand <- '"USE goransm; DROP TABLE IF EXISTS 
+    hiveCommand <- paste("beeline -e ", hiveCommand, sep = "")
+    system(command = hiveCommand, wait = TRUE)
+    }
   wdcmSqoopReport$project[i] <- projectsTracking[i]
   wdcmSqoopReport$startTime[i] <- as.character(Sys.time())
   # - sqoop command:

To view, visit https://gerrit.wikimedia.org/r/394737
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I5cac7822b41c93d5ca60e5bee63e84f053f39758
Gerrit-PatchSet: 2
Gerrit-Project: analytics/wmde/WDCM
Gerrit-Branch: master
Gerrit-Owner: GoranSMilovanovic <goran.milovanovic_...@wikimedia.de>
Gerrit-Reviewer: GoranSMilovanovic <goran.milovanovic_...@wikimedia.de>
Gerrit-Reviewer: jenkins-bot <>

MediaWiki-commits mailing list

Reply via email to