Mobrovac has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/327859 )
Change subject: Kafka consumer: Switch back to flowing mode. ...................................................................... Kafka consumer: Switch back to flowing mode. The driver bug has been fied and the new version has been published, so switch back to the flowing mode since it's more efficient. Bug: T153122 Bug: T145571 Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60 --- M lib/edit-stream.js M package.json 2 files changed, 18 insertions(+), 28 deletions(-) Approvals: Mobrovac: Verified; Looks good to me, approved jenkins-bot: Verified diff --git a/lib/edit-stream.js b/lib/edit-stream.js index 0cc1603..ea5d0a5 100644 --- a/lib/edit-stream.js +++ b/lib/edit-stream.js @@ -90,35 +90,25 @@ this._consumer.connect(); this._consumer .on('ready', () => { - this._consumer.subscribe(this._consumeTopics); - const consume = () => { - this._consumer.consume((e, kafkaMessage) => { - if (e) { - if (e.code !== kafka.CODES.ERRORS.ERR__PARTITION_EOF - && e.code !== kafka.CODES.ERRORS.ERR__TIMED_OUT) { - this.emit('error', e); - } - return consume(); - } + this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => { + if (e) { + return this.emit('error', e); + } - try { - let msg = JSON.parse(kafkaMessage.value.toString()); - const skip = COMMIT_PUSH_SKIP[msg.meta.topic]; - if (!skip || kafkaMessage.offset % skip === 0) { - this._commitQueue.push({ - message: kafkaMessage, - timestamp: msg.meta.dt - }); - } - this.emit('edit', msg); - } catch (e) { - this.emit('error', e); - } finally { - consume(); + try { + const msg = JSON.parse(kafkaMessage.value.toString()); + const skip = COMMIT_PUSH_SKIP[msg.meta.topic]; + if (!skip || kafkaMessage.offset % skip === 0) { + this._commitQueue.push({ + message: kafkaMessage, + timestamp: Date.parse(msg.meta.dt) + }); } - }); - }; - consume(); + this.emit('edit', msg); + } catch (e) { + this.emit('error', e); + } + }); }); this._commitQueue = []; this._commitInterval = setInterval(() => { diff --git a/package.json b/package.json index e12c7e9..cf25766 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "preq": "^0.4.10", "service-runner": "^2.1.10", "swagger-router": "^0.4.6", - "node-rdkafka": "^0.6.0", + "node-rdkafka": "^0.6.2", "wikipedia-edits-scorer": "^1.4.0" }, "devDependencies": { -- To view, visit https://gerrit.wikimedia.org/r/327859 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60 Gerrit-PatchSet: 3 Gerrit-Project: mediawiki/services/trending-edits Gerrit-Branch: master Gerrit-Owner: Ppchelko <ppche...@wikimedia.org> Gerrit-Reviewer: Jdlrobson <jrob...@wikimedia.org> Gerrit-Reviewer: Mobrovac <mobro...@wikimedia.org> Gerrit-Reviewer: Ppchelko <ppche...@wikimedia.org> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits