Mobrovac has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/327859 )

Change subject: Kafka consumer: Switch back to flowing mode.
......................................................................


Kafka consumer: Switch back to flowing mode.

The driver bug has been fied and the new version
has been published, so switch back to the flowing
mode since it's more efficient.

Bug: T153122
Bug: T145571
Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60
---
M lib/edit-stream.js
M package.json
2 files changed, 18 insertions(+), 28 deletions(-)

Approvals:
  Mobrovac: Verified; Looks good to me, approved
  jenkins-bot: Verified



diff --git a/lib/edit-stream.js b/lib/edit-stream.js
index 0cc1603..ea5d0a5 100644
--- a/lib/edit-stream.js
+++ b/lib/edit-stream.js
@@ -90,35 +90,25 @@
         this._consumer.connect();
         this._consumer
         .on('ready', () => {
-            this._consumer.subscribe(this._consumeTopics);
-            const consume = () => {
-                this._consumer.consume((e, kafkaMessage) => {
-                    if (e) {
-                        if (e.code !== kafka.CODES.ERRORS.ERR__PARTITION_EOF
-                                && e.code !== 
kafka.CODES.ERRORS.ERR__TIMED_OUT) {
-                            this.emit('error', e);
-                        }
-                        return consume();
-                    }
+            this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => {
+                if (e) {
+                    return this.emit('error', e);
+                }
 
-                    try {
-                        let msg = JSON.parse(kafkaMessage.value.toString());
-                        const skip = COMMIT_PUSH_SKIP[msg.meta.topic];
-                        if (!skip || kafkaMessage.offset % skip === 0) {
-                            this._commitQueue.push({
-                                message: kafkaMessage,
-                                timestamp: msg.meta.dt
-                            });
-                        }
-                        this.emit('edit', msg);
-                    } catch (e) {
-                        this.emit('error', e);
-                    } finally {
-                        consume();
+                try {
+                    const msg = JSON.parse(kafkaMessage.value.toString());
+                    const skip = COMMIT_PUSH_SKIP[msg.meta.topic];
+                    if (!skip || kafkaMessage.offset % skip === 0) {
+                        this._commitQueue.push({
+                            message: kafkaMessage,
+                            timestamp: Date.parse(msg.meta.dt)
+                        });
                     }
-                });
-            };
-            consume();
+                    this.emit('edit', msg);
+                } catch (e) {
+                    this.emit('error', e);
+                }
+            });
         });
         this._commitQueue = [];
         this._commitInterval = setInterval(() => {
diff --git a/package.json b/package.json
index e12c7e9..cf25766 100644
--- a/package.json
+++ b/package.json
@@ -33,7 +33,7 @@
     "preq": "^0.4.10",
     "service-runner": "^2.1.10",
     "swagger-router": "^0.4.6",
-    "node-rdkafka": "^0.6.0",
+    "node-rdkafka": "^0.6.2",
     "wikipedia-edits-scorer": "^1.4.0"
   },
   "devDependencies": {

-- 
To view, visit https://gerrit.wikimedia.org/r/327859
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60
Gerrit-PatchSet: 3
Gerrit-Project: mediawiki/services/trending-edits
Gerrit-Branch: master
Gerrit-Owner: Ppchelko <ppche...@wikimedia.org>
Gerrit-Reviewer: Jdlrobson <jrob...@wikimedia.org>
Gerrit-Reviewer: Mobrovac <mobro...@wikimedia.org>
Gerrit-Reviewer: Ppchelko <ppche...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to