Mobrovac has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/327212 )

Change subject: Use the message's original TS for commit calculation.
......................................................................

Use the message's original TS for commit calculation.

Because the service needs to replay the events from the last hour on
start-up, we hold off committing the offset to Kafka. However, we should
do so an hour after the message has been produced, not an hour after we
have read it.

Bug: T153122
Change-Id: Ie274e3fed9552ce6e4ab079e52543691cf02e58a
---
M lib/edit-stream.js
1 file changed, 9 insertions(+), 2 deletions(-)

  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/trending-edits refs/changes/12/327212/1

diff --git a/lib/edit-stream.js b/lib/edit-stream.js
index be695ff..719d9d6 100644
--- a/lib/edit-stream.js
+++ b/lib/edit-stream.js
@@ -95,17 +95,24 @@
             return this.emit('error', e);
         }
 
+        let msg;
+        try {
+            msg = JSON.parse(kafkaMessage.value.toString());
+        } catch (e) {
+            return this.emit('error', e);
+        }
+
         const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1);
         const skip = COMMIT_PUSH_SKIP[pureTopic];
         if (!skip || kafkaMessage.offset % skip === 0) {
             this._commitQueue.push({
                 message: kafkaMessage,
-                timestamp: Date.now()
+                timestamp: msg.meta.dt
             });
         }
 
         try {
-            this.emit('edit', JSON.parse(kafkaMessage.value.toString()));
+            this.emit('edit', msg);
         } catch (e) {
             this.emit('error', e);
         }

-- 
To view, visit https://gerrit.wikimedia.org/r/327212

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie274e3fed9552ce6e4ab079e52543691cf02e58a
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/trending-edits
Gerrit-Branch: master
Gerrit-Owner: Mobrovac <mobro...@wikimedia.org>
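
The commit rule described in this change (commit an offset only once its message was produced more than an hour ago) can be sketched as follows. This is a minimal illustration, not the service's actual code: `REPLAY_WINDOW_MS`, `toEpochMs` and `splitCommittable` are hypothetical names, and the sketch assumes that `meta.dt` may arrive as an ISO 8601 string rather than as epoch milliseconds, so it normalises the value before comparing. The diff itself does not say which form `meta.dt` takes.

```javascript
'use strict';

// Hypothetical replay window: one hour, in milliseconds.
const REPLAY_WINDOW_MS = 60 * 60 * 1000;

// Convert an event timestamp to epoch milliseconds.
// Assumption: the value is either a number (already epoch ms)
// or a string that Date.parse() understands (e.g. ISO 8601).
function toEpochMs(dt) {
    const ms = typeof dt === 'number' ? dt : Date.parse(dt);
    if (Number.isNaN(ms)) {
        throw new Error(`Unparseable timestamp: ${dt}`);
    }
    return ms;
}

// Split a commit queue (assumed to be ordered oldest first) into
// entries whose message was produced at least `windowMs` ago and
// entries that still have to wait.
function splitCommittable(queue, now, windowMs = REPLAY_WINDOW_MS) {
    const cutoff = now - windowMs;
    let idx = 0;
    while (idx < queue.length && toEpochMs(queue[idx].timestamp) <= cutoff) {
        idx++;
    }
    return {
        committable: queue.slice(0, idx),
        pending: queue.slice(idx)
    };
}
```

Keying the cutoff on the production time rather than the read time means that a consumer catching up on a backlog can commit old offsets immediately instead of holding each one for a further hour.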