Ori.livneh has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/174485

Change subject: WIP
......................................................................

WIP

Change-Id: I0f160ef25d05f6b218b97cb710ac3a411c0fd601
---
M server/eventlogging/handlers.py
M server/eventlogging/jrm.py
M server/eventlogging/utils.py
3 files changed, 15 insertions(+), 9 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging refs/changes/85/174485/1

diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index 6e4bac0..b9dc545 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -93,7 +93,7 @@
         interval=2, target=store_sql_events, args=(meta, events))
     worker.start()
 
-    while 1:
+    while worker.is_alive():
         event = (yield)
         events.append(event)
 
diff --git a/server/eventlogging/jrm.py b/server/eventlogging/jrm.py
index 4f25bee..dc97254 100644
--- a/server/eventlogging/jrm.py
+++ b/server/eventlogging/jrm.py
@@ -194,10 +194,15 @@
         insert.execute()
 
 
+def key_func(event):
+    """Sort / group function that batches events that share the same
+    SCID and set of fields."""
+    return (event['schema'], event['revision']), tuple(sorted(event))
+
 def store_sql_events(meta, events, replace=False):
     """Store events in the database."""
-    queue = [events.pop() for _ in range(len(events))]
-    queue.sort(key=get_scid)
+    queue = [flatten(events.pop()) for _ in range(len(events))]
+    queue.sort(key=key_func)
 
     if (getattr(meta.bind.dialect, 'supports_multivalues_insert', False)
             or getattr(meta.bind.dialect, 'supports_multirow_insert', False)):
@@ -205,7 +210,7 @@
     else:
         insert = _insert_sequential
 
-    for scid, events in itertools.groupby(queue, get_scid):
+    for (scid, _), events in itertools.groupby(queue, key_func):
         prepared_events = [prepare(event) for event in events]
         table = get_table(meta, scid)
         insert(table, prepared_events, replace)
@@ -225,7 +230,6 @@
 
 def prepare(event):
     """Prepare an event for insertion into the database."""
-    event = flatten(event)
     for prop in NO_DB_PROPERTIES:
         event.pop(prop, None)
     return event
diff --git a/server/eventlogging/utils.py b/server/eventlogging/utils.py
index 9c7e02c..cc530ed 100644
--- a/server/eventlogging/utils.py
+++ b/server/eventlogging/utils.py
@@ -7,6 +7,8 @@
   a particular function.
 
 """
+from __future__ import unicode_literals
+
 import threading
 
 
@@ -22,7 +24,7 @@
         super(PeriodicThread, self).__init__(*args, **kwargs)
 
     def run(self):
-        self.ready.clear()
-        self.ready.wait(self.interval)
-        self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
-        self.run()
+        while 1:
+            self.ready.clear()
+            self.ready.wait(self.interval)
+            self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

-- 
To view, visit https://gerrit.wikimedia.org/r/174485
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0f160ef25d05f6b218b97cb710ac3a411c0fd601
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to