Mobrovac has submitted this change and it was merged. (
https://gerrit.wikimedia.org/r/327378 )
Change subject: Temporarily switch to non-flowing mode.
......................................................................
Temporarily switch to non-flowing mode.
Bug: T153122
Change-Id: Ie81da4e5e4838be2c1697e4fc4d19dfd83995891
---
M lib/edit-stream.js
1 file changed, 29 insertions(+), 26 deletions(-)
Approvals:
Mobrovac: Verified; Looks good to me, approved
jenkins-bot: Verified
diff --git a/lib/edit-stream.js b/lib/edit-stream.js
index 719d9d6..3a51f13 100644
--- a/lib/edit-stream.js
+++ b/lib/edit-stream.js
@@ -90,33 +90,36 @@
this._consumer.connect();
this._consumer
.on('ready', () => {
- this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => {
- if (e) {
- return this.emit('error', e);
- }
+ this._consumer.subscribe(this._consumeTopics);
+ const consume = () => {
+ this._consumer.consume((e, kafkaMessage) => {
+ if (e) {
+ if (e.code !== kafka.CODES.ERRORS.ERR__PARTITION_EOF
+ && e.code !== kafka.CODES.ERRORS.ERR__TIMED_OUT) {
+ this.emit('error', e);
+ }
+ return consume();
+ }
- let msg;
- try {
- msg = JSON.parse(kafkaMessage.value.toString());
- } catch (e) {
- return this.emit('error', e);
- }
-
- const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1);
- const skip = COMMIT_PUSH_SKIP[pureTopic];
- if (!skip || kafkaMessage.offset % skip === 0) {
- this._commitQueue.push({
- message: kafkaMessage,
- timestamp: msg.meta.dt
- });
- }
-
- try {
- this.emit('edit', msg);
- } catch (e) {
- this.emit('error', e);
- }
- });
+ try {
+ let msg = JSON.parse(kafkaMessage.value.toString());
+ const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1);
+ const skip = COMMIT_PUSH_SKIP[pureTopic];
+ if (!skip || kafkaMessage.offset % skip === 0) {
+ this._commitQueue.push({
+ message: kafkaMessage,
+ timestamp: msg.meta.dt
+ });
+ }
+ this.emit('edit', msg);
+ } catch (e) {
+ this.emit('error', e);
+ } finally {
+ consume();
+ }
+ });
+ };
+ consume();
});
this._commitQueue = [];
this._commitInterval = setInterval(() => {
--
To view, visit https://gerrit.wikimedia.org/r/327378
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ie81da4e5e4838be2c1697e4fc4d19dfd83995891
Gerrit-PatchSet: 2
Gerrit-Project: mediawiki/services/trending-edits
Gerrit-Branch: master
Gerrit-Owner: Ppchelko <[email protected]>
Gerrit-Reviewer: Jdlrobson <[email protected]>
Gerrit-Reviewer: Mobrovac <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits