Elukey has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/405687 )
Change subject: [WIP] kafka_confluent_writer: handle BufferError exception when producing
......................................................................
[WIP] kafka_confluent_writer: handle BufferError exception when producing
Bug: T185291
Change-Id: I0056739fcd97e4520ce44640681c0a4c24247b3d
---
M eventlogging/handlers.py
1 file changed, 13 insertions(+), 2 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/eventlogging refs/changes/87/405687/1
diff --git a/eventlogging/handlers.py b/eventlogging/handlers.py
index 5e00fdd..2032cea 100644
--- a/eventlogging/handlers.py
+++ b/eventlogging/handlers.py
@@ -422,8 +422,19 @@
message_value = event.encode('utf-8') if raw else \
json.dumps(event, sort_keys=True).encode('utf-8')
- # Produce the message.
- kafka_producer.produce(message_topic, message_value, message_key)
+ event_delivered = False
+ while not event_delivered:
+ try:
+ # Produce the message.
+ kafka_producer.produce(message_topic, message_value, message_key)
+ event_delivered = True
+ except BufferError as e:
+ logging.error(
+ 'Local queue full error when trying to produce, poll '
+ 'and wait for deliveries: %s' % e
+ )
+ kafka_producer.poll(1)
+ continue
# If not async, the flush Kafka produce buffer now and block
# until we are done.
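
For reviewers who want to try the retry loop from the hunk above in isolation, here is a minimal sketch. It is not part of the patch. The names produce_with_retry and FakeProducer are hypothetical and exist only for illustration; FakeProducer is a stand-in that mimics just the two calls the patch uses (produce() raising BufferError when its local queue is full, and poll() draining it), so the sketch runs without a Kafka broker or the confluent_kafka package.

```python
import logging


def produce_with_retry(producer, topic, value, key=None, poll_timeout=1.0):
    """Produce one message, polling and retrying while the local queue is full.

    Returns the number of retries that were needed (hypothetical helper).
    """
    retries = 0
    while True:
        try:
            producer.produce(topic, value, key)
            return retries
        except BufferError as e:
            logging.error(
                'Local queue full error when trying to produce, poll '
                'and wait for deliveries: %s', e
            )
            # poll() serves pending delivery events, which frees queue space.
            producer.poll(poll_timeout)
            retries += 1


class FakeProducer(object):
    """Illustrative stand-in for a Kafka producer; not the real client."""

    def __init__(self, max_queued):
        self.max_queued = max_queued
        self.queue = []
        self.delivered = []

    def produce(self, topic, value, key=None):
        # Mimic the real client's behaviour when the local queue is full.
        if len(self.queue) >= self.max_queued:
            raise BufferError('local queue full')
        self.queue.append((topic, value, key))

    def poll(self, timeout):
        # Pretend every queued message was delivered during this poll.
        served = len(self.queue)
        self.delivered.extend(self.queue)
        self.queue = []
        return served
```

Usage: with FakeProducer(max_queued=2), every third call to produce_with_retry hits the full queue, polls once, and then succeeds. Note that, like the loop in the patch, this sketch retries without an upper bound; whether a retry limit is wanted is a design question for the review.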
--
To view, visit https://gerrit.wikimedia.org/r/405687
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0056739fcd97e4520ce44640681c0a4c24247b3d
Gerrit-PatchSet: 1
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Elukey <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits