Ori.livneh has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/181769

Change subject: [WIP] Add client API
......................................................................

[WIP] Add client API

Change-Id: I3ce7837b15855d077424382c15fbf765441f5f92
---
M server/eventlogging/jrm.py
M server/eventlogging/utils.py
M server/tests/test_jrm.py
3 files changed, 106 insertions(+), 27 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging 
refs/changes/69/181769/1

diff --git a/server/eventlogging/jrm.py b/server/eventlogging/jrm.py
index a83953b..ef4d824 100644
--- a/server/eventlogging/jrm.py
+++ b/server/eventlogging/jrm.py
@@ -15,11 +15,12 @@
 
 import sqlalchemy
 
-from .schema import get_schema
 from .compat import items
+from .schema import get_schema
+from .utils import flatten
 
 
-__all__ = ('store_sql_events', 'flatten')
+__all__ = ('store_sql_events',)
 
 
 # Format string for :func:`datetime.datetime.strptime` for MediaWiki
@@ -242,21 +243,6 @@
     for prop in NO_DB_PROPERTIES:
         event.pop(prop, None)
     return event
-
-
-def flatten(d, sep='_', f=None):
-    """Collapse a nested dictionary. `f` specifies an optional mapping
-    function to apply to each (key, value) pair."""
-    flat = []
-    for k, v in items(d):
-        if f is not None:
-            (k, v) = f((k, v))
-        if isinstance(v, dict):
-            nested = items(flatten(v, sep, f))
-            flat.extend((k + sep + nk, nv) for nk, nv in nested)
-        else:
-            flat.append((k, v))
-    return dict(flat)
 
 
 def column_sort_key(column):
diff --git a/server/eventlogging/utils.py b/server/eventlogging/utils.py
index 35aef5d..124ee0c 100644
--- a/server/eventlogging/utils.py
+++ b/server/eventlogging/utils.py
@@ -9,15 +9,19 @@
 """
 from __future__ import unicode_literals
 
-import sys
-import re
-import threading
+import copy
 import logging
+import re
+import sys
+import threading
 
-from .compat import monotonic_clock
+from .compat import items, monotonic_clock
+from .factory import get_reader
 
 
-__all__ = ('PeriodicThread', 'uri_delete_query_item', 'setup_logging')
+__all__ = ('EventConsumer', 'PeriodicThread', 'flatten', 'is_subset_dict',
+           'setup_logging', 'unflatten', 'update_recursive',
+           'uri_delete_query_item')
 
 
 class PeriodicThread(threading.Thread):
@@ -56,13 +60,102 @@
 
 
 def uri_delete_query_item(uri, key):
-    """Delete a key=value pair (specified by key) from a URI's query string."""
+    """Delete a key-value pair (specified by key) from a URI's query string."""
     def repl(match):
         separator, trailing_ampersand = match.groups()
         return separator if trailing_ampersand else ''
     return re.sub('([?&])%s=[^&]*(&?)' % re.escape(key), repl, uri)
 
 
def is_subset_dict(a, b):
    """True if every key-value pair in `a` is also in `b`.
    Values in `a` which are themselves dictionaries are tested
    by recursively calling :func:`is_subset_dict`."""
    def pair_matches(key, expected):
        # EAFP lookup: a missing key means `a` cannot be a subset.
        try:
            actual = b[key]
        except KeyError:
            return False
        if isinstance(expected, dict) and isinstance(actual, dict):
            return is_subset_dict(expected, actual)
        return expected == actual

    # all() short-circuits on the first mismatch, like an early return.
    return all(pair_matches(k, v) for k, v in items(a))
+
+
def update_recursive(d, other):
    """Recursively update a dict with items from another dict.

    Nested dicts are merged key-by-key rather than replaced wholesale;
    non-dict values from `other` overwrite those in `d`. Mutates and
    returns `d`."""
    for key, incoming in items(other):
        if isinstance(incoming, dict):
            # Merge into the existing sub-dict, or a fresh one if absent.
            d[key] = update_recursive(d.get(key, {}), incoming)
        else:
            d[key] = incoming
    return d
+
+
def flatten(d, sep='_', f=None):
    """Collapse a nested dictionary. `f` specifies an optional mapping
    function to apply to each key-value pair. This function is the inverse
    of :func:`unflatten`."""
    collapsed = {}
    for key, value in items(d):
        if f is not None:
            key, value = f((key, value))
        if isinstance(value, dict):
            # Prefix each nested key with the parent key and separator.
            for sub_key, sub_value in items(flatten(value, sep, f)):
                collapsed[key + sep + sub_key] = sub_value
        else:
            collapsed[key] = value
    return collapsed
+
+
def unflatten(d, sep='_', f=None):
    """Expand a flattened dictionary. Keys containing `sep` are split into
    nested key selectors. `f` specifies an optional mapping function to apply
    to each key-value pair. This function is the inverse of :func:`flatten`."""
    expanded = {}
    for key, value in items(d):
        if f is not None:
            key, value = f((key, value))
        if sep in key:
            # Split off the first segment; the remainder is pushed down as
            # a nested single-entry dict and expanded by the recursion below.
            head, rest = key.split(sep, 1)
            key, value = head, {rest: value}
        if isinstance(value, dict):
            # NOTE(review): `f` is deliberately not re-applied here — it has
            # already run on the original flat pair. Confirm this is the
            # intended asymmetry with :func:`flatten`, which does recurse
            # with `f`.
            value = unflatten(value, sep)
        update_recursive(expanded, {key: value})
    return expanded
+
+
class EventConsumer(object):
    """An EventLogging consumer API for standalone scripts.

    .. code-block:: python

       event_stream = eventlogging.EventConsumer('tcp://localhost:8600')
       for event in event_stream.filter(schema='NavigationTiming'):
           print(event)

    """

    def __init__(self, url):
        # Stream URL, handed to ``get_reader`` when iteration begins.
        self.url = url
        # Filter criteria: an event is yielded only if these key-value
        # pairs form a (recursive) subset of the event.
        self.conditions = {}

    def filter(self, **conditions):
        """Return a copy of this consumer that will filter events based
        on conditions expressed as keyword arguments."""
        # Fold previously-accumulated conditions into the new ones so that
        # chained ``filter`` calls combine rather than replace each other.
        update_recursive(conditions, self.conditions)
        clone = copy.copy(self)
        clone.conditions = conditions
        return clone

    def __iter__(self):
        """Yield events from the stream that match the filter conditions."""
        for event in get_reader(self.url):
            if is_subset_dict(self.conditions, event):
                yield event
+
+
 def setup_logging():
     logging.basicConfig(stream=sys.stderr, level=logging.DEBUG,
                         format='%(asctime)s %(message)s')
diff --git a/server/tests/test_jrm.py b/server/tests/test_jrm.py
index 87d6b7b..ed0376c 100644
--- a/server/tests/test_jrm.py
+++ b/server/tests/test_jrm.py
@@ -41,9 +41,9 @@
         t = eventlogging.jrm.declare_table(self.meta, TEST_SCHEMA_SCID)
 
         # The columns we expect to see are..
-        cols = set(eventlogging.jrm.flatten(self.event))  # all properties
-        cols -= set(eventlogging.jrm.NO_DB_PROPERTIES)    # unless excluded
-        cols |= {'id', 'uuid'}                            # plus 'id' & 'uuid'
+        cols = set(eventlogging.utils.flatten(self.event))  # all properties
+        cols -= set(eventlogging.jrm.NO_DB_PROPERTIES)      # unless excluded
+        cols |= {'id', 'uuid'}                              # plus id & uuid
 
         self.assertSetEqual(set(t.columns.keys()), cols)
 
@@ -55,7 +55,7 @@
 
     def test_flatten(self):
         """``flatten`` correctly collapses deeply nested maps."""
-        flat = eventlogging.jrm.flatten(self.event)
+        flat = eventlogging.utils.flatten(self.event)
         self.assertEqual(flat['event_nested_deeplyNested_pi'], 3.14159)
 
     def test_encoding(self):

-- 
To view, visit https://gerrit.wikimedia.org/r/181769
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3ce7837b15855d077424382c15fbf765441f5f92
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to