Jdlrobson has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/323239

Change subject: WIP: Implement a configurable purge strategy
......................................................................

WIP: Implement a configurable purge strategy

To avoid taking up too much memory by remembering the edit activity
on too many pages, a purge strategy is added to remove pages that
are not trending.

A purge_period setting gives the number of events processed between
purges. Each purge calls processor.purge() with the purge_strategy
options. The behaviour of this configuration is captured in the tests.

Bug: T145554
Change-Id: I1bbc82578711bf707580879d061d3d5d59b4e2e7
---
M app.js
M config.dev.yaml
M config.prod.yaml
M lib/processor.js
M test/features/lib/processor.js
5 files changed, 78 insertions(+), 4 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/trending-edits refs/changes/39/323239/1

diff --git a/app.js b/app.js
index a373440..9a79097 100644
--- a/app.js
+++ b/app.js
@@ -192,9 +192,15 @@
  * @return {Application}
  */
 function createEditStream(app) {
+    var eventsSincePurge = 0;
     const editStream = new EditStream(app.conf);
     editStream.on('edit', (message) => {
+        eventsSincePurge++;
         processor.process(message);
+        if (eventsSincePurge > app.conf.purge_period) {
+            eventsSincePurge = 0;
+            processor.purge(app.conf.purge_strategy);
+        }
     });
     editStream.on('error', (e) => {
         app.logger.log('error/edit_stream', 'Error in edit stream: ' + e);
diff --git a/config.dev.yaml b/config.dev.yaml
index 2a3b700..74555d6 100644
--- a/config.dev.yaml
+++ b/config.dev.yaml
@@ -77,6 +77,11 @@
         body: '{{request.body}}'
       # service-specific options
       consume_dc: [ test_dc, datacenter1 ] # Which datacenters to consume edit 
events from
+      purge_period: 100 # purge stale pages from the store after this number of events
+      purge_strategy:
+        max_inactivity: 40 # maximum time in minutes a page can go without edits
+        max_age: 1440 # maximum age in minutes, measured from a page's first recorded edit
+        min_speed: 0 # minimum speed in edits per minute for a page to be kept
       trending_period: 60 # Calculate trending for one hour
       min_edits: 6
       broker_list: localhost:9092
diff --git a/config.prod.yaml b/config.prod.yaml
index 4e56ebc..22689a1 100644
--- a/config.prod.yaml
+++ b/config.prod.yaml
@@ -76,7 +76,12 @@
         query: '{{ default(request.query, {}) }}'
         headers: '{{request.headers}}'
         body: '{{request.body}}'
-      consume_dc: [ eqiad, codfw ] # Which datacenters to consume edit events 
from
+      consume_dc: [ datacenter1, eqiad, codfw ] # Which datacenters to consume 
edit events from
+      purge_period: 100 # purge stale pages from the store after this number of events
+      purge_strategy:
+        max_inactivity: 40 # maximum time in minutes a page can go without edits
+        max_age: 1440 # maximum age in minutes, measured from a page's first recorded edit
+        min_speed: 0.1 # minimum speed in edits per minute for a page to be kept
       trending_period: 3600 # Calculate trending for one hour
       min_edits: 6
       broker_list: localhost:9092
diff --git a/lib/processor.js b/lib/processor.js
index 90b3a5b..a1636d8 100644
--- a/lib/processor.js
+++ b/lib/processor.js
@@ -4,6 +4,27 @@
 var pages = {};
 
 /**
+ * Remove pages that stopped trending: no edit in max_inactivity minutes, first
+ * seen over max_age minutes ago, or < min_speed edits/minute since first seen.
+ * @ignore
+ * @param {Object} options purge_strategy config; `date` (tests) overrides now
+ */
+function purge(options) {
+    // Invalid Date is truthy, so `new Date(options.date) || ...` never fell back
+    var time = options.date ? new Date(options.date).getTime() : Date.now();
+    var maxAgeDate = new Date(time - options.max_age * 60000);
+    var maxInactivityDate = new Date(time - options.max_inactivity * 60000);
+    Object.keys(pages).forEach((key) => {
+        var page = pages[key];
+        var minutes = (time - page.from) / 60000;
+        var tooSlow = minutes > 0 && page.edits / minutes < options.min_speed;
+        if (page.updated < maxInactivityDate || page.from < maxAgeDate || tooSlow) {
+            delete pages[key];
+        }
+    });
+}
+
+/**
  * Reset all previous processing of edits
  */
 function reset() {
@@ -18,7 +39,7 @@
 function process(edit) {
     var contributors;
     var id = edit.page_id;
-    var ts = edit.rev_timestamp;
+    var ts = new Date(edit.rev_timestamp);
     var performer = edit.performer;
     var isAnon = edit.performer.user_id === undefined;
     var username = performer.user_text;
@@ -82,6 +103,7 @@
 }
 
 module.exports = {
+    purge: purge,
     getPages: getPages,
     process: process,
     reset: reset
diff --git a/test/features/lib/processor.js b/test/features/lib/processor.js
index 6d777cf..4ab8868 100644
--- a/test/features/lib/processor.js
+++ b/test/features/lib/processor.js
@@ -74,8 +74,10 @@
         assert.ok( pages.length === 1, 'Pages with same name are combined into 
single page' );
         assert.ok( pages[0].edits === 3, 'Edits are counted for all non-bot 
edits' );
         assert.ok( pages[0].title === 'Foo', 'Title is present' );
-        assert.ok( pages[0].updated === '2016-11-15T18:03:55+00:00', 'Updated 
is timestamp of last edit' );
-        assert.ok( pages[0].from === '2016-11-15T18:00:55+00:00', 'from is 
timestamp of first recorded edit' );
+        assert.ok( pages[0].updated.toISOString() === 
'2016-11-15T18:03:55.000Z',
+            'Updated is timestamp of last edit' );
+        assert.ok( pages[0].from.toISOString() === '2016-11-15T18:00:55.000Z',
+            'from is timestamp of first recorded edit' );
     });
 
     it('check processing of contributors', function() {
@@ -161,3 +163,37 @@
         assert.ok( pages.length === 0, 'Processor is an array with zero items 
before any processing.' );
     });
 });
+
+describe('purging', function() {
+
+    it('things get purged to avoid using too much memory', function() {
+        var pages;
+        processor.reset();
+        // 0.1 edits per minute = 6 edits per hour = 144 edits per day
+        // Array(n).join('/').split('/') yields n items, so each forEach below processes n edits.
+        // process 10 edits on 14th Nov @ 1pm [Will be purged as TOO OLD]
+        Array(10).join('/').split('/').forEach(() => processor.process( edit( 
3, 'Jon', '2016-11-14T13:00:00+00:00' ) ));
+        // recent edit to ensure it doesn't get removed on grounds of inactivity
+        processor.process( edit( 3, 'Jon', '2016-11-15T12:59:00+00:00' ) );
+        // process 30 edits for another page on 15th Nov @ 3pm [will be purged as INACTIVE]
+        Array(30).join('/').split('/').forEach(() => processor.process( edit( 
6, 'Jon', '2016-11-15T15:00:00+00:00' ) ));
+        // this page, although recently edited, has only 3 edits, all on 15th Nov @ 3pm
+        Array(3).join('/').split('/').forEach(() => processor.process( edit( 
7, 'Jon', '2016-11-15T15:00:00+00:00' ) ));
+        // process 10 edits for another page on 15th Nov @ 3.45pm
+        Array(10).join('/').split('/').forEach(() => processor.process( edit( 
9, 'Jon', '2016-11-15T15:45:00+00:00' ) ));
+
+        // purge as of the `date` below: 15th Nov @ 1pm
+        // NOTE(review): pages 6, 7 and 9 are edited after this purge time (3pm and
+        // 3.45pm), and page 3's first edit is exactly max_age minutes earlier while
+        // purge() compares with a strict `<`; confirm the intended purge time.
+        processor.purge({
+            date: '2016-11-15T13:00:00+00:00',
+            // purge anything older than a day to get rid of page with id 3.
+            max_age: 1440,
+            // purge anything inactive to get rid of page with id 6
+            max_inactivity: 59,
+            // purge anything slower than 3 edits per minute to get rid of page with id 7
+            min_speed: 3
+        })
+        pages = processor.getPages();
+        // NOTE(review): debug output; presumably to be removed before merge.
+        console.log('g', pages);
+        assert.ok( pages.length === 1, 'All but one page was purged' );
+        assert.ok( pages[0].id === 9, 'And page with id 9 is only survivor' );
+    });
+});

-- 
To view, visit https://gerrit.wikimedia.org/r/323239
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I1bbc82578711bf707580879d061d3d5d59b4e2e7
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/trending-edits
Gerrit-Branch: master
Gerrit-Owner: Jdlrobson <jrob...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to