GoranSMilovanovic has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/398666 )

Change subject: EngineGeo 16 Dec 2017
......................................................................


EngineGeo 16 Dec 2017

Change-Id: I4750665c1789c201d579c425b9231a1f848be3a0
---
M WDCM_EngineGeo_goransm.R
1 file changed, 1 insertion(+), 900 deletions(-)

Approvals:
  GoranSMilovanovic: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/WDCM_EngineGeo_goransm.R b/WDCM_EngineGeo_goransm.R
index 7013c3a..25e8a40 100644
--- a/WDCM_EngineGeo_goransm.R
+++ b/WDCM_EngineGeo_goransm.R
@@ -209,908 +209,9 @@
 }
 
 ### ---------------------------------------------------------------------------
-### --- Script 2: WDCM_Search_Clients.R, WDCM Search Module
+### --- Step 2: ETL: Wikidata usage statistics from WDCM Maintable
 ### ---------------------------------------------------------------------------
 
-### ---------------------------------------------------------------------------
-### --- WDCM Search Module, v. Beta 0.1
-### --- Script: WDCM_Search_Clients.R, v. Beta 0.1
-### --- Author: Goran S. Milovanovic, Data Analyst, WMDE
-### --- Developed under the contract between Goran Milovanovic PR Data Kolektiv
-### --- and WMDE.
-### --- Contact: goran.milovanovic_...@wikimedia.de
-### ---------------------------------------------------------------------------
-### --- DESCRIPTION:
-### --- WDCM_Search_Clients.R takes a list of
-### --- item IDs from Wikidata (the list is delivered by
-### --- the WDCM_Collect_Items.R script) and searches for their
-### --- usage across the Wikimedia projects in Hadoop:
-### --- database: goransm
-### --- directory: wdcmsqoop
-### --- table: wdcm_clients_wb_entity_usage
-### --- from production (currently: stat1005.eqiad.wmnet).
-### --- NOTE: wdcm_clients_wb_entity_usage is produced by
-### --- WDCM_Sqoop_Clients.R (currently run from: stat1004.eqiad.wmnet)
-### ---------------------------------------------------------------------------
-### --- INPUT:
-### --- the WDCM_Search_Clients_HiveQL.R reads the list of item IDs
-### --- to search for from /WDCM_CollectedItems
-### --- This folder contains the .csv files that specify the item IDs
-### --- to search for; the files are produced by Script 1: WDCM_Collect_Items.R
-### ---------------------------------------------------------------------------
-### --- OUTPUT:
-### --- wdcm_maintable Hive table on hdfs, database: goransm
-### ---------------------------------------------------------------------------
-
-### --- read item categories:
-setwd(itemsDir)
-idFiles <- list.files()
-idFiles <- idFiles[grepl(".csv$", idFiles)]
-idFilesSize <- file.size(idFiles)/1024^2
-
-### --- Track all categories under processing:
-# - check for the existence of the wdcmSearchReport file
-# - and delete the old file if it's found:
-setwd(logDir)
-lF <- list.files()
-w <- which(grepl("^wdcmSearchReport", lF))
-if (length(w) == 1) {
-  file.remove(lF[w])
-}
-### --- generate wdcmSearchReport data.frame
-wdcmSearchReport <- data.frame(category = idFiles,
-                               fileSize = idFilesSize,
-                               startTime = character(length(idFiles)),
-                               endTime = character(length(idFiles)),
-                               stringsAsFactors = F
-)
-wdcmSearchReport <- wdcmSearchReport[order(-wdcmSearchReport$fileSize), ]
-
-### --- PREPARATION: delete goransm.wdcm_maintable if exists,
-### --- delete all from EXTERNAL Hive table from /user/goransm/wdcmtables (hdfs path)
-### --- make directory for EXTERNAL Hive table /user/goransm/wdcmtables (hdfs path)
-
-### --- check if goransm.wdcm_maintable exists in Hadoop; if yes, drop it:
-# - NOTE: drop wdcm_maintable == erase metastore data:
-# - [query01Err]
-
-# - to runtime Log:
-print("Running query [query01Err].")
-
-query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T)
-if (query01Err != 0) {
-  # - to runtime Log:
-  print("--- (!!!) query01Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query01Err <- system(command = '/usr/local/bin/beeline --silent -e "USE goransm; DROP TABLE IF EXISTS wdcm_maintable;"', wait = T)
-  # - check errors:
-  if (query01Err != 0) {
-    # - to runtime Log:
-    print("--- (!!!) query01Err failed AGAIN: quiting.")
-    quit()
-  }
-}
-
-### --- delete files for EXTERNAL Hive table from /user/goransm/wdcmtables (hdfs path)
-# - [query02Err]
-# - to runtime Log:
-print("--- Running query [query02Err].")
-query02Err <- system(command = 'hdfs dfs -rm -r /user/goransm/wdcmtables', wait = T)
-if (query02Err != 0) {
-  # - to runtime Log:
-  print("--- (!!!) query02Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query02Err <- system(command = 'hdfs dfs -rm -r /user/goransm/wdcmtables', wait = T)
-  # - check errors:
-  if (query02Err != 0) {
-    # - to runtime Log:
-    print("--- (!!!) query02Err failed AGAIN: quiting.")
-    quit()
-  }
-}
-
-### --- make directory for EXTERNAL Hive table /user/goransm/wdcmtables (hdfs path)
-# - [query03Err]
-# - to runtime Log:
-print("--- Running query [query03Err].")
-query03Err <- system(command = 'hdfs dfs -mkdir /user/goransm/wdcmtables', wait = T)
-if (query03Err != 0) {
-  # - to runtime Log:
-  print("--- (!!!) query03Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query03Err <- system(command = 'hdfs dfs -mkdir /user/goransm/wdcmtables', wait = T)
-  # - check errors:
-  if (query03Err != 0) {
-    # - to runtime Log:
-    print("--- (!!!) query03Err failed AGAIN: quiting.")
-    quit()
-  }
-}
-
-#### --- loop over item categories:
-
-# - back to itemsDir for the loop:
-setwd(itemsDir)
-
-# - to runtime Log:
-print("--- LOOP: Producing wdcm_maintable from wdcm_clients_wb_entity_usage now.")
-
-for (i in 1:length(wdcmSearchReport$category)) {
-
-  # - start time for this category:
-  wdcmSearchReport$startTime[i] <- as.character(Sys.time())
-
-  ### --- read item IDs:
-  wFile <- which(grepl(wdcmSearchReport$category[i], idFiles, fixed = T))
-  qIDs <- read.csv(idFiles[wFile],
-                   header = T,
-                   check.names = F,
-                   stringsAsFactors = F)
-  qIDs <- qIDs$item
-  qIDs <- qIDs[grepl("^Q[[:digit:]]+", qIDs)]
-
-  ### --- cut into batches (if necessary)
-  # - cut into batches (5MB max. batch size)
-  batchNum <- ceiling(wdcmSearchReport$fileSize[i]/10)
-  batchSize <- round(length(qIDs)/batchNum)
-  startBatchIx <- c(1:batchNum) * batchSize - batchSize + 1
-  stopBatchIx <- c(1:batchNum) * batchSize
-  stopBatchIx[batchNum] <- length(qIDs)
-
-  for (batch in 1:batchNum) {
-
-    # - to runtime Log:
-    print(paste("------------- Processing category: ", i, ": ", wdcmSearchReport$category[i], sep = ""))
-    print(paste("------------- Processing batch: ", batch, " out of ", batchNum, sep = ""))
-    
-    # - create goransm.wdcm_maintable Hive table if this is the first entry:
-    # - (create wdcm_maintable Hive Table on (hdfs path): /user/goransm/wdcmtables)
-
-    if ((i == 1) & (batch == 1)) {
-
-      hiveCommand <- "\"USE goransm; CREATE EXTERNAL TABLE \\\`goransm.wdcm_maintable\\\`(
-      \\\`eu_entity_id\\\`        string      COMMENT '',
-      \\\`eu_project\\\`           string      COMMENT '',
-      \\\`eu_count\\\`          bigint      COMMENT ''
-      )
-      COMMENT
-      ''
-      PARTITIONED BY (
-      \\\`category\\\` string COMMENT 'The item category')
-      ROW FORMAT SERDE
-      'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
-      STORED AS INPUTFORMAT
-      'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
-      OUTPUTFORMAT
-      'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
-      LOCATION
-      'hdfs://analytics-hadoop/user/goransm/wdcmtables';\""
-      hiveCommand <- paste("/usr/local/bin/beeline --silent -e ", hiveCommand, sep = "")
-      # - [query04AErr]
-      # - to runtime Log:
-      print("--- Running query [query04AErr].")
-      query04AErr <- system(command = hiveCommand, wait = TRUE)
-      if (query04AErr != 0) {
-        # - to runtime Log:
-        print("--- (!!!) query04AErr failed: waiting for 1h before next attempt...")
-        # - sleep for one hour
-        Sys.sleep(time = 60*60)
-        # - re-run query
-        query04AErr <- system(command = hiveCommand, wait = TRUE)
-        # - check errors:
-        if (query04AErr != 0) {
-          # - to runtime Log:
-          print("--- (!!!) query04AErr failed AGAIN: quiting.")
-          quit()
-        }
-      }
-    }
-
-    # - construct HiveQL query to search for category i items
-    # - across all wiki_db:
-
-    # - to runtime Log:
-    print("--- RUNNING HiveQL Query to search for category items.")
-    hiveQLQuery_1 <- "USE goransm; SET hive.mapred.mode=unstrict;"
-    hiveQLQuery_2 <- paste("INSERT INTO TABLE wdcm_maintable
-                           PARTITION (category='",
-                           strsplit(wdcmSearchReport$category[i], split = "_", fixed = T)[[1]][1],
-                           "') ",
-                           "SELECT eu_entity_id, wiki_db AS eu_project, COUNT(*) AS eu_count FROM (
-                           SELECT DISTINCT eu_entity_id, eu_page_id, wiki_db FROM wdcm_clients_wb_entity_usage
-                           WHERE eu_entity_id IN (",
-                           paste("\"", qIDs[startBatchIx[batch]:stopBatchIx[batch]], "\"", collapse = ", ", sep = ""),
-                           ")) AS t
-                           GROUP BY eu_entity_id, wiki_db;",
-                           sep = "")
-    hiveQLQuery <- paste(hiveQLQuery_1, hiveQLQuery_2, sep = " ")
-    # - write hiveQLQuery locally:
-    setwd(fPath)
-    write(hiveQLQuery, "hiveQLQuery.hql")
-    # - execute HiveQLQuery:
-    hiveQLQueryCommand <- paste("/usr/local/bin/beeline --silent -f ", getwd(), "/hiveQLQuery.hql", sep = "")
-    # - [query04BErr]
-    # - to runtime Log:
-    print("--- Running query [query04BErr].")
-    
-    query04BErr <- system(command = hiveQLQueryCommand, wait = TRUE)
-    if (query04BErr != 0) {
-      # - to runtime Log:
-      print("--- (!!!) query04BErr failed: waiting for 1h before next attempt...")
-      # - sleep for one hour
-      Sys.sleep(time = 60*60)
-      # - re-run query
-      query04BErr <- system(command = hiveQLQueryCommand, wait = TRUE)
-      # - check errors:
-      if (query04BErr != 0) {
-        # - to runtime Log:
-        print("--- (!!!) query04BErr failed AGAIN: quiting.")
-        quit()
-      }
-    }
-
-    # - to runtime Log:
-    print("--- REPAIR TABLE.")
-
-    # - repair partitions:
-    # - query04CErr
-    # - to runtime Log:
-    print("Running query [query04CErr].")
-    query04CErr <- system(command =
-                            '/usr/local/bin/beeline --silent -e "USE goransm; SET hive.mapred.mode = nonstrict; MSCK REPAIR TABLE wdcm_maintable;"',
-                          wait = TRUE)
-    if (query04CErr != 0) {
-      # - to runtime Log:
-      print("--- (!!!) query04CErr failed: waiting for 1h before next attempt...")
-      # - sleep for one hour
-      Sys.sleep(time = 60*60)
-      # - re-run query
-      query04CErr <- system(command =
-                              '/usr/local/bin/beeline --silent -e "USE goransm; SET hive.mapred.mode = nonstrict; MSCK REPAIR TABLE wdcm_maintable;"',
-                            wait = TRUE)
-      # - check errors:
-      if (query04CErr != 0) {
-        # - to runtime Log:
-        print("--- (!!!) query04CErr failed AGAIN: quiting.")
-        quit()
-      }
-    }
-
-    # - end time for this category:
-    wdcmSearchReport$endTime[i] <- as.character(Sys.time())
-
-    # - back to item categories:
-    setwd(itemsDir)
-
-  }
-
-}
-
-### --- store report:
-
-# - to runtime Log:
-print("--- LOG Search Phase completed.")
-
-# - to wdcmSearchReport:
-setwd(logDir)
-write.csv(wdcmSearchReport,
-          paste("wdcmSearchReport_",
-                strsplit(as.character(Sys.time()),
-                         split = " ")[[1]][1],
-                ".csv",
-                sep = ""))
-
-# - write to WDCM main reporting file:
-mainReport <- read.csv('WDCM_MainReport.csv',
-                       header = T,
-                       row.names = 1,
-                       check.names = F,
-                       stringsAsFactors = F)
-newReport <- data.frame(Step = 'SearchItems',
-                        Time = as.character(Sys.time()),
-                        stringsAsFactors = F)
-mainReport <- rbind(mainReport, newReport)
-write.csv(mainReport, 'WDCM_MainReport.csv')
-
-### ---------------------------------------------------------------------------
-### --- Script 3: WDCM_Pre-Process.R, WDCM Process Module
-### ---------------------------------------------------------------------------
-
-### ---------------------------------------------------------------------------
-### --- WDCM Process Module, v. Beta 0.1
-### --- Script: WDCM_Pre-Process.R, v. Beta 0.1
-### --- Author: Goran S. Milovanovic, Data Analyst, WMDE
-### --- Developed under the contract between Goran Milovanovic PR Data Kolektiv
-### --- and WMDE.
-### --- Contact: goran.milovanovic_...@wikimedia.de
-### ---------------------------------------------------------------------------
-### --- DESCRIPTION:
-### --- WDCM_Pre-Process.R works with the
-### --- wdcm_maintable Hive table on hdfs, database: goransm
-### --- to produce the .tsv files that migrate to
-### --- the wikidataconcepts.wmflabs.org Cloud VPS instance
-### --- from production (currently: stat1005.eqiad.wmnet).
-### ---------------------------------------------------------------------------
-### --- INPUT:
-### --- wdcm_maintable Hive table on hdfs, database: goransm
-### ---------------------------------------------------------------------------
-### --- OUTPUT:
-### --- Results are stored locally as .tsv files on production -
-### --- - on stat1005.eqiad.wmnet - in:
-### --- /WDCM_dataOut
-### --- These output .tsv files migrate to Labs:
-### --- wikidataconcepts.wmflabs.org Cloud VPS instance
-### --- where they are then processed by the WDCM Process Module.
-### ---------------------------------------------------------------------------
-
-# - to runtime Log:
-print("--- START: PRE-PROCESS")
-
-### --- functions
-
-# - projectType() to determine project type
-projectType <- function(projectName) {
-  unname(sapply(projectName, function(x) {
-    if (grepl("commons", x, fixed = T)) {"Commons"
-    } else if (grepl("mediawiki|meta|species|wikidata", x)) {"Other"
-    } else if (grepl("wiki$", x)) {"Wikipedia"
-    } else if (grepl("quote$", x)) {"Wikiquote"
-    } else if (grepl("voyage$", x)) {"Wikivoyage"
-    } else if (grepl("news$", x)) {"Wikinews"
-    } else if (grepl("source$", x)) {"Wikisource"
-    } else if (grepl("wiktionary$", x)) {"Wiktionary"
-    } else if (grepl("versity$", x)) {"Wikiversity"
-    } else if (grepl("books$", x)) {"Wikibooks"
-    } else {"Other"}
-  }))
-}
-
-### --- produce wdcm_item.tsv from wdcm_maintable (hdfs, database: goransm)
-### --- NOTE: one .tsv file per category (~14M rows, causes Java gc overflow from hive...)
-
-# - to runtime Log:
-print("--- STEP: produce wdcm_item.tsv from wdcm_maintable")
-
-### --- read item categories:
-setwd(itemsDir)
-
-idFiles <- list.files()
-idFiles <- idFiles[grepl(".csv$", idFiles)]
-categories <- unname(sapply(idFiles, function(x) {
-  strsplit(x, split = "_")[[1]][1]
-}))
-
-for (i in 1:length(categories)) {
-
-  filename <- paste("wdcm_item_",
-                    gsub(" ", "", categories[i], fixed = T), ".tsv",
-                    sep = "")
-
-  # - to runtime Log:
-  print("--- processing: ", filename, sep = "")
-  
-  hiveQLquery <- paste(
-    'USE goransm; SELECT eu_entity_id, SUM(eu_count) AS eu_count FROM wdcm_maintable WHERE category=\\"',
-    categories[i],
-    '\\" GROUP BY eu_entity_id ORDER BY eu_count DESC LIMIT 100000;',
-    sep = "")
-
-  # - query05Err
-  # - to runtime Log:
-  print("--- Running query [query05Err].")
-  query05Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                       hiveQLquery,
-                                       '" > ', dataDir,
-                                       "/", filename,
-                                       sep = ""),
-                       wait = TRUE)
-  if (query05Err != 0) {
-    # - to runtime Log:
-    print("--- (!!!) query05Err failed: waiting for 1h before next attempt...")
-    # - sleep for one hour
-    Sys.sleep(time = 60*60)
-    # - re-run query
-    query05Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                         hiveQLquery,
-                                         '" > ', dataDir,
-                                         "/", filename,
-                                         sep = ""),
-                         wait = TRUE)
-    # - check errors:
-    if (query05Err != 0) {
-      # - to runtime Log:
-      print("--- (!!!) query05Err failed AGAIN: quiting.")
-      quit()
-    }
-  }
-  
-}
-
-### --- ETL Phase
-
-# - to runtime Log:
-print("--- STEP: ETL PHASE")
-
-### --- to dataDir (and EVERYTHING ELSE goes to dataDir)
-setwd(dataDir)
-
-# - to runtime Log:
-print("--- STEP: produce wdcm_project_category.tsv")
-
-### --- produce wdcm_project_category.tsv from wdcm_maintable (hdfs, database: goransm)
-hiveQLquery <- 'USE goransm;
-                SET hive.mapred.mode=unstrict;
-                SELECT eu_project, category, SUM(eu_count) AS eu_count
-                FROM wdcm_maintable
-                GROUP BY eu_project, category ORDER BY eu_count DESC LIMIT 10000000;'
-# - [query06Err]
-# - to runtime Log:
-print("--- Running query [query06Err].")
-
-query06Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                     hiveQLquery,
-                                     '" > ', getwd(), '/wdcm_project_category.tsv',
-                                     sep = ""),
-                     wait = TRUE)
-if (query06Err != 0) {
-  # - to runtime Log:
-  print("--- (!!!) query06Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query06Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                       hiveQLquery,
-                                       '" > ', getwd(), '/wdcm_project_category.tsv',
-                                       sep = ""),
-                       wait = TRUE)
-  # - check errors:
-  if (query06Err != 0) {
-    # - to runtime Log:
-    print("--- (!!!) query06Err failed AGAIN: quiting.")
-    quit()
-  }
-}
-# - add projecttype to wdcm_project_category.tsv
-wdcm_project_category <- as.data.frame(fread('wdcm_project_category.tsv'))
-wdcm_project_category$projectype <- projectType(wdcm_project_category$eu_project)
-write.csv(wdcm_project_category, "wdcm_project_category.csv")
-
-# - to runtime Log:
-print("--- STEP: produce wdcm_project.tsv")
-
-### --- produce wdcm_project.tsv from wdcm_maintable (hdfs, database: goransm)
-hiveQLquery <- 'USE goransm;
-                SET hive.mapred.mode=unstrict;
-                SELECT eu_project, SUM(eu_count) AS eu_count
-                FROM wdcm_maintable
-                GROUP BY eu_project ORDER BY eu_count DESC LIMIT 10000000;'
-# - [query07Err]
-# - to runtime Log:
-print("Running query [query07Err].")
-query07Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                     hiveQLquery,
-                                     '" > ', getwd(), '/wdcm_project.tsv',
-                                     sep = ""),
-                     wait = TRUE)
-if (query07Err != 0) {
-  # - to runtime Log:
-  print("query07Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query07Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                       hiveQLquery,
-                                       '" > ', getwd(), '/wdcm_project.tsv',
-                                       sep = ""),
-                       wait = TRUE)
-  # - check errors:
-  if (query07Err != 0) {
-    print("query07Err failed AGAIN: quiting.")
-    quit()
-  }
-}
-
-# - add projecttype to wdcm_project.tsv
-wdcm_project <- as.data.frame(fread('wdcm_project.tsv'))
-wdcm_project$projectype <- projectType(wdcm_project$eu_project)
-write.csv(wdcm_project, "wdcm_project.csv")
-
-# - to runtime Log:
-print("STEP: wdcm_category.tsv")
-
-### --- produce wdcm_category.tsv from wdcm_maintable (hdfs, database: goransm)
-hiveQLquery <- 'USE goransm;
-                SET hive.mapred.mode=unstrict;
-                SELECT category, SUM(eu_count) AS eu_count
-                FROM wdcm_maintable
-                GROUP BY category ORDER BY eu_count DESC LIMIT 10000000;'
-# - [query08Err]
-# - to runtime Log:
-print("Running query [query08Err].")
-query08Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                     hiveQLquery,
-                                     '" > ', getwd(), '/wdcm_category.tsv',
-                                     sep = ""),
-                     wait = TRUE)
-if (query08Err != 0) {
-  # - to runtime Log:
-  print("query08Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query08Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                       hiveQLquery,
-                                       '" > ', getwd(), '/wdcm_category.tsv',
-                                       sep = ""),
-                       wait = TRUE)
-  # - check errors:
-  if (query08Err != 0) {
-    # - to runtime Log:
-    print("query08Err failed AGAIN: quiting.")
-    quit()
-  }
-}
-# - save wdcm_category.tsv as .csv
-wdcm_category <- as.data.frame(fread('wdcm_category.tsv'))
-write.csv(wdcm_category, "wdcm_category.csv")
-
-# - to runtime Log:
-print("STEP: produce wdcm_project_category_item100.tsv")
-
-### --- produce wdcm_project_category_item100.tsv from wdcm_maintable (hdfs, database: goransm)
-hiveQLquery <- 'USE goransm;
-                SET hive.mapred.mode=unstrict;
-                SELECT * FROM (
-                  SELECT eu_project, category, eu_entity_id, eu_count, ROW_NUMBER() OVER (PARTITION BY eu_project, category ORDER BY eu_count DESC) AS row
-                    FROM wdcm_maintable) t
-                WHERE row <= 100;'
-# - [query09Err]
-# - to runtime Log:
-print("Running query [query09Err].")
-query09Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                     hiveQLquery,
-                                     '" > ', getwd(), '/wdcm_project_category_item100.tsv',
-                                     sep = ""),
-                     wait = TRUE)
-if (query09Err != 0) {
-  # - to runtime Log:
-  print("query09Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query09Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                       hiveQLquery,
-                                       '" > ', getwd(), '/wdcm_project_category_item100.tsv',
-                                       sep = ""),
-                       wait = TRUE)
-  # - check errors:
-  if (query09Err != 0) {
-    # - to runtime Log:
-    print("query09Err failed AGAIN: quiting.")
-    quit()
-  }
-}
-
-# - add projecttype to wdcm_project_category_item100.tsv
-wdcm_project_category_item100 <- as.data.frame(fread('wdcm_project_category_item100.tsv'))
-wdcm_project_category_item100$projectype <- projectType(wdcm_project_category_item100$t.eu_project)
-write.csv(wdcm_project_category_item100, "wdcm_project_category_item100.csv")
-
-# - to runtime Log:
-print("STEP: produce wdcm_project_item100.tsv")
-
-### --- produce wdcm_project_item100.tsv from wdcm_maintable (hdfs, database: goransm)
-hiveQLquery <- 'USE goransm;
-                SET hive.mapred.mode=unstrict;
-                SELECT * FROM (
-                  SELECT eu_project, eu_entity_id, eu_count, ROW_NUMBER() OVER (PARTITION BY eu_project ORDER BY eu_count DESC) AS row
-                FROM wdcm_maintable) t
-                WHERE row <= 100;'
-# - [query10Err]
-# - to runtime Log:
-print("Running query [query10Err].")
-query10Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                     hiveQLquery,
-                                     '" > ', getwd(), '/wdcm_project_item100.tsv',
-                                     sep = ""),
-                     wait = TRUE)
-if (query10Err != 0) {
-  # - to runtime Log:
-  print("query10Err failed: waiting for 1h before next attempt...")
-  # - sleep for one hour
-  Sys.sleep(time = 60*60)
-  # - re-run query
-  query10Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                       hiveQLquery,
-                                       '" > ', getwd(), '/wdcm_project_item100.tsv',
-                                       sep = ""),
-                       wait = TRUE)
-  # - check errors:
-  if (query10Err != 0) {
-    # - to runtime Log:
-    print("query10Err failed AGAIN: quiting.")
-    quit()
-  }
-}
-
-# - add projecttype to wdcm_project_item100.tsv
-wdcm_project_item100 <- as.data.frame(fread('wdcm_project_item100.tsv'))
-wdcm_project_item100$projectype <- projectType(wdcm_project_item100$t.eu_project)
-write.csv(wdcm_project_item100, "wdcm_project_item100.csv")
-
-
-### --- Semantic Modeling Phase
-### --- produce project-item matrices for semantic topic modeling
-
-# - to runtime Log:
-print("STEP: Semantic Modeling Phase: TDF MATRICES")
-itemFiles <- list.files()
-itemFiles <- itemFiles[grepl("^wdcm_item", itemFiles)]
-for (i in 1:length(itemFiles)) {
-  # - to runtime Log:
-  print(paste("----------------------- TDF matrix formation: category ", i, ".", sep = ""))
-  # - load categoryFile[i].tsv as data.frame
-  categoryName <- strsplit(itemFiles[i], ".", fixed = T)[[1]][1]
-  categoryName <- strsplit(categoryName, "_", fixed = T)[[1]][3]
-  categoryName <- gsub("([[:lower:]])([[:upper:]])", "\\1 \\2", categoryName)
-  # - load items
-  # - NOTE: AN ARBITRARY DECISION TO MODEL TOP 5000 MOST FREQUENTLY USED ITEMS:
-  # - nrows = 5000
-  categoryFile <- fread(itemFiles[i], nrows = 5000)
-  # - list of items to fetch
-  itemList <- categoryFile$eu_entity_id
-  # - hiveQL:
-  hiveQLquery <- paste('USE goransm; SELECT eu_project, eu_entity_id, eu_count FROM wdcm_maintable WHERE eu_entity_id IN (',
-                       paste0("'", itemList, "'", collapse = ", ", sep = ""),
-                       ') AND category = \\"',
-                       categoryName,
-                       '\\";',
-                       sep = "")
-  fileName <- gsub(" ", "", categoryName, fixed = T)
-  fileName <- paste("tfMatrix_", fileName, ".tsv", sep = "")
-  # - [query11Err]
-  # - to runtime Log:
-  print("Running query [query11Err].")
-  query11Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                       hiveQLquery,
-                                       '" > ', getwd(), '/',
-                                       fileName,
-                                       sep = ""),
-                       wait = TRUE)
-  if (query11Err != 0) {
-    # - to runtime Log:
-    print("query11Err failed: waiting for 1h before next attempt...")
-    # - sleep for one hour
-    Sys.sleep(time = 60*60)
-    # - re-run query
-    query11Err <- system(command = paste('/usr/local/bin/beeline --silent -e "',
-                                         hiveQLquery,
-                                         '" > ', getwd(), '/',
-                                         fileName,
-                                         sep = ""),
-                         wait = TRUE)
-    # - check errors:
-    if (query11Err != 0) {
-      # - to runtime Log:
-      print("query11Err failed AGAIN: quiting.")
-      quit()
-    }
-  }
-}
-
-### --- reshape project-item matrices for semantic topic modeling
-
-# - to runtime Log:
-print("STEP: Semantic Modeling Phase: RESHAPING TDF MATRICES")
-itemFiles <- list.files()
-itemFiles <- itemFiles[grepl("^tfMatrix_", itemFiles)]
-itemFiles <- itemFiles[grepl(".tsv", itemFiles, fixed = T)]
-for (i in 1:length(itemFiles)) {
-  # - to runtime Log:
-  print(paste("----------------------- Reshaping TDF matrix: category ", i, ".", sep = ""))
-  # - load categoryFile[i].tsv as data.frame
-  categoryFile <- fread(itemFiles[i])
-  categoryFile <- spread(categoryFile,
-                         key = eu_entity_id,
-                         value = eu_count,
-                         fill = 0)
-  rownames(categoryFile) <- categoryFile$eu_project
-  categoryFile$eu_project <- NULL
-  w <- which(colSums(categoryFile) == 0)
-  if (length(w) > 0) {
-    categoryFile <- categoryFile[, -w]
-  }
-  w <- which(rowSums(categoryFile) == 0)
-  if (length(w) > 0) {
-    categoryFile <- categoryFile[-w, ]
-  }
-  fileName <- paste(strsplit(itemFiles[i], split = ".", fixed = T)[[1]][1], ".csv", sep = "")
-  write.csv(categoryFile, fileName)
-}
-
-### --- semantic topic models for each category
-
-### --- to nohup.out
-# - to runtime Log:
-print("STEP: Semantic Modeling Phase: LDA estimation")
-itemFiles <- list.files()[grepl(".csv", x = list.files(), fixed = T)]
-itemFiles <- itemFiles[grepl("^tfMatrix_", itemFiles)]
-for (i in 1:length(itemFiles)) {
-
-  categoryName <- strsplit(itemFiles[i], split = ".", fixed = T)[[1]][1]
-  categoryName <- strsplit(categoryName, split = "_", fixed = T)[[1]][2]
-
-  # - topic modeling:
-  itemCat <- read.csv(itemFiles[i],
-                      header = T,
-                      check.names = F,
-                      row.names = 1,
-                      stringsAsFactors = F)
-  itemCat <- as.simple_triplet_matrix(itemCat)
-  
-  ## -- run on K = seq(2,20) semantic topics
-  
-  # - to runtime Log:
-  print(paste("----------------------- LDA model: category ", i, ".", sep = ""))
-  topicModel <- list()
-  numTopics <- seq(2, 10, by = 1)
-  for (k in 1:length(numTopics)) {
-    topicModel[[k]] <- maptpx::topics(counts = itemCat,
-                                      K = numTopics[k],
-                                      shape = NULL,
-                                      initopics = NULL,
-                                      tol = 0.1,
-                                      bf = T,
-                                      kill = 0,
-                                      ord = TRUE,
-                                      verb = 2)
-  }
-  # - clear:
-  rm(itemCat); gc()
-  # - determine model from Bayes Factor against Null:
-  wModel <- which.max(sapply(topicModel, function(x) {x$BF}))
-  topicModel <- topicModel[[wModel]]
-
-  # - collect matrices:
-  wdcm_itemtopic <- as.data.frame(topicModel$theta)
-  colnames(wdcm_itemtopic) <- paste("topic", seq(1, dim(wdcm_itemtopic)[2]), sep = "")
-  itemTopicFileName <- paste('wdcm2_itemtopic',
-                             paste(categoryName, ".csv", sep = ""),
-                             sep = "_")
-  write.csv(wdcm_itemtopic, itemTopicFileName)
-
-  wdcm_projecttopic <- as.data.frame(topicModel$omega)
-  colnames(wdcm_projecttopic) <- paste("topic", seq(1, dim(wdcm_projecttopic)[2]), sep = "")
-  wdcm_projecttopic$project <- rownames(wdcm_projecttopic)
-  wdcm_projecttopic$projecttype <- projectType(wdcm_projecttopic$project)
-  projectTopicFileName <- paste('wdcm2_projecttopic',
-                                paste(categoryName, ".csv", sep = ""),
-                                sep = "_")
-  write.csv(wdcm_projecttopic, projectTopicFileName)
-
-  # - clear:
-  rm(topicModel); rm(wdcm_projecttopic); rm(wdcm_itemtopic); gc()
-
-}
-
-### --- t-SNE 2D maps from wdcm2_projectttopic files: projects similarity structure
-# - to runtime Log:
-print("STEP: Semantic Modeling Phase: t-SNE 2D MAPS")
-projectFiles <- list.files()
-projectFiles <- projectFiles[grepl("^wdcm2_projecttopic", projectFiles)]
-for (i in 1:length(projectFiles)) {
-  # filename:
-  fileName <- strsplit(projectFiles[i], split = ".", fixed = T)[[1]][1]
-  fileName <- strsplit(fileName, split = "_", fixed = T)[[1]][3]
-  fileName <- paste("wdcm2_tsne2D_project_", fileName, ".csv", sep = "")
-  # load:
-  projectTopics <- read.csv(projectFiles[i],
-                            header = T,
-                            check.names = F,
-                            row.names = 1,
-                            stringsAsFactors = F)
-  projectTopics$project <- NULL
-  projectTopics$projecttype <- NULL
-  # - Distance space, metric: Hellinger
-  projectDist <- as.matrix(dist(projectTopics, method = "Hellinger", by_rows = T))
-  # - t-SNE 2D map
-  tsneProject <- Rtsne(projectDist,
-                       theta = .5,
-                       is_distance = T,
-                       perplexity = 10)
-  # - store:
-  tsneProject <- as.data.frame(tsneProject$Y)
-  colnames(tsneProject) <- paste("D", seq(1:dim(tsneProject)[2]), sep = "")
-  tsneProject$project <- rownames(projectTopics)
-  tsneProject$projecttype <- projectType(tsneProject$project)
-  write.csv(tsneProject, fileName)
-  # - clear:
-  rm(projectTopics); rm(projectDist); rm(tsneProject)
-}
-
-### --- {visNetwork} graphs from wdcm2_projectttopic files: projects similarity structure
-
-# - to runtime Log:
-print("STEP: {visNetwork} graphs from wdcm2_projectttopic files")
-projectFiles <- list.files()
-projectFiles <- projectFiles[grepl("^wdcm2_projecttopic", projectFiles)]
-for (i in 1:length(projectFiles)) {
-  # - load:
-  projectTopics <- read.csv(projectFiles[i],
-                            header = T,
-                            check.names = F,
-                            row.names = 1,
-                            stringsAsFactors = F)
-  projectTopics$project <- NULL
-  projectTopics$projecttype <- NULL
-  # - Distance space, metric: Hellinger
-  projectDist <- as.matrix(dist(projectTopics, method = "Hellinger", by_rows = T))
-  # - {visNetwork} nodes data.frame:
-  indexMinDist <- sapply(rownames(projectDist), function(x) {
-    w <- which(rownames(projectDist) %in% x)
-    y <- sort(projectDist[w, -w], decreasing = T)
-    names(y)[length(y)]
-  })
-  id <- 1:length(colnames(projectDist))
-  label <- colnames(projectDist)
-  nodes <- data.frame(id = id,
-                      label = label,
-                      stringsAsFactors = F)
-  # - {visNetwork} edges data.frame:
-  edges <- data.frame(from = names(indexMinDist),
-                      to = unname(indexMinDist),
-                      stringsAsFactors = F)
-  edges$from <- sapply(edges$from, function(x) {
-    nodes$id[which(nodes$label %in% x)]
-  })
-  edges$to <- sapply(edges$to, function(x) {
-    nodes$id[which(nodes$label %in% x)]
-  })
-  edges$arrows <- rep("to", length(edges$to))
-  # filenames:
-  fileName <- strsplit(projectFiles[i], split = ".", fixed = T)[[1]][1]
-  fileName <- strsplit(fileName, split = "_", fixed = T)[[1]][3]
-  nodesFileName <- paste("wdcm2_visNetworkNodes_project_", fileName, ".csv", sep = "")
-  edgesFileName <- paste("wdcm2_visNetworkEdges_project_", fileName, ".csv", sep = "")
-  # store:
-  write.csv(nodes, nodesFileName)
-  write.csv(edges, edgesFileName)
-  # - clear:
-  rm(projectTopics); rm(projectDist); rm(nodes); rm(edges); gc()
-}
-
-# - write to WDCM main reporting file:
-setwd(logDir)
-mainReport <- read.csv('WDCM_MainReport.csv',
-                       header = T,
-                       row.names = 1,
-                       check.names = F,
-                       stringsAsFactors = F)
-newReport <- data.frame(Step = 'Pre-Process',
-                        Time = as.character(Sys.time()),
-                        stringsAsFactors = F)
-mainReport <- rbind(mainReport, newReport)
-write.csv(mainReport, 'WDCM_MainReport.csv')
-
-### --- toLabsReport
-toLabsReport <- data.frame(timeStamp = as.character(Sys.time()),
-                           statbox = "stat1005",
-                           sqoopbox = "stat1004",
-                           stringsAsFactors = F)
-write.csv(toLabsReport, "toLabsReport.csv")
 
-# - to runtime Log:
-print(paste("--- UPDATE RUN COMPLETED ON:", Sys.time(), sep = " "))
 
-### --- copy reports to /srv/published-datasets/wdcm:
 
-# - WDCM_MainReport
-system(command = 'cp /home/goransm/RScripts/WDCM_R/WDCM_Logs/WDCM_MainReport.csv /srv/published-datasets/wdcm/', wait = T)
-# - toLabsReport
-system(command = 'cp /home/goransm/RScripts/WDCM_R/WDCM_Logs/toLabsReport.csv /srv/published-datasets/wdcm/', wait = T)

-- 
To view, visit https://gerrit.wikimedia.org/r/398666
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I4750665c1789c201d579c425b9231a1f848be3a0
Gerrit-PatchSet: 1
Gerrit-Project: analytics/wmde/WDCM
Gerrit-Branch: master
Gerrit-Owner: GoranSMilovanovic <goran.milovanovic_...@wikimedia.de>
Gerrit-Reviewer: GoranSMilovanovic <goran.milovanovic_...@wikimedia.de>
Gerrit-Reviewer: jenkins-bot <>
