Nuria has uploaded a new change for review.
https://gerrit.wikimedia.org/r/169977
Change subject: [WIP] Batching event insertion
......................................................................
[WIP] Batching event insertion
The eventlogging database consumer generates single-row inserts.
From a performance perspective (master overhead and slave replication lag)
it would be better to batch inserts. Batches do not
need to be large or of a particular size.
Existing tests still pass, but tests for the new functionality are missing.
Time-based flushing of buffered events is also still missing; WIP.
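Illustratively, the size-based batching described above can be sketched as follows (a minimal, hypothetical sketch: `store_event`, `buffers`, and the in-memory `flushed` list are stand-ins for the patch's `store_sql_event` and its batched SQL insert):

```python
# Minimal sketch of size-based batching (illustrative only; the real
# patch flushes via engine.execute(table.insert(), events)).
MAX_RECORDS = 500  # mirrors MAX_NUMBER_OF_RECORDS_IN_BUFFER

buffers = {}   # per-table event buffers, keyed by table name
flushed = []   # stand-in for the actual batched SQL insert

def store_event(table_name, event, max_records=MAX_RECORDS):
    """Append event to the table's buffer; flush once it is full."""
    buffers.setdefault(table_name, []).append(event)
    if len(buffers[table_name]) >= max_records:
        # flush the whole batch at once, then reset the buffer
        flushed.append((table_name, buffers[table_name]))
        buffers[table_name] = []
```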
Bug: 67450
Change-Id: Ie642010fabc01d4f917fab229ad8aa44af8608c0
---
M server/README.rst
M server/eventlogging/__init__.py
A server/eventlogging/config.py
M server/eventlogging/handlers.py
M server/eventlogging/jrm.py
M server/test-requirements.txt
M server/tests/fixtures.py
M server/tests/test_jrm.py
M server/tox.ini
9 files changed, 124 insertions(+), 23 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging
refs/changes/77/169977/1
diff --git a/server/README.rst b/server/README.rst
index d9836d2..df5e87c 100644
--- a/server/README.rst
+++ b/server/README.rst
@@ -8,7 +8,7 @@
To install dependencies in Ubuntu / Debian, simply run::
$ sudo apt-get install -y python-coverage python-mysqldb python-nose \
- python-pip python-sqlalchemy python-zmq python-pymongo
+ python-pip python-sqlalchemy python-zmq python-pymongo python-mock
.. _EventLogging: http://www.mediawiki.org/wiki/Extension:EventLogging
diff --git a/server/eventlogging/__init__.py b/server/eventlogging/__init__.py
index f80ae06..2c705fb 100644
--- a/server/eventlogging/__init__.py
+++ b/server/eventlogging/__init__.py
@@ -24,6 +24,7 @@
from .schema import *
from .streams import *
from .crypto import *
+from .config import *
# The fact that schema validation is entrusted to a third-party module
# is an implementation detail that a consumer of this package's API
diff --git a/server/eventlogging/config.py b/server/eventlogging/config.py
new file mode 100644
index 0000000..11862d5
--- /dev/null
+++ b/server/eventlogging/config.py
@@ -0,0 +1,47 @@
+""""
+Defines application wide configuration
+We want to avoid the usage of globals to allow for easy testing
+of config settings.
+
+The class just provides a wrapper arround globals
+to be able to override configuration easily.
+"""
+
+
+class GlobalConfig(object):
+
+ __singleton_instance = None
+
+ @classmethod
+ def instance(cls):
+ """"
+ Way to properly instantiate this class is
+ GlobalConfig.instance()
+ """
+ if not cls.__singleton_instance:
+ cls.__singleton_instance = cls()
+ return cls.__singleton_instance
+
+ def __init__(self):
+ """"
+ Global config settings are defined here
+ For now we have populated the configuration
+ with the settings we need in the jrm class
+ """
+ self._config = {}
+
+ """
+ Looking at throughput in vanadium the schema, that, at this time
+ sents most events per second is sending about 53 (~3000 per min).
+ Schemas that send little volume are sending about 0.02 events
+ per second or less (~1 per min).
+
+ With the settings below we will be flushing events every 2 minutes
+ or when the buffer hits 500 records.
+ """
+ self._config['MAX_BUFFER_AGE'] = 5 * 60
+
+ self._config['MAX_NUMBER_OF_RECORDS_IN_BUFFER'] = 500
+
+ def get(self, item, default=None):
+ return self._config.get(item, default)
diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index cfe67bc..3f83a0d 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -85,9 +85,10 @@
def sql_writer(uri):
engine = sqlalchemy.create_engine(uri)
meta = sqlalchemy.MetaData(bind=engine)
-
+ from .config import GlobalConfig
while 1:
- store_sql_event(meta, (yield))
+ config = GlobalConfig.instance()
+ store_sql_event(meta, engine, config, (yield))
@writes('file')
diff --git a/server/eventlogging/jrm.py b/server/eventlogging/jrm.py
index c493685..3db1338 100644
--- a/server/eventlogging/jrm.py
+++ b/server/eventlogging/jrm.py
@@ -99,6 +99,12 @@
})
))
+# per-table event buffers, keyed by table_name
+buffer = {}
+
+# min heap keyed by (timestamp, table_name)
+h = []
+
def typecast(property):
"""Generates a SQL column definition from a JSON Schema property
@@ -164,21 +170,54 @@
return table
-def store_sql_event(meta, event, ignore_dupes=False):
- """Store an event in the database."""
+def store_sql_event(meta, engine, config, event, ignore_dupes=False):
+ """
+ Store an event in the database.
+
+ Events are buffered and flushed from the buffer according
+ to the following criteria:
+
+ When an event comes in if a buffer for that table exists is appended
+ to the buffer. Should the buffer be older
+ than MAX_BUFFER_AGE or have more than
+ MAX_NUMBER_OF_RECORDS_IN_BUFFER items
+ the buffer is flushed after the record is appended.
+ Should the table not exist it will be created at flushing time.
+
+ If the buffer for that event does not exists we start a buffer.
+
+ """
scid = (event['schema'], event['revision'])
- table = get_table(meta, scid)
+ # construct table name
+ table_name = TABLE_NAME_FORMAT % scid
+
event = flatten(event)
event = {k: v for k, v in items(event) if k not in NO_DB_PROPERTIES}
- insert = table.insert(values=event)
- try:
- insert.execute()
- except sqlalchemy.exc.IntegrityError as e:
- if not ignore_dupes or 'unique' not in str(e).lower():
- raise
- except sqlalchemy.exc.ProgrammingError:
- table.create()
- insert.execute()
+
+ # do we need to start buffering for this table?
+ if buffer.get(table_name) is None:
+ buffer[table_name] = []
+
+ # append to buffer
+ buffer[table_name].append(event)
+
+ # do we need to flush this schema?
+ number_of_events = len(buffer[table_name])
+ if number_of_events >= config.get('MAX_NUMBER_OF_RECORDS_IN_BUFFER'):
+ # this will create the table if it does not exist
+ table = get_table(meta, scid)
+ try:
+ events = buffer[table_name]
+ engine.execute(table.insert(), events)
+ except sqlalchemy.exc.IntegrityError as e:
+ if not ignore_dupes or 'unique' not in str(e).lower():
+ raise
+ except sqlalchemy.exc.ProgrammingError:
+ table.create()
+ engine.execute(table.insert(), events)
+ finally:
+ # make sure to empty buffer in memory
+ buffer[table_name] = []
def _property_getter(item):
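The min-heap `h` introduced above is presumably intended for the time-based flushing the commit message lists as missing. One possible sketch, under the assumption that a periodic task calls a flush helper (the names `buffer_event` and `flush_stale` are hypothetical, not part of the patch):

```python
import heapq
import time

MAX_BUFFER_AGE = 5 * 60  # seconds, as in GlobalConfig

buffers = {}  # per-table event buffers, keyed by table name
heap = []     # min-heap of (first_event_timestamp, table_name)

def buffer_event(table_name, event, now=None):
    """Buffer an event, recording when its table's buffer was started."""
    now = time.time() if now is None else now
    if not buffers.get(table_name):
        # first event for this buffer: remember its start time
        heapq.heappush(heap, (now, table_name))
        buffers[table_name] = []
    buffers[table_name].append(event)

def flush_stale(now=None, max_age=MAX_BUFFER_AGE):
    """Flush every table whose oldest buffered event exceeds max_age."""
    now = time.time() if now is None else now
    batches = []
    while heap and now - heap[0][0] >= max_age:
        _, table_name = heapq.heappop(heap)
        events, buffers[table_name] = buffers[table_name], []
        if events:  # skip entries already flushed by the size check
            batches.append((table_name, events))
    return batches
```

Because the heap is keyed by the buffer's start time, only the heap top needs checking on each pass, so stale buffers are found in O(log n) per flush.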
diff --git a/server/test-requirements.txt b/server/test-requirements.txt
index 33f4945..d8c14b6 100644
--- a/server/test-requirements.txt
+++ b/server/test-requirements.txt
@@ -1,2 +1,3 @@
coverage
nose
+mock
diff --git a/server/tests/fixtures.py b/server/tests/fixtures.py
index 01ef674..1885200 100644
--- a/server/tests/fixtures.py
+++ b/server/tests/fixtures.py
@@ -14,6 +14,7 @@
import eventlogging
import eventlogging.factory
import sqlalchemy
+from mock import MagicMock
TEST_SCHEMA_SCID = ('TestSchema', 123)
@@ -191,6 +192,11 @@
super(DatabaseTestMixin, self).setUp()
self.engine = sqlalchemy.create_engine('sqlite://', echo=True)
self.meta = sqlalchemy.MetaData(bind=self.engine)
+ # mock the config so get() always returns 1: the buffer
+ # then flushes after every single event
+ from eventlogging.config import GlobalConfig
+ config = GlobalConfig.instance()
+ config.get = MagicMock(return_value=1)
+ self.config = config
def tearDown(self):
"""Dispose of the database access objects."""
diff --git a/server/tests/test_jrm.py b/server/tests/test_jrm.py
index 166a621..2d32bde 100644
--- a/server/tests/test_jrm.py
+++ b/server/tests/test_jrm.py
@@ -24,7 +24,8 @@
"""If an attempt is made to store an event for which no table
exists, the schema is automatically retrieved and a suitable
table generated."""
- eventlogging.store_sql_event(self.meta, self.event)
+ eventlogging.store_sql_event(self.meta, self.engine,
+ self.config, self.event)
self.assertIn('TestSchema_123', self.meta.tables)
def test_column_names(self):
@@ -51,7 +52,8 @@
def test_encoding(self):
"""Timestamps and unicode strings are correctly encoded."""
- eventlogging.jrm.store_sql_event(self.meta, self.event)
+ eventlogging.jrm.store_sql_event(self.meta, self.engine,
+ self.config, self.event)
table = eventlogging.jrm.get_table(self.meta, TEST_SCHEMA_SCID)
row = table.select().execute().fetchone()
self.assertEqual(row['event_value'], '☆ 彡')
@@ -62,9 +64,10 @@
)
def test_reflection(self):
- """Tables which exist in the database but not in the MetaData cache are
- correctly reflected."""
- eventlogging.store_sql_event(self.meta, self.event)
+ """Tables which exist in the database but not
+ in the MetaData cache are correctly reflected."""
+ eventlogging.store_sql_event(self.meta, self.engine,
+ self.config, self.event)
# Tell Python to forget everything it knows about this database
# by purging ``MetaData``. The actual data in the database is
@@ -79,5 +82,6 @@
# The ``checkfirst`` arg to :func:`sqlalchemy.Table.create`
# will ensure that we don't attempt to CREATE TABLE on the
# already-existing table:
- eventlogging.store_sql_event(self.meta, self.event, True)
+ eventlogging.store_sql_event(self.meta, self.engine,
+ self.config, self.event, True)
self.assertIn('TestSchema_123', self.meta.tables)
diff --git a/server/tox.ini b/server/tox.ini
index 0c0f0f9..e1cdfd3 100644
--- a/server/tox.ini
+++ b/server/tox.ini
@@ -9,6 +9,8 @@
#
# .. _tox: http://tox.readthedocs.org/en/latest/
# .. _virtualenv: http://pypi.python.org/pypi/virtualenv
+# To run just one test for one environment:
+# tox -e py27 -- -s tests.test_jrm:JrmTestCase.test_lazy_table_creation
[tox]
envlist = py27, py32, flake8
@@ -18,10 +20,10 @@
setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
-commands = nosetests \
+commands = nosetests -s \
--verbose \
--with-coverage \
- --cover-package=eventlogging
+ --cover-package=eventlogging {posargs}
[testenv:flake8]
commands = flake8
--
To view, visit https://gerrit.wikimedia.org/r/169977
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie642010fabc01d4f917fab229ad8aa44af8608c0
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Nuria <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits