[MediaWiki-commits] [Gerrit] mediawiki...trending-edits[master]: Use the message's original TS for commit calculation
Mobrovac has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/327212 ) Change subject: Use the message's original TS for commit calculation .. Use the message's original TS for commit calculation Because the service needs to replay the events from the last hour on start-up, we hold off committing the offset to Kafka. However, we should do so an hour after the message has been produced, not an hour after we have read it. Bug: T153122 Change-Id: Ie274e3fed9552ce6e4ab079e52543691cf02e58a --- M lib/edit-stream.js 1 file changed, 9 insertions(+), 2 deletions(-) Approvals: Mobrovac: Verified; Looks good to me, approved diff --git a/lib/edit-stream.js b/lib/edit-stream.js index be695ff..719d9d6 100644 --- a/lib/edit-stream.js +++ b/lib/edit-stream.js @@ -95,17 +95,24 @@ return this.emit('error', e); } +let msg; +try { +msg = JSON.parse(kafkaMessage.value.toString()); +} catch (e) { +return this.emit('error', e); +} + const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1); const skip = COMMIT_PUSH_SKIP[pureTopic]; if (!skip || kafkaMessage.offset % skip === 0) { this._commitQueue.push({ message: kafkaMessage, -timestamp: Date.now() +timestamp: msg.meta.dt }); } try { -this.emit('edit', JSON.parse(kafkaMessage.value.toString())); +this.emit('edit', msg); } catch (e) { this.emit('error', e); } -- To view, visit https://gerrit.wikimedia.org/r/327212 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ie274e3fed9552ce6e4ab079e52543691cf02e58a Gerrit-PatchSet: 2 Gerrit-Project: mediawiki/services/trending-edits Gerrit-Branch: master Gerrit-Owner: Mobrovac Gerrit-Reviewer: Mobrovac Gerrit-Reviewer: jenkins-bot <> ___ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
[MediaWiki-commits] [Gerrit] mediawiki...trending-edits[master]: Use the message's original TS for commit calculation.
Mobrovac has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/327212 ) Change subject: Use the message's original TS for commit calculation. .. Use the message's original TS for commit calculation. Because the service needs to replay the events from the last hour on start-up, we hold off committing the offset to Kafka. However, we should do so an hour after the message has been produced, not an hour after we have read it. Bug: T153122 Change-Id: Ie274e3fed9552ce6e4ab079e52543691cf02e58a --- M lib/edit-stream.js 1 file changed, 9 insertions(+), 2 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/trending-edits refs/changes/12/327212/1 diff --git a/lib/edit-stream.js b/lib/edit-stream.js index be695ff..719d9d6 100644 --- a/lib/edit-stream.js +++ b/lib/edit-stream.js @@ -95,17 +95,24 @@ return this.emit('error', e); } +let msg; +try { +msg = JSON.parse(kafkaMessage.value.toString()); +} catch (e) { +return this.emit('error', e); +} + const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1); const skip = COMMIT_PUSH_SKIP[pureTopic]; if (!skip || kafkaMessage.offset % skip === 0) { this._commitQueue.push({ message: kafkaMessage, -timestamp: Date.now() +timestamp: msg.meta.dt }); } try { -this.emit('edit', JSON.parse(kafkaMessage.value.toString())); +this.emit('edit', msg); } catch (e) { this.emit('error', e); } -- To view, visit https://gerrit.wikimedia.org/r/327212 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ie274e3fed9552ce6e4ab079e52543691cf02e58a Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/services/trending-edits Gerrit-Branch: master Gerrit-Owner: Mobrovac ___ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits