Jdlrobson has uploaded a new change for review. https://gerrit.wikimedia.org/r/323239
Change subject: WIP: Implement a configurable purge strategy ...................................................................... WIP: Implement a configurable purge strategy To avoid taking up too much memory by remembering the edit activity on too many pages, a purge strategy is added to remove pages that are not trending. A purge_period is used, representing the number of events processed between purges. When it is exceeded, a purge method is executed, which is passed a purge_strategy option. The behaviour of this config is encapsulated in the tests. Bug: T145554 Change-Id: I1bbc82578711bf707580879d061d3d5d59b4e2e7 --- M app.js M config.dev.yaml M config.prod.yaml M lib/processor.js M test/features/lib/processor.js 5 files changed, 78 insertions(+), 4 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/trending-edits refs/changes/39/323239/1 diff --git a/app.js b/app.js index a373440..9a79097 100644 --- a/app.js +++ b/app.js @@ -192,9 +192,15 @@ * @return {Application} */ function createEditStream(app) { + var eventsSincePurge = 0; const editStream = new EditStream(app.conf); editStream.on('edit', (message) => { + eventsSincePurge++; processor.process(message); + if (eventsSincePurge > app.conf.purge_period) { + eventsSincePurge = 0; + processor.purge(app.conf.purge_strategy); + } }); editStream.on('error', (e) => { app.logger.log('error/edit_stream', 'Error in edit stream: ' + e); diff --git a/config.dev.yaml b/config.dev.yaml index 2a3b700..74555d6 100644 --- a/config.dev.yaml +++ b/config.dev.yaml @@ -77,6 +77,11 @@ body: '{{request.body}}' # service-specific options consume_dc: [ test_dc, datacenter1 ] # Which datacenters to consume edit events from + purge_period: 100 # purge contents of store after this number of events + purge_strategy: + max_inactivity: 40 # maximum time in minutes a page can go without edits + max_age: 1440 # maximum age allowed in minutes + min_speed: 0 # minimum speed in edits per minute that a page is kept around trending_period: 60 # Calculate
trending for one hour min_edits: 6 broker_list: localhost:9092 diff --git a/config.prod.yaml b/config.prod.yaml index 4e56ebc..22689a1 100644 --- a/config.prod.yaml +++ b/config.prod.yaml @@ -76,7 +76,12 @@ query: '{{ default(request.query, {}) }}' headers: '{{request.headers}}' body: '{{request.body}}' - consume_dc: [ eqiad, codfw ] # Which datacenters to consume edit events from + consume_dc: [ datacenter1, eqiad, codfw ] # Which datacenters to consume edit events from + purge_period: 100 # purge contents of store after this number of events + purge_strategy: + max_inactivity: 40 # maximum time in minutes a page can go without edits + max_age: 1440 # maximum age allowed in minutes + min_speed: 0.1 # minimum speed in edits per minute that a page is kept around trending_period: 3600 # Calculate trending for one hour min_edits: 6 broker_list: localhost:9092 diff --git a/lib/processor.js b/lib/processor.js index 90b3a5b..a1636d8 100644 --- a/lib/processor.js +++ b/lib/processor.js @@ -4,6 +4,27 @@ var pages = {}; /** + * Purge any pages that do not meet the required + * criteria + * + * @ignore + * @param {Object} options + */ +function purge(options) { + var now = new Date(options.date) || new Date(); + var time = now.getTime(); + var maxAgeDate = new Date(time - options.max_age * 60000); + var maxInactivityDate = new Date(time - options.max_inactivity * 60000); + Object.entries(pages).forEach((entry) => { + var page = entry[1]; + if (page.updated < maxInactivityDate || page.from < maxAgeDate) { + console.log('delete', page.id) + delete pages[page.id]; + } + }); +} + +/** * Reset all previous processing of edits */ function reset() { @@ -18,7 +39,7 @@ function process(edit) { var contributors; var id = edit.page_id; - var ts = edit.rev_timestamp; + var ts = new Date(edit.rev_timestamp); var performer = edit.performer; var isAnon = edit.performer.user_id === undefined; var username = performer.user_text; @@ -82,6 +103,7 @@ } module.exports = { + purge: purge, 
getPages: getPages, process: process, reset: reset diff --git a/test/features/lib/processor.js b/test/features/lib/processor.js index 6d777cf..4ab8868 100644 --- a/test/features/lib/processor.js +++ b/test/features/lib/processor.js @@ -74,8 +74,10 @@ assert.ok( pages.length === 1, 'Pages with same name are combined into single page' ); assert.ok( pages[0].edits === 3, 'Edits are counted for all non-bot edits' ); assert.ok( pages[0].title === 'Foo', 'Title is present' ); - assert.ok( pages[0].updated === '2016-11-15T18:03:55+00:00', 'Updated is timestamp of last edit' ); - assert.ok( pages[0].from === '2016-11-15T18:00:55+00:00', 'from is timestamp of first recorded edit' ); + assert.ok( pages[0].updated.toISOString() === '2016-11-15T18:03:55.000Z', + 'Updated is timestamp of last edit' ); + assert.ok( pages[0].from.toISOString() === '2016-11-15T18:00:55.000Z', + 'from is timestamp of first recorded edit' ); }); it('check processing of contributors', function() { @@ -161,3 +163,37 @@ assert.ok( pages.length === 0, 'Processor is an array with zero items before any processing.' 
); }); }); + +describe('purging', function() { + + it('things get purged to avoid using too much memory', function() { + var pages; + processor.reset(); + // 0.1 edits per minute = 6 edits per hour = 8640 edits per day + // process 10 edits on 14th Nov @ 1pm [Will be purged as TOO OLD] + Array(10).join('/').split('/').forEach(() => processor.process( edit( 3, 'Jon', '2016-11-14T13:00:00+00:00' ) )); + // recent edit to ensure it doesn't get removed on grounds of inactivity + processor.process( edit( 3, 'Jon', '2016-11-15T12:59:00+00:00' ) ); + // process 30 edits for another page on 15th Nov @ 4.50pm [will be purged as INACTIVE] + Array(30).join('/').split('/').forEach(() => processor.process( edit( 6, 'Jon', '2016-11-15T15:00:00+00:00' ) )); + // this page although recently edited has less than 3 edits in last hour + Array(3).join('/').split('/').forEach(() => processor.process( edit( 7, 'Jon', '2016-11-15T15:00:00+00:00' ) )); + // process 10 edits for another page on 15th Nov @ 8pm + Array(10).join('/').split('/').forEach(() => processor.process( edit( 9, 'Jon', '2016-11-15T15:45:00+00:00' ) )); + + // purge on 15th Nov @ 4pm + processor.purge({ + date: '2016-11-15T13:00:00+00:00', + // purge anything older than a day to get rid of page with id 3. 
+ max_age: 1440, + // purge anything inactive to get rid of page with id 6 + max_inactivity: 59, + // purge anything slower than 3 edits per second to get rid of page with id 7 + min_speed: 3 + }) + pages = processor.getPages(); + console.log('g', pages); + assert.ok( pages.length === 1, 'All but one page was purged' ); + assert.ok( pages[0].id === 9, 'And page with id 9 is only survivor' ); + }); +}); -- To view, visit https://gerrit.wikimedia.org/r/323239 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I1bbc82578711bf707580879d061d3d5d59b4e2e7 Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/services/trending-edits Gerrit-Branch: master Gerrit-Owner: Jdlrobson <jrob...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits