EBernhardson has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/385391 )
Change subject: More kafka tweaks
......................................................................

More kafka tweaks

Configure spark kafka consuming to be able to read large messages.

Bug: T178274
Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
---
M mjolnir/kafka/client.py
1 file changed, 7 insertions(+), 1 deletion(-)

Approvals:
  EBernhardson: Verified; Looks good to me, approved

diff --git a/mjolnir/kafka/client.py b/mjolnir/kafka/client.py
index 60d4a37..8fdbcaf 100644
--- a/mjolnir/kafka/client.py
+++ b/mjolnir/kafka/client.py
@@ -188,7 +188,13 @@
         offset_ranges.append(OffsetRange(mjolnir.kafka.TOPIC_RESULT, partition, start, end))
     assert not isinstance(brokers, basestring)
     # TODO: how can we force the kafka api_version here?
-    kafka_params = {"metadata.broker.list": ','.join(brokers)}
+    kafka_params = {
+        'metadata.broker.list': ','.join(brokers),
+        # Set high fetch size values so we don't fail because of large messages
+        'max.partition.fetch.bytes': '40000000',
+        'fetch.message.max.bytes': '40000000'
+    }
+
     # If this ends up being too much data from kafka, blowing up memory in the
     # spark executors, we could chunk the offsets and union together multiple RDD's.
     return (

-- 
To view, visit https://gerrit.wikimedia.org/r/385391
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
Gerrit-PatchSet: 4
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse <dcau...@wikimedia.org>
Gerrit-Reviewer: EBernhardson <ebernhard...@wikimedia.org>
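The configuration change in the diff can be sketched in isolation. This is a minimal, hypothetical helper (the `build_kafka_params` name is an assumption, not part of the patch) showing how the consumer parameters are assembled: the broker list is joined into a single comma-separated string, and both the new-consumer (`max.partition.fetch.bytes`) and old-consumer (`fetch.message.max.bytes`) fetch-size keys are raised to 40 MB so that large result messages are not rejected regardless of which consumer path the Spark Kafka connector takes.

```python
def build_kafka_params(brokers, max_fetch_bytes=40000000):
    """Build the Kafka consumer configuration dict for the Spark reader.

    ``brokers`` must be a list of ``host:port`` strings, not a single
    string -- joining a bare string would silently produce a broken
    broker list (e.g. 'h,o,s,t'), hence the assertion.
    """
    assert not isinstance(brokers, str), "expected a list of brokers, not a string"
    return {
        'metadata.broker.list': ','.join(brokers),
        # Set high fetch size values so large messages don't fail the fetch
        'max.partition.fetch.bytes': str(max_fetch_bytes),
        'fetch.message.max.bytes': str(max_fetch_bytes),
    }

# Example usage with placeholder broker addresses
params = build_kafka_params(['kafka1001:9092', 'kafka1002:9092'])
print(params['metadata.broker.list'])  # kafka1001:9092,kafka1002:9092
```

Note that the fetch sizes are passed as strings: these Kafka properties travel through the Spark connector as an opaque string-to-string map, so numeric values must be stringified by the caller.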