[MediaWiki-commits] [Gerrit] Rework transferToES.py to send to wikis sequentially - change (wikimedia...analytics)
jenkins-bot has submitted this change and it was merged. Change subject: Rework transferToES.py to send to wikis sequentially .. Rework transferToES.py to send to wikis sequentially The prior implementation of transferToES.py would send to whichever wiki it had enough items to send to. This works but pushes the elasticsearch cluster pretty hard, causing it to create segments in many different indices at the same time. This version is reworked to use sort(project), which guarantees that each project will be wholly contained within a single partition. It then iterates over that partition, sending batches. This ensures that each worker is continuously sending documents to the same index and reduces the spread of segment creation, reducing load on the ES cluster. The one downside of this is that foreachPartition() brings the entire partition into memory, so the executors have to be sized such that any executor can hold the entirety of whichever wiki has the most scores to be sent. In the scheme of things this means using 15G of memory for the job, which is 1% of total hadoop memory. It seems small enough to be worthwhile. Change-Id: Id49b74f5ce56ac8c845fd23798500d0476946500 --- M oozie/transfer_to_es/bundle.properties M oozie/transfer_to_es/transferToES.py M oozie/transfer_to_es/workflow.xml 3 files changed, 18 insertions(+), 29 deletions(-) Approvals: Smalyshev: Looks good to me, approved jenkins-bot: Verified diff --git a/oozie/transfer_to_es/bundle.properties b/oozie/transfer_to_es/bundle.properties index 8c9286e..b949ed3 100644 --- a/oozie/transfer_to_es/bundle.properties +++ b/oozie/transfer_to_es/bundle.properties @@ -59,8 +59,9 @@ # of concurrency between the hadoop cluster and the elasticsearch cluster # during export. spark_number_executors= 3 -spark_executor_memory = 1G +spark_executor_memory = 5G spark_driver_memory = 1G +spark_sql_shuffle_partitions = 30 # Coordintator to start. 
oozie.bundle.application.path = ${discovery_oozie_directory}/transfer_to_es/bundle.xml diff --git a/oozie/transfer_to_es/transferToES.py b/oozie/transfer_to_es/transferToES.py index 84fe45d..d3740c7 100644 --- a/oozie/transfer_to_es/transferToES.py +++ b/oozie/transfer_to_es/transferToES.py @@ -119,37 +119,25 @@ if not sendDataToES(data, url): failedDocumentCounter.add(len(documents)) -def addToList(listNow, element): +def groupAndSend(rows): """ -Add element to list, will send the data out once batch is full +Group together documents from the same project and batch them +to elasticsearch """ -if len(listNow) < ITEMS_PER_BATCH: -return listNow + [element] -# Treshold reached, send the list out -sendDocumentsToES(listNow) -return [element] - -def mergeLists(listOne, listTwo): -""" -Merge two lists, will send the data out once batch is full -""" -newList = listOne + listTwo -if len(newList) < ITEMS_PER_BATCH: -return newList -sendDocumentsToES(newList) -return [] - -def sendCombined(data): -""" -Send data for the key (hostname) to ES -""" -sendDocumentsToES(data[1]) +group = [] +for row in rows: +if len(group) > 0 and group[0].project != row.project: +sendDocumentsToES(group) +group = [] +group.append(row) +if len(group) >= ITEMS_PER_BATCH: +sendDocumentsToES(group) +group = [] +if len(group) > 0: +sendDocumentsToES(group) data = sqlContext.load(SOURCE) # print "Count: %d\n" % data.count() -# Here's what is going on here: we combine the data by project, -# and when the list of projects reaches ITEMS_PER_BATCH we send them out -# The foreach() part will send out the ones that remained -data.map(lambda x: (x.project, x)).combineByKey(lambda x: [x], addToList, mergeLists).foreach(sendCombined) +data.sort(data.project).foreachPartition(groupAndSend) print "%d documents processed, %d failed." % (documentCounter.value, failedDocumentCounter.value,) print "%d requests successful, %d requests failed." 
% (updateCounter.value, errorCounter.value) diff --git a/oozie/transfer_to_es/workflow.xml b/oozie/transfer_to_es/workflow.xml index 1d005b8..6cedf5a 100644 --- a/oozie/transfer_to_es/workflow.xml +++ b/oozie/transfer_to_es/workflow.xml @@ -122,7 +122,7 @@ cluster Discovery Transfer To ${elasticsearch_url} ${pyspark_transfer_to_es_script} ---conf spark.yarn.jar=${spark_assembly_jar} --executor-memory ${spark_executor_memory} --driver-memory ${spark_driver_memory} --num-executors ${spark_number_executors} --queue ${queue_name} --conf spark.yarn.ap
[MediaWiki-commits] [Gerrit] Rework transferToES.py to send to wikis sequentially - change (wikimedia...analytics)
EBernhardson has uploaded a new change for review. https://gerrit.wikimedia.org/r/25 Change subject: Rework transferToES.py to send to wikis sequentially .. Rework transferToES.py to send to wikis sequentially The prior implementation of transferToES.py would send to whichever wiki it had enough items to send to. This works but pushes the elasticsearch cluster pretty hard, causing it to create segments in many different indices at the same time. This version is reworked to use sort(project), which guarantees that each project will be wholly contained within a single partition. It then iterates over that partition, sending batches. This ensures that each worker is continuously sending documents to the same index and reduces the spread of segment creation, reducing load on the ES cluster. The one downside of this is that foreachPartition() brings the entire partition into memory, so the executors have to be sized such that any executor can hold the entirety of whichever wiki has the most scores to be sent. In the scheme of things this means using 15G of memory for the job, which is 1% of total hadoop memory. It seems small enough to be worthwhile. Change-Id: Id49b74f5ce56ac8c845fd23798500d0476946500 --- M oozie/transfer_to_es/bundle.properties M oozie/transfer_to_es/transferToES.py M oozie/transfer_to_es/workflow.xml 3 files changed, 19 insertions(+), 30 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/wikimedia/discovery/analytics refs/changes/65/25/1 diff --git a/oozie/transfer_to_es/bundle.properties b/oozie/transfer_to_es/bundle.properties index 8fe3dc9..b949ed3 100644 --- a/oozie/transfer_to_es/bundle.properties +++ b/oozie/transfer_to_es/bundle.properties @@ -58,9 +58,10 @@ # when spark alocates this resource. This partially controls the amount # of concurrency between the hadoop cluster and the elasticsearch cluster # during export. 
-spark_number_executors= 1 -spark_executor_memory = 1G +spark_number_executors= 3 +spark_executor_memory = 5G spark_driver_memory = 1G +spark_sql_shuffle_partitions = 30 # Coordintator to start. oozie.bundle.application.path = ${discovery_oozie_directory}/transfer_to_es/bundle.xml diff --git a/oozie/transfer_to_es/transferToES.py b/oozie/transfer_to_es/transferToES.py index 84fe45d..d3740c7 100644 --- a/oozie/transfer_to_es/transferToES.py +++ b/oozie/transfer_to_es/transferToES.py @@ -119,37 +119,25 @@ if not sendDataToES(data, url): failedDocumentCounter.add(len(documents)) -def addToList(listNow, element): +def groupAndSend(rows): """ -Add element to list, will send the data out once batch is full +Group together documents from the same project and batch them +to elasticsearch """ -if len(listNow) < ITEMS_PER_BATCH: -return listNow + [element] -# Treshold reached, send the list out -sendDocumentsToES(listNow) -return [element] - -def mergeLists(listOne, listTwo): -""" -Merge two lists, will send the data out once batch is full -""" -newList = listOne + listTwo -if len(newList) < ITEMS_PER_BATCH: -return newList -sendDocumentsToES(newList) -return [] - -def sendCombined(data): -""" -Send data for the key (hostname) to ES -""" -sendDocumentsToES(data[1]) +group = [] +for row in rows: +if len(group) > 0 and group[0].project != row.project: +sendDocumentsToES(group) +group = [] +group.append(row) +if len(group) >= ITEMS_PER_BATCH: +sendDocumentsToES(group) +group = [] +if len(group) > 0: +sendDocumentsToES(group) data = sqlContext.load(SOURCE) # print "Count: %d\n" % data.count() -# Here's what is going on here: we combine the data by project, -# and when the list of projects reaches ITEMS_PER_BATCH we send them out -# The foreach() part will send out the ones that remained -data.map(lambda x: (x.project, x)).combineByKey(lambda x: [x], addToList, mergeLists).foreach(sendCombined) +data.sort(data.project).foreachPartition(groupAndSend) print "%d documents processed, 
%d failed." % (documentCounter.value, failedDocumentCounter.value,) print "%d requests successful, %d requests failed." % (updateCounter.value, errorCounter.value) diff --git a/oozie/transfer_to_es/workflow.xml b/oozie/transfer_to_es/workflow.xml index 43e967f..5b00729 100644 --- a/oozie/transfer_to_es/workflow.xml +++ b/oozie/transfer_to_es/workflow.xml @@ -122,7 +122,7 @@ cluster Discovery Transfer To ${elasticsearch_url} ${pyspark_transfer_to_es_script} ---conf spark.yarn.jar=${spark_assembly_jar