Nuria has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/169977

Change subject: [WIP] Batching event insertion
......................................................................

[WIP] Batching event insertion

The eventlogging database consumer generates single-row inserts.
From a performance perspective (master overhead and slave replication lag)
it would be better to batch inserts. Batches do not
need to be large or of a particular size.
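
To illustrate the difference between single-row and batched inserts, here is a minimal sketch. It uses the standard-library sqlite3 module instead of the SQLAlchemy engine this change relies on, so that it runs on its own. The helper name insert_batch and the table layout are assumptions made for the example, not part of the patch.

```python
import sqlite3


def insert_batch(conn, table_name, events):
    """Insert all events with one executemany() call in one transaction.

    Assumes every event dict has the same keys. table_name and the keys
    are interpolated as identifiers, so they must come from trusted code.
    """
    if not events:
        return 0
    columns = sorted(events[0])
    sql = 'INSERT INTO "%s" (%s) VALUES (%s)' % (
        table_name,
        ', '.join('"%s"' % c for c in columns),
        ', '.join(':' + c for c in columns),
    )
    with conn:  # commits once for the whole batch, rolls back on error
        conn.executemany(sql, events)
    return len(events)


# Hypothetical table and events, for illustration only.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE "TestSchema_123" (uuid TEXT, event_value TEXT)')
events = [{'uuid': str(i), 'event_value': 'v%d' % i} for i in range(5)]
inserted = insert_batch(conn, 'TestSchema_123', events)
row_count = conn.execute(
    'SELECT COUNT(*) FROM "TestSchema_123"').fetchone()[0]
```

The batch goes to the database as one statement execution and one commit rather than one of each per event, which is where the reduction in master overhead and replication lag is expected to come from.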

Existing tests pass, but tests for the new functionality are still missing.

Time-based flushing of events is also still missing (WIP).
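
One possible shape for the missing time-based flushing, sketched under assumptions: buffers are keyed by table name and a min-heap of (created_at, table_name) tracks the oldest buffer, in the spirit of the `buffer` and `h` module globals added to jrm.py. The class name AgingBuffer and its methods are hypothetical and do not appear in the patch. The caller passes `now` explicitly (for example time.time()) so the logic can be tested without sleeping.

```python
import heapq


class AgingBuffer(object):
    """Per-table event buffers with size-based and age-based flushing."""

    def __init__(self, max_age, max_records):
        self.max_age = max_age
        self.max_records = max_records
        self.events = {}    # table_name -> list of buffered events
        self.created = {}   # table_name -> creation time of live buffer
        self.heap = []      # (created_at, table_name), oldest first

    def append(self, table_name, event, now):
        """Buffer an event; return a full batch to flush, or None."""
        if table_name not in self.events:
            self.events[table_name] = []
            self.created[table_name] = now
            heapq.heappush(self.heap, (now, table_name))
        self.events[table_name].append(event)
        if len(self.events[table_name]) >= self.max_records:
            # Size-based flush; the heap entry becomes stale and is
            # skipped later in pop_expired().
            del self.created[table_name]
            return self.events.pop(table_name)
        return None

    def pop_expired(self, now):
        """Return {table_name: events} for buffers older than max_age."""
        expired = {}
        while self.heap and now - self.heap[0][0] >= self.max_age:
            created_at, table_name = heapq.heappop(self.heap)
            # Skip stale entries left behind by a size-based flush.
            if self.created.get(table_name) != created_at:
                continue
            del self.created[table_name]
            expired[table_name] = self.events.pop(table_name)
        return expired


buf = AgingBuffer(max_age=300, max_records=3)
buf.append('A_1', {'n': 1}, now=0)
buf.append('B_2', {'n': 2}, now=100)
early = buf.pop_expired(now=299)   # no buffer is old enough yet
late = buf.pop_expired(now=300)    # only A_1 has reached max_age
```

Something still has to call pop_expired() periodically. A writer that only wakes up when an event arrives would leave low-volume schemas sitting in the buffer well past the age limit.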

Bug: 67450
Change-Id: Ie642010fabc01d4f917fab229ad8aa44af8608c0
---
M server/README.rst
M server/eventlogging/__init__.py
A server/eventlogging/config.py
M server/eventlogging/handlers.py
M server/eventlogging/jrm.py
M server/test-requirements.txt
M server/tests/fixtures.py
M server/tests/test_jrm.py
M server/tox.ini
9 files changed, 124 insertions(+), 23 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging refs/changes/77/169977/1

diff --git a/server/README.rst b/server/README.rst
index d9836d2..df5e87c 100644
--- a/server/README.rst
+++ b/server/README.rst
@@ -8,7 +8,7 @@
 To install dependencies in Ubuntu / Debian, simply run::
 
     $ sudo apt-get install -y python-coverage python-mysqldb python-nose \
-         python-pip python-sqlalchemy python-zmq python-pymongo
+         python-pip python-sqlalchemy python-zmq python-pymongo python-mock
 
 .. _EventLogging: http://www.mediawiki.org/wiki/Extension:EventLogging
 
diff --git a/server/eventlogging/__init__.py b/server/eventlogging/__init__.py
index f80ae06..2c705fb 100644
--- a/server/eventlogging/__init__.py
+++ b/server/eventlogging/__init__.py
@@ -24,6 +24,7 @@
 from .schema import *
 from .streams import *
 from .crypto import *
+from .config import *
 
 # The fact that schema validation is entrusted to a third-party module
 # is an implementation detail that a consumer of this package's API
diff --git a/server/eventlogging/config.py b/server/eventlogging/config.py
new file mode 100644
index 0000000..11862d5
--- /dev/null
+++ b/server/eventlogging/config.py
@@ -0,0 +1,47 @@
+"""
+Defines application-wide configuration.
+We want to avoid the usage of globals to allow for easy testing
+of config settings.
+
+The class just provides a wrapper around globals
+to be able to override configuration easily.
+"""
+
+
+class GlobalConfig(object):
+
+    __singleton_instance = None
+
+    @classmethod
+    def instance(cls):
+        """
+        The proper way to instantiate this class is
+        GlobalConfig.instance()
+        """
+        if not cls.__singleton_instance:
+            cls.__singleton_instance = cls()
+        return cls.__singleton_instance
+
+    def __init__(self):
+        """
+        Global config settings are defined here
+        For now we have populated the configuration
+        with the settings we need in the jrm class
+        """
+        self._config = {}
+
+        """
+        Looking at throughput on vanadium, the schema that currently
+        sends the most events per second is sending about 53 (~3000 per min).
+        Schemas that send little volume are sending about 0.02 events
+        per second or less (~1 per min).
+
+        With the settings below we will be flushing events every 5 minutes
+        or when the buffer hits 500 records.
+        """
+        self._config['MAX_BUFFER_AGE'] = 5 * 60
+
+        self._config['MAX_NUMBER_OF_RECORDS_IN_BUFFER'] = 500
+
+    def get(self, item, default=None):
+        return self._config.get(item, default)
diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index cfe67bc..3f83a0d 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -85,9 +85,10 @@
 def sql_writer(uri):
     engine = sqlalchemy.create_engine(uri)
     meta = sqlalchemy.MetaData(bind=engine)
-
+    from .config import GlobalConfig
     while 1:
-        store_sql_event(meta, (yield))
+        config = GlobalConfig.instance()
+        store_sql_event(meta, engine, config, (yield))
 
 
 @writes('file')
diff --git a/server/eventlogging/jrm.py b/server/eventlogging/jrm.py
index c493685..3db1338 100644
--- a/server/eventlogging/jrm.py
+++ b/server/eventlogging/jrm.py
@@ -99,6 +99,12 @@
     })
 ))
 
+# buffering holder, keyed by table_name
+buffer = {}
+
+# min heap keyed by (timestamp, table_name)
+h = []
+
 
 def typecast(property):
     """Generates a SQL column definition from a JSON Schema property
@@ -164,21 +170,54 @@
     return table
 
 
-def store_sql_event(meta, event, ignore_dupes=False):
-    """Store an event in the database."""
+def store_sql_event(meta, engine, config, event, ignore_dupes=False):
+    """
+    Store an event in the database.
+
+    Events are buffered and flushed from the buffer according
+    to the following criteria:
+
+    When an event comes in and a buffer for its table exists, the event
+    is appended to that buffer. Should the buffer be older
+    than MAX_BUFFER_AGE or hold at least
+    MAX_NUMBER_OF_RECORDS_IN_BUFFER items,
+    the buffer is flushed after the record is appended.
+    Should the table not exist, it will be created at flushing time.
+
+    If no buffer exists for that table yet, we start one.
+
+    """
     scid = (event['schema'], event['revision'])
-    table = get_table(meta, scid)
+    # construct table name
+    table_name = TABLE_NAME_FORMAT % scid
+
     event = flatten(event)
     event = {k: v for k, v in items(event) if k not in NO_DB_PROPERTIES}
-    insert = table.insert(values=event)
-    try:
-        insert.execute()
-    except sqlalchemy.exc.IntegrityError as e:
-        if not ignore_dupes or 'unique' not in str(e).lower():
-            raise
-    except sqlalchemy.exc.ProgrammingError:
-        table.create()
-        insert.execute()
+
+    # do we need to start buffering for this table?
+    if buffer.get(table_name) is None:
+        buffer[table_name] = []
+
+    # append to buffer
+    buffer[table_name].append(event)
+
+    # do we need to flush this schema?
+    number_of_events = len(buffer[table_name])
+    if number_of_events >= config.get('MAX_NUMBER_OF_RECORDS_IN_BUFFER'):
+        # this would create the table if it does not exist
+        table = get_table(meta, scid)
+        try:
+            events = buffer[table_name]
+            engine.execute(table.insert(), events)
+        except sqlalchemy.exc.IntegrityError as e:
+            if not ignore_dupes or 'unique' not in str(e).lower():
+                raise
+        except sqlalchemy.exc.ProgrammingError:
+            table.create()
+            engine.execute(table.insert(), events)
+        finally:
+            # make sure to empty buffer in memory
+            buffer[table_name] = []
 
 
 def _property_getter(item):
diff --git a/server/test-requirements.txt b/server/test-requirements.txt
index 33f4945..d8c14b6 100644
--- a/server/test-requirements.txt
+++ b/server/test-requirements.txt
@@ -1,2 +1,3 @@
 coverage
 nose
+mock
diff --git a/server/tests/fixtures.py b/server/tests/fixtures.py
index 01ef674..1885200 100644
--- a/server/tests/fixtures.py
+++ b/server/tests/fixtures.py
@@ -14,6 +14,7 @@
 import eventlogging
 import eventlogging.factory
 import sqlalchemy
+from mock import MagicMock
 
 
 TEST_SCHEMA_SCID = ('TestSchema', 123)
@@ -191,6 +192,11 @@
         super(DatabaseTestMixin, self).setUp()
         self.engine = sqlalchemy.create_engine('sqlite://', echo=True)
         self.meta = sqlalchemy.MetaData(bind=self.engine)
+        # mocking to set buffer size to '1'
+        from eventlogging.config import GlobalConfig
+        config = GlobalConfig.instance()
+        config.get = MagicMock(return_value=1)
+        self.config = config
 
     def tearDown(self):
         """Dispose of the database access objects."""
diff --git a/server/tests/test_jrm.py b/server/tests/test_jrm.py
index 166a621..2d32bde 100644
--- a/server/tests/test_jrm.py
+++ b/server/tests/test_jrm.py
@@ -24,7 +24,8 @@
         """If an attempt is made to store an event for which no table
         exists, the schema is automatically retrieved and a suitable
         table generated."""
-        eventlogging.store_sql_event(self.meta, self.event)
+        eventlogging.store_sql_event(self.meta, self.engine,
+                                     self.config, self.event)
         self.assertIn('TestSchema_123', self.meta.tables)
 
     def test_column_names(self):
@@ -51,7 +52,8 @@
 
     def test_encoding(self):
         """Timestamps and unicode strings are correctly encoded."""
-        eventlogging.jrm.store_sql_event(self.meta, self.event)
+        eventlogging.jrm.store_sql_event(self.meta, self.engine,
+                                         self.config, self.event)
         table = eventlogging.jrm.get_table(self.meta, TEST_SCHEMA_SCID)
         row = table.select().execute().fetchone()
         self.assertEqual(row['event_value'], '☆ 彡')
@@ -62,9 +64,10 @@
         )
 
     def test_reflection(self):
-        """Tables which exist in the database but not in the MetaData cache are
-        correctly reflected."""
-        eventlogging.store_sql_event(self.meta, self.event)
+        """Tables which exist in the database but not
+        in the MetaData cache are correctly reflected."""
+        eventlogging.store_sql_event(self.meta, self.engine,
+                                     self.config, self.event)
 
         # Tell Python to forget everything it knows about this database
         # by purging ``MetaData``. The actual data in the database is
@@ -79,5 +82,6 @@
         # The ``checkfirst`` arg to :func:`sqlalchemy.Table.create`
         # will ensure that we don't attempt to CREATE TABLE on the
         # already-existing table:
-        eventlogging.store_sql_event(self.meta, self.event, True)
+        eventlogging.store_sql_event(self.meta, self.engine,
+                                     self.config, self.event, True)
         self.assertIn('TestSchema_123', self.meta.tables)
diff --git a/server/tox.ini b/server/tox.ini
index 0c0f0f9..e1cdfd3 100644
--- a/server/tox.ini
+++ b/server/tox.ini
@@ -9,6 +9,8 @@
 #
 # ..        _tox: http://tox.readthedocs.org/en/latest/
 # .. _virtualenv: http://pypi.python.org/pypi/virtualenv
+# To run just one test for one environment: 
+# tox -e py27 -- -s tests.test_jrm:JrmTestCase.test_lazy_table_creation
 
 [tox]
 envlist = py27, py32, flake8
@@ -18,10 +20,10 @@
 setenv = VIRTUAL_ENV={envdir}
 deps = -r{toxinidir}/requirements.txt
        -r{toxinidir}/test-requirements.txt
-commands = nosetests \
+commands = nosetests -s \
   --verbose \
   --with-coverage \
-  --cover-package=eventlogging
+  --cover-package=eventlogging {posargs}
 
 [testenv:flake8]
 commands = flake8

-- 
To view, visit https://gerrit.wikimedia.org/r/169977
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie642010fabc01d4f917fab229ad8aa44af8608c0
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Nuria <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
