Mobrovac has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/327378 )

Change subject: Temporarily switch to non-flowing mode.
......................................................................


Temporarily switch to non-flowing mode.

Bug: T153122
Change-Id: Ie81da4e5e4838be2c1697e4fc4d19dfd83995891
---
M lib/edit-stream.js
1 file changed, 29 insertions(+), 26 deletions(-)

Approvals:
  Mobrovac: Verified; Looks good to me, approved
  jenkins-bot: Verified



diff --git a/lib/edit-stream.js b/lib/edit-stream.js
index 719d9d6..3a51f13 100644
--- a/lib/edit-stream.js
+++ b/lib/edit-stream.js
@@ -90,33 +90,36 @@
         this._consumer.connect();
         this._consumer
         .on('ready', () => {
-            this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => {
-                if (e) {
-                    return this.emit('error', e);
-                }
+            this._consumer.subscribe(this._consumeTopics);
+            const consume = () => {
+                this._consumer.consume((e, kafkaMessage) => {
+                    if (e) {
+                        if (e.code !== kafka.CODES.ERRORS.ERR__PARTITION_EOF
+                                && e.code !== kafka.CODES.ERRORS.ERR__TIMED_OUT) {
+                            this.emit('error', e);
+                        }
+                        return consume();
+                    }
 
-                let msg;
-                try {
-                    msg = JSON.parse(kafkaMessage.value.toString());
-                } catch (e) {
-                    return this.emit('error', e);
-                }
-
-                const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1);
-                const skip = COMMIT_PUSH_SKIP[pureTopic];
-                if (!skip || kafkaMessage.offset % skip === 0) {
-                    this._commitQueue.push({
-                        message: kafkaMessage,
-                        timestamp: msg.meta.dt
-                    });
-                }
-
-                try {
-                    this.emit('edit', msg);
-                } catch (e) {
-                    this.emit('error', e);
-                }
-            });
+                    try {
+                        let msg = JSON.parse(kafkaMessage.value.toString());
+                        const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1);
+                        const skip = COMMIT_PUSH_SKIP[pureTopic];
+                        if (!skip || kafkaMessage.offset % skip === 0) {
+                            this._commitQueue.push({
+                                message: kafkaMessage,
+                                timestamp: msg.meta.dt
+                            });
+                        }
+                        this.emit('edit', msg);
+                    } catch (e) {
+                        this.emit('error', e);
+                    } finally {
+                        consume();
+                    }
+                });
+            };
+            consume();
         });
         this._commitQueue = [];
         this._commitInterval = setInterval(() => {

-- 
To view, visit https://gerrit.wikimedia.org/r/327378
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ie81da4e5e4838be2c1697e4fc4d19dfd83995891
Gerrit-PatchSet: 2
Gerrit-Project: mediawiki/services/trending-edits
Gerrit-Branch: master
Gerrit-Owner: Ppchelko <[email protected]>
Gerrit-Reviewer: Jdlrobson <[email protected]>
Gerrit-Reviewer: Mobrovac <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to