EBernhardson has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/385391 )

Change subject: More kafka tweaks
......................................................................


More kafka tweaks

Configure spark kafka consuming to be able to read large messages.

Bug: T178274
Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
---
M mjolnir/kafka/client.py
1 file changed, 7 insertions(+), 1 deletion(-)

Approvals:
  EBernhardson: Verified; Looks good to me, approved



diff --git a/mjolnir/kafka/client.py b/mjolnir/kafka/client.py
index 60d4a37..8fdbcaf 100644
--- a/mjolnir/kafka/client.py
+++ b/mjolnir/kafka/client.py
@@ -188,7 +188,13 @@
         offset_ranges.append(OffsetRange(mjolnir.kafka.TOPIC_RESULT, partition, start, end))
     assert not isinstance(brokers, basestring)
     # TODO: how can we force the kafka api_version here?
-    kafka_params = {"metadata.broker.list": ','.join(brokers)}
+    kafka_params = {
+        'metadata.broker.list': ','.join(brokers),
+        # Set high fetch size values so we don't fail because of large messages
+        'max.partition.fetch.bytes': '40000000',
+        'fetch.message.max.bytes': '40000000'
+    }
+
     # If this ends up being too much data from kafka, blowing up memory in the
     # spark executors, we could chunk the offsets and union together multiple RDD's.
     return (
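For illustration only (not part of the merged change): the new kafka_params dict could be factored into a small helper. The `build_kafka_params` function, its default, and the Python 3 `str` check are hypothetical; the keys and the 40000000-byte values come from the patch above.

```python
def build_kafka_params(brokers, max_fetch_bytes=40000000):
    """Build Kafka consumer params that tolerate large messages.

    Hypothetical helper mirroring the patch: both the per-partition and
    per-request fetch limits are raised so a single oversized message
    does not fail the fetch.
    """
    # The patch asserts brokers is a list, not a pre-joined string
    # (the original code uses Python 2's basestring here).
    assert not isinstance(brokers, str)
    return {
        'metadata.broker.list': ','.join(brokers),
        # Set high fetch size values so we don't fail because of large messages
        'max.partition.fetch.bytes': str(max_fetch_bytes),
        'fetch.message.max.bytes': str(max_fetch_bytes),
    }

params = build_kafka_params(['broker1:9092', 'broker2:9092'])
```

Note that both properties must be raised together: fetch.message.max.bytes caps the fetch request while max.partition.fetch.bytes caps what a single partition may return, and either limit alone can reject a large message.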

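The TODO-style comment in the hunk mentions chunking the offsets and unioning multiple RDDs if a single read pulls too much data. A minimal pure-Python sketch of that chunking idea; `chunk_offset_range` and its parameters are hypothetical, not code from this change:

```python
def chunk_offset_range(start, end, max_messages):
    """Yield (start, end) pairs covering [start, end) in bounded chunks.

    Each chunk spans at most max_messages offsets, so each resulting
    RDD reads a bounded slice before the pieces are unioned together.
    """
    while start < end:
        upper = min(start + max_messages, end)
        yield (start, upper)
        start = upper

chunks = list(chunk_offset_range(0, 2500, 1000))
# chunks == [(0, 1000), (1000, 2000), (2000, 2500)]
```

Each chunk would become its own OffsetRange (and thus its own RDD), keeping per-executor memory bounded at the cost of more, smaller Kafka reads.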
-- 
To view, visit https://gerrit.wikimedia.org/r/385391
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
Gerrit-PatchSet: 4
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse <dcau...@wikimedia.org>
Gerrit-Reviewer: EBernhardson <ebernhard...@wikimedia.org>
