[MediaWiki-commits] [Gerrit] Rework transferToES.py to send to wikis sequentially - change (wikimedia...analytics)

2016-01-29 Thread jenkins-bot (Code Review)
jenkins-bot has submitted this change and it was merged.

Change subject: Rework transferToES.py to send to wikis sequentially
..


Rework transferToES.py to send to wikis sequentially

The prior implementation of transferToES.py would send a batch to
whichever wiki it had accumulated enough items for. This works, but it
pushes the elasticsearch cluster pretty hard, causing it to create
segments in many different indices at the same time.

This version reworks the job to use sort(project), which guarantees that
each project is wholly contained within a single partition. It then
iterates over that partition, sending batches. This ensures that each
worker is continuously sending documents to the same index, which narrows
the spread of segment creation and reduces load on the ES cluster.
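
For readers skimming the thread, here is a minimal, self-contained sketch
of the pattern the diff below implements. The toy DataFrame, column
values, batch size, and the send_batch() stand-in are invented for
illustration; the real job loads scores with sqlContext.load(SOURCE) and
ships them with sendDocumentsToES().

# Sketch only: sort by project, then stream per-project batches per partition.
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

ITEMS_PER_BATCH = 2  # tiny batch size so the example emits several batches

def send_batch(rows):
    # Stand-in for sendDocumentsToES(); just reports what would be sent.
    print("would send %d docs for project %s" % (len(rows), rows[0].project))

def group_and_send(rows):
    # Rows arrive sorted by project, so a project change marks a batch boundary.
    group = []
    for row in rows:
        if group and group[0].project != row.project:
            send_batch(group)
            group = []
        group.append(row)
        if len(group) >= ITEMS_PER_BATCH:
            send_batch(group)
            group = []
    if group:
        send_batch(group)

if __name__ == "__main__":
    sc = SparkContext(appName="transfer-to-es-sketch")
    sqlContext = SQLContext(sc)
    data = sqlContext.createDataFrame(
        [Row(project="enwiki", page_id=i, score=0.1 * i) for i in range(5)] +
        [Row(project="dewiki", page_id=i, score=0.2 * i) for i in range(3)])
    # sort() makes rows for each project contiguous; every partition is then
    # streamed as per-project batches instead of touching many indices at once.
    data.sort(data.project).foreachPartition(group_and_send)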

The one downside is that foreachPartition() brings the entire partition
into memory, so the executors have to be sized such that any one of them
can hold the entirety of whichever wiki has the most scores to send. In
practice this means about 15G of memory for the job (3 executors at 5G
each, per bundle.properties), which is roughly 1% of total hadoop memory.
That seems small enough to be worthwhile.
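
(A rough way to sanity-check that sizing, as an aside and not part of this
change: count rows per project on the DataFrame the job loads and look at
the largest value, since that is the partition an executor must hold.)

# Sizing sketch; assumes `data` is the DataFrame from sqlContext.load(SOURCE).
per_project = data.groupBy(data.project).count()
per_project.sort("count", ascending=False).show(5)  # largest projects first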

Change-Id: Id49b74f5ce56ac8c845fd23798500d0476946500
---
M oozie/transfer_to_es/bundle.properties
M oozie/transfer_to_es/transferToES.py
M oozie/transfer_to_es/workflow.xml
3 files changed, 18 insertions(+), 29 deletions(-)

Approvals:
  Smalyshev: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/oozie/transfer_to_es/bundle.properties b/oozie/transfer_to_es/bundle.properties
index 8c9286e..b949ed3 100644
--- a/oozie/transfer_to_es/bundle.properties
+++ b/oozie/transfer_to_es/bundle.properties
@@ -59,8 +59,9 @@
 # of concurrency between the hadoop cluster and the elasticsearch cluster
 # during export.
 spark_number_executors= 3
-spark_executor_memory = 1G
+spark_executor_memory = 5G
 spark_driver_memory   = 1G
+spark_sql_shuffle_partitions  = 30
 
 # Coordintator to start.
 oozie.bundle.application.path = ${discovery_oozie_directory}/transfer_to_es/bundle.xml
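
Aside: the new spark_sql_shuffle_partitions property presumably feeds
Spark's spark.sql.shuffle.partitions setting, which controls how many
partitions Spark SQL uses for shuffles, including the range partitioning
behind sort(); the workflow.xml wiring is truncated in this message. A
sketch of setting it directly from PySpark, for illustration only:

# Illustration only; the real job may receive this via --conf at submit time.
sqlContext.setConf("spark.sql.shuffle.partitions", "30")
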
diff --git a/oozie/transfer_to_es/transferToES.py b/oozie/transfer_to_es/transferToES.py
index 84fe45d..d3740c7 100644
--- a/oozie/transfer_to_es/transferToES.py
+++ b/oozie/transfer_to_es/transferToES.py
@@ -119,37 +119,25 @@
     if not sendDataToES(data, url):
         failedDocumentCounter.add(len(documents))
 
-def addToList(listNow, element):
+def groupAndSend(rows):
     """
-    Add element to list, will send the data out once batch is full
+    Group together documents from the same project and batch them
+    to elasticsearch
     """
-    if len(listNow) < ITEMS_PER_BATCH:
-        return listNow + [element]
-    # Treshold reached, send the list out
-    sendDocumentsToES(listNow)
-    return [element]
-
-def mergeLists(listOne, listTwo):
-    """
-    Merge two lists, will send the data out once batch is full
-    """
-    newList = listOne + listTwo
-    if len(newList) < ITEMS_PER_BATCH:
-        return newList
-    sendDocumentsToES(newList)
-    return []
-
-def sendCombined(data):
-    """
-    Send data for the key (hostname) to ES
-    """
-    sendDocumentsToES(data[1])
+    group = []
+    for row in rows:
+        if len(group) > 0 and group[0].project != row.project:
+            sendDocumentsToES(group)
+            group = []
+        group.append(row)
+        if len(group) >= ITEMS_PER_BATCH:
+            sendDocumentsToES(group)
+            group = []
+    if len(group) > 0:
+        sendDocumentsToES(group)
 
 data = sqlContext.load(SOURCE)
 # print "Count: %d\n" % data.count()
-# Here's what is going on here: we combine the data by project,
-# and when the list of projects reaches ITEMS_PER_BATCH we send them out
-# The foreach() part will send out the ones that remained
-data.map(lambda x: (x.project, x)).combineByKey(lambda x: [x], addToList, mergeLists).foreach(sendCombined)
+data.sort(data.project).foreachPartition(groupAndSend)
 print "%d documents processed, %d failed." % (documentCounter.value, failedDocumentCounter.value,)
 print "%d requests successful, %d requests failed." % (updateCounter.value, errorCounter.value)
diff --git a/oozie/transfer_to_es/workflow.xml b/oozie/transfer_to_es/workflow.xml
index 1d005b8..6cedf5a 100644
--- a/oozie/transfer_to_es/workflow.xml
+++ b/oozie/transfer_to_es/workflow.xml
@@ -122,7 +122,7 @@
 cluster
 Discovery Transfer To ${elasticsearch_url}
 ${pyspark_transfer_to_es_script}
---conf spark.yarn.jar=${spark_assembly_jar} --executor-memory ${spark_executor_memory} --driver-memory ${spark_driver_memory} --num-executors ${spark_number_executors} --queue ${queue_name} --conf spark.yarn.ap

[MediaWiki-commits] [Gerrit] Rework transferToES.py to send to wikis sequentially - change (wikimedia...analytics)

2016-01-26 Thread EBernhardson (Code Review)
EBernhardson has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/25

Change subject: Rework transferToES.py to send to wikis sequentially
..

Rework transferToES.py to send to wikis sequentially

The prior implementation of transferToES.py would send a batch to
whichever wiki it had accumulated enough items for. This works, but it
pushes the elasticsearch cluster pretty hard, causing it to create
segments in many different indices at the same time.

This version reworks the job to use sort(project), which guarantees that
each project is wholly contained within a single partition. It then
iterates over that partition, sending batches. This ensures that each
worker is continuously sending documents to the same index, which narrows
the spread of segment creation and reduces load on the ES cluster.

The one downside is that foreachPartition() brings the entire partition
into memory, so the executors have to be sized such that any one of them
can hold the entirety of whichever wiki has the most scores to send. In
practice this means about 15G of memory for the job (3 executors at 5G
each, per bundle.properties), which is roughly 1% of total hadoop memory.
That seems small enough to be worthwhile.

Change-Id: Id49b74f5ce56ac8c845fd23798500d0476946500
---
M oozie/transfer_to_es/bundle.properties
M oozie/transfer_to_es/transferToES.py
M oozie/transfer_to_es/workflow.xml
3 files changed, 19 insertions(+), 30 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikimedia/discovery/analytics refs/changes/65/25/1

diff --git a/oozie/transfer_to_es/bundle.properties b/oozie/transfer_to_es/bundle.properties
index 8fe3dc9..b949ed3 100644
--- a/oozie/transfer_to_es/bundle.properties
+++ b/oozie/transfer_to_es/bundle.properties
@@ -58,9 +58,10 @@
 # when spark alocates this resource. This partially controls the amount
 # of concurrency between the hadoop cluster and the elasticsearch cluster
 # during export.
-spark_number_executors= 1
-spark_executor_memory = 1G
+spark_number_executors= 3
+spark_executor_memory = 5G
 spark_driver_memory   = 1G
+spark_sql_shuffle_partitions  = 30
 
 # Coordintator to start.
 oozie.bundle.application.path = ${discovery_oozie_directory}/transfer_to_es/bundle.xml
diff --git a/oozie/transfer_to_es/transferToES.py b/oozie/transfer_to_es/transferToES.py
index 84fe45d..d3740c7 100644
--- a/oozie/transfer_to_es/transferToES.py
+++ b/oozie/transfer_to_es/transferToES.py
@@ -119,37 +119,25 @@
     if not sendDataToES(data, url):
         failedDocumentCounter.add(len(documents))
 
-def addToList(listNow, element):
+def groupAndSend(rows):
     """
-    Add element to list, will send the data out once batch is full
+    Group together documents from the same project and batch them
+    to elasticsearch
     """
-    if len(listNow) < ITEMS_PER_BATCH:
-        return listNow + [element]
-    # Treshold reached, send the list out
-    sendDocumentsToES(listNow)
-    return [element]
-
-def mergeLists(listOne, listTwo):
-    """
-    Merge two lists, will send the data out once batch is full
-    """
-    newList = listOne + listTwo
-    if len(newList) < ITEMS_PER_BATCH:
-        return newList
-    sendDocumentsToES(newList)
-    return []
-
-def sendCombined(data):
-    """
-    Send data for the key (hostname) to ES
-    """
-    sendDocumentsToES(data[1])
+    group = []
+    for row in rows:
+        if len(group) > 0 and group[0].project != row.project:
+            sendDocumentsToES(group)
+            group = []
+        group.append(row)
+        if len(group) >= ITEMS_PER_BATCH:
+            sendDocumentsToES(group)
+            group = []
+    if len(group) > 0:
+        sendDocumentsToES(group)
 
 data = sqlContext.load(SOURCE)
 # print "Count: %d\n" % data.count()
-# Here's what is going on here: we combine the data by project,
-# and when the list of projects reaches ITEMS_PER_BATCH we send them out
-# The foreach() part will send out the ones that remained
-data.map(lambda x: (x.project, x)).combineByKey(lambda x: [x], addToList, mergeLists).foreach(sendCombined)
+data.sort(data.project).foreachPartition(groupAndSend)
 print "%d documents processed, %d failed." % (documentCounter.value, failedDocumentCounter.value,)
 print "%d requests successful, %d requests failed." % (updateCounter.value, errorCounter.value)
diff --git a/oozie/transfer_to_es/workflow.xml b/oozie/transfer_to_es/workflow.xml
index 43e967f..5b00729 100644
--- a/oozie/transfer_to_es/workflow.xml
+++ b/oozie/transfer_to_es/workflow.xml
@@ -122,7 +122,7 @@
 cluster
 Discovery Transfer To ${elasticsearch_url}
 ${pyspark_transfer_to_es_script}
---conf spark.yarn.jar=${spark_assembly_jar