[MediaWiki-commits] [Gerrit] search/MjoLniR[master]: More kafka tweaks
EBernhardson has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/385391 )

Change subject: More kafka tweaks
......................................................................

More kafka tweaks

Configure spark kafka consuming to be able to read large messages.

Bug: T178274
Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
---
M mjolnir/kafka/client.py
1 file changed, 7 insertions(+), 1 deletion(-)

Approvals:
  EBernhardson: Verified; Looks good to me, approved

diff --git a/mjolnir/kafka/client.py b/mjolnir/kafka/client.py
index 60d4a37..8fdbcaf 100644
--- a/mjolnir/kafka/client.py
+++ b/mjolnir/kafka/client.py
@@ -188,7 +188,13 @@
         offset_ranges.append(OffsetRange(mjolnir.kafka.TOPIC_RESULT, partition, start, end))

     assert not isinstance(brokers, basestring)
     # TODO: how can we force the kafka api_version here?
-    kafka_params = {"metadata.broker.list": ','.join(brokers)}
+    kafka_params = {
+        'metadata.broker.list': ','.join(brokers),
+        # Set high fetch size values so we don't fail because of large messages
+        'max.partition.fetch.bytes': '4000',
+        'fetch.message.max.bytes': '4000'
+    }
+
     # If this ends up being too much data from kafka, blowing up memory in the
     # spark executors, we could chunk the offsets and union together multiple RDD's.
     return (

--
To view, visit https://gerrit.wikimedia.org/r/385391
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
Gerrit-PatchSet: 4
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse
Gerrit-Reviewer: EBernhardson

___
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
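The parameter-building step in the diff above can be sketched as a small standalone helper. This is an illustrative sketch, not MjoLniR's actual API: the `build_kafka_params` name and the broker addresses are hypothetical, and `str` stands in for the Python 2 `basestring` check used in the real code. The fetch-size keys and values mirror the diff exactly.

```python
def build_kafka_params(brokers):
    """Build the consumer parameter dict passed to Spark's Kafka reader.

    Hypothetical helper mirroring the change above; `brokers` must be a
    list of host:port strings, not a single comma-joined string.
    """
    # Guard against passing one broker string where a list is expected
    # (the real code uses basestring under Python 2).
    assert not isinstance(brokers, str)
    return {
        'metadata.broker.list': ','.join(brokers),
        # Set high fetch size values so we don't fail because of large messages
        'max.partition.fetch.bytes': '4000',
        'fetch.message.max.bytes': '4000',
    }


# Example with made-up broker addresses:
params = build_kafka_params(['kafka1001:9092', 'kafka1002:9092'])
print(params['metadata.broker.list'])
```

Passing the joined broker string instead of a list would trip the assertion, which is the failure mode the `isinstance` check in the real code guards against.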
[MediaWiki-commits] [Gerrit] search/MjoLniR[master]: More kafka tweaks
DCausse has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/385391 )

Change subject: More kafka tweaks
......................................................................

More kafka tweaks

Configure spark kafka consuming to be able to read large messages.

Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
---
M mjolnir/kafka/client.py
1 file changed, 7 insertions(+), 1 deletion(-)

git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/91/385391/1

diff --git a/mjolnir/kafka/client.py b/mjolnir/kafka/client.py
index 8fd3245..34dfd48 100644
--- a/mjolnir/kafka/client.py
+++ b/mjolnir/kafka/client.py
@@ -185,7 +185,13 @@
     for partition, (start, end) in enumerate(zip(offsets_start, offsets_end)):
         offset_ranges.append(OffsetRange(mjolnir.kafka.TOPIC_RESULT, partition, start, end))

     assert not isinstance(brokers, basestring)
-    kafka_params = {"metadata.broker.list": ','.join(brokers)}
+    kafka_params = {
+        'metadata.broker.list': ','.join(brokers),
+        # Set high fetch size values so we don't fail because of large messages
+        'max.partition.fetch.bytes': '4000',
+        'fetch.message.max.bytes': '4000'
+    }
+
     # If this ends up being too much data from kafka, blowing up memory in the
     # spark executors, we could chunk the offsets and union together multiple RDD's.
     return (

Gerrit-MessageType: newchange
Gerrit-Change-Id: I712e46ffc4add5b6ce86fd69aa230aa59896e40f
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse