[MediaWiki-commits] [Gerrit] search/MjoLniR[master]: More kafka tweaks

2017-10-23 Thread EBernhardson (Code Review)
EBernhardson has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/385391 )

Change subject: More kafka tweaks
......................................................................


More kafka tweaks

Configure the Spark Kafka consumer so it can read large messages.

Bug: T178274
Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
---
M mjolnir/kafka/client.py
1 file changed, 7 insertions(+), 1 deletion(-)

Approvals:
  EBernhardson: Verified; Looks good to me, approved



diff --git a/mjolnir/kafka/client.py b/mjolnir/kafka/client.py
index 60d4a37..8fdbcaf 100644
--- a/mjolnir/kafka/client.py
+++ b/mjolnir/kafka/client.py
@@ -188,7 +188,13 @@
         offset_ranges.append(OffsetRange(mjolnir.kafka.TOPIC_RESULT, partition, start, end))
     assert not isinstance(brokers, basestring)
     # TODO: how can we force the kafka api_version here?
-    kafka_params = {"metadata.broker.list": ','.join(brokers)}
+    kafka_params = {
+        'metadata.broker.list': ','.join(brokers),
+        # Set high fetch size values so we don't fail because of large messages
+        'max.partition.fetch.bytes': '4000',
+        'fetch.message.max.bytes': '4000'
+    }
+
     # If this ends up being too much data from kafka, blowing up memory in the
     # spark executors, we could chunk the offsets and union together multiple RDD's.
     return (
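
For context, here is a minimal sketch (not part of this change) of how kafka_params and offset_ranges like those above are typically handed to Spark, assuming the pyspark.streaming.kafka API that the surrounding client.py appears to use; the topic name, broker list, and offsets below are hypothetical stand-ins. Note that 'fetch.message.max.bytes' is the fetch limit honoured by the old (0.8) Kafka consumer, while 'max.partition.fetch.bytes' is its new-consumer counterpart, so setting both covers either code path.

    from pyspark import SparkContext
    from pyspark.streaming.kafka import KafkaUtils, OffsetRange

    sc = SparkContext(appName='mjolnir-kafka-sketch')  # hypothetical app name

    # Hypothetical topic/partition/offset bounds standing in for TOPIC_RESULT.
    offset_ranges = [OffsetRange('mjolnir_result', 0, 0, 1000)]

    kafka_params = {
        'metadata.broker.list': 'broker1:9092,broker2:9092',  # hypothetical brokers
        # Same fetch-size overrides as in the change above.
        'max.partition.fetch.bytes': '4000',
        'fetch.message.max.bytes': '4000',
    }

    # createRDD reads exactly the requested offset ranges and returns
    # an RDD of (key, value) message pairs.
    rdd = KafkaUtils.createRDD(sc, kafka_params, offset_ranges)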

-- 
To view, visit https://gerrit.wikimedia.org/r/385391
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
Gerrit-PatchSet: 4
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse 
Gerrit-Reviewer: EBernhardson 

___
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits


[MediaWiki-commits] [Gerrit] search/MjoLniR[master]: More kafka tweaks

2017-10-20 Thread DCausse (Code Review)
DCausse has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/385391 )

Change subject: More kafka tweaks
......................................................................

More kafka tweaks

Configure the Spark Kafka consumer so it can read large messages.

Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
---
M mjolnir/kafka/client.py
1 file changed, 7 insertions(+), 1 deletion(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/91/385391/1

diff --git a/mjolnir/kafka/client.py b/mjolnir/kafka/client.py
index 8fd3245..34dfd48 100644
--- a/mjolnir/kafka/client.py
+++ b/mjolnir/kafka/client.py
@@ -185,7 +185,13 @@
     for partition, (start, end) in enumerate(zip(offsets_start, offsets_end)):
         offset_ranges.append(OffsetRange(mjolnir.kafka.TOPIC_RESULT, partition, start, end))
     assert not isinstance(brokers, basestring)
-    kafka_params = {"metadata.broker.list": ','.join(brokers)}
+    kafka_params = {
+        'metadata.broker.list': ','.join(brokers),
+        # Set high fetch size values so we don't fail because of large messages
+        'max.partition.fetch.bytes': '4000',
+        'fetch.message.max.bytes': '4000'
+    }
+
     # If this ends up being too much data from kafka, blowing up memory in the
     # spark executors, we could chunk the offsets and union together multiple RDD's.
     return (

-- 
To view, visit https://gerrit.wikimedia.org/r/385391
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse 
