Ottomata has submitted this change and it was merged. (
https://gerrit.wikimedia.org/r/358062 )
Change subject: Make eventlogging-consumer support a local schema repo, and
teach jrm.py how to serialize arrays
......................................................................
Make eventlogging-consumer support a local schema repo, and teach jrm.py how to
serialize arrays
This will allow us to use eventlogging-consumer to consume from eventbus topics
and insert into MySQL
Bug: T150369
Change-Id: I78dcc351f737286e5253cec3be76dc87478a020f
---
M bin/eventlogging-consumer
M eventlogging/jrm.py
M tests/fixtures.py
M tests/test_jrm.py
4 files changed, 59 insertions(+), 17 deletions(-)
Approvals:
Ottomata: Verified; Looks good to me, approved
Nuria: Verified; Looks good to me, approved
diff --git a/bin/eventlogging-consumer b/bin/eventlogging-consumer
index 7734038..9df374a 100755
--- a/bin/eventlogging-consumer
+++ b/bin/eventlogging-consumer
@@ -31,7 +31,6 @@
import eventlogging
-
eventlogging.setup_logging()
ap = argparse.ArgumentParser(description='EventLogger',
@@ -40,11 +39,19 @@
ap.add_argument('output', help='URI of output stream', default='stdout://')
ap.add_argument('--no-plugins', help='run without loading plug-ins',
action='store_false', dest='load_plugins')
+
+ap.add_argument(
+ '--schemas-path',
+ help='Path to local schema repository',
+)
ap.set_defaults(load_plugins=True)
args = ap.parse_args()
if args.load_plugins:
eventlogging.load_plugins()
+if args.schemas_path:
+ eventlogging.schema.load_local_schemas(args.schemas_path)
+
logging.info('Driving %s -> %s..', args.input, args.output)
eventlogging.drive(args.input, args.output)
diff --git a/eventlogging/jrm.py b/eventlogging/jrm.py
index 15a7e0b..fa93e51 100644
--- a/eventlogging/jrm.py
+++ b/eventlogging/jrm.py
@@ -15,10 +15,11 @@
import logging
import _mysql
import os
+import re
import sqlalchemy
import time
-from .compat import items
+from .compat import items, json
from .schema import get_schema
from .utils import flatten
@@ -30,9 +31,15 @@
# timestamps. See `<https://www.mediawiki.org/wiki/Manual:Timestamp>`_.
MEDIAWIKI_TIMESTAMP = '%Y%m%d%H%M%S'
-# Format string for table names. Interpolates a `SCID` -- i.e., a tuple
-# of (schema_name, revision_id).
-TABLE_NAME_FORMAT = '%s_%s'
+
+
+def scid_to_table_name(scid):
+ """Convert an scid to a SQL table name."""
+ return '{}_{}'.format(
+ re.sub('[^A-Za-z0-9]+', '_', scid[0]),
+ scid[1]
+ )
+
# An iterable of properties that should not be stored in the database.
NO_DB_PROPERTIES = (
@@ -48,6 +55,14 @@
)
}
}
+
+# Maximum length for string and string-like types. Because InnoDB limits index
+# columns to 767 bytes, the maximum length for a utf8mb4 column (which
+# reserves up to four bytes per character) is 191 (191 * 4 = 764).
+STRING_MAX_LEN = 1024
+
+# Default table column definition, to be overridden by mappers below.
+COLUMN_DEFAULTS = {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)}
class MediaWikiTimestamp(sqlalchemy.TypeDecorator):
@@ -73,13 +88,22 @@
return datetime.datetime.strptime(value, MEDIAWIKI_TIMESTAMP)
-# Maximum length for string and string-like types. Because InnoDB limits index
-# columns to 767 bytes, the maximum length for a utf8mb4 column (which
-# reserves up to four bytes per character) is 191 (191 * 4 = 764).
-STRING_MAX_LEN = 1024
+class JsonSerde(sqlalchemy.TypeDecorator):
+ """A :class:`sqlalchemy.TypeDecorator` for converting to and from JSON
+ strings."""
-# Default table column definition, to be overridden by mappers below.
-COLUMN_DEFAULTS = {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)}
+ impl = sqlalchemy.Unicode(STRING_MAX_LEN)
+
+ def process_bind_param(self, value, dialect=None):
+ """Convert the value to a JSON string"""
+ value = json.dumps(value)
+ if hasattr(value, 'decode'):
+ value = value.decode('utf-8')
+ return value
+
+ def process_result_value(self, value, dialect=None):
+ """Convert a JSON string into a Python object"""
+ return json.loads(value)
+
# Mapping of JSON Schema attributes to valid values. Each value maps to
# a dictionary of options. The options are compounded into a single
@@ -97,6 +121,8 @@
'integer': {'type_': sqlalchemy.BigInteger},
'number': {'type_': sqlalchemy.Float},
'string': {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)},
+ # Encode arrays as JSON strings.
+ 'array': {'type_': JsonSerde},
}),
('format', {
'utc-millisec': {'type_': MediaWikiTimestamp, 'index': True},
@@ -152,8 +178,9 @@
# | | +-------------+------------+
# +-----------------+-------->| Return table description |
# +--------------------------+
+ table_name = scid_to_table_name(scid)
try:
- return meta.tables[TABLE_NAME_FORMAT % scid]
+ return meta.tables[table_name]
except KeyError:
return declare_table(meta, scid, should_encapsulate)
@@ -166,7 +193,7 @@
columns = schema_mapper(schema)
table_options = ENGINE_TABLE_OPTIONS.get(meta.bind.name, {})
- table_name = TABLE_NAME_FORMAT % scid
+ table_name = scid_to_table_name(scid)
table = sqlalchemy.Table(table_name, meta, *columns, **table_options)
table.create(checkfirst=True)
diff --git a/tests/fixtures.py b/tests/fixtures.py
index fe9bea1..a117ff0 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -149,6 +149,12 @@
'type': 'string',
'required': True
},
+ 'list': {
+ 'type': 'array',
+ 'items': {
+ 'type': 'string'
+ }
+ },
'nested': {
'type': 'object',
'properties': {
@@ -202,6 +208,7 @@
_event = {
'event': {
'value': '☆ 彡',
+ 'list': ['a','☆ 彡'],
'nested': {
'deeplyNested': {
'pi': 3.14159
diff --git a/tests/test_jrm.py b/tests/test_jrm.py
index 2b69ee9..98f1d20 100644
--- a/tests/test_jrm.py
+++ b/tests/test_jrm.py
@@ -19,7 +19,7 @@
from .fixtures import (
DatabaseTestMixin, TEST_SCHEMA_SCID, TEST_META_SCHEMA_SCID
)
-from eventlogging.jrm import TABLE_NAME_FORMAT
+from eventlogging.jrm import scid_to_table_name
class JrmTestCase(DatabaseTestMixin, unittest.TestCase):
@@ -31,7 +31,7 @@
table generated."""
eventlogging.store_sql_events(
self.meta, TEST_SCHEMA_SCID, [self.event])
- table_name = TABLE_NAME_FORMAT % TEST_SCHEMA_SCID
+ table_name = scid_to_table_name(TEST_SCHEMA_SCID)
self.assertIn(table_name, self.meta.tables)
table = self.meta.tables[table_name]
# is the table on the db and does it have the right data?
@@ -47,7 +47,7 @@
suitable table generated."""
eventlogging.store_sql_events(
self.meta, TEST_META_SCHEMA_SCID, [self.event_with_meta])
- table_name = TABLE_NAME_FORMAT % TEST_META_SCHEMA_SCID
+ table_name = scid_to_table_name(TEST_META_SCHEMA_SCID)
self.assertIn(table_name, self.meta.tables)
table = self.meta.tables[table_name]
# is the table on the db and does it have the right data?
@@ -79,12 +79,13 @@
self.assertEqual(flat['event_nested_deeplyNested_pi'], 3.14159)
def test_encoding(self):
- """Timestamps and unicode strings are correctly encoded."""
+ """Timestamps, unicode strings, and arrays are correctly encoded."""
eventlogging.jrm.store_sql_events(
self.meta, TEST_SCHEMA_SCID, [self.event])
table = eventlogging.jrm.get_table(self.meta, TEST_SCHEMA_SCID)
row = table.select().execute().fetchone()
self.assertEqual(row['event_value'], '☆ 彡')
+ self.assertEqual(row['event_list'], ['a', '☆ 彡'])
self.assertEqual(row['uuid'], 'babb66f34a0a5de3be0c6513088be33e')
self.assertEqual(
row['timestamp'],
--
To view, visit https://gerrit.wikimedia.org/r/358062
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I78dcc351f737286e5253cec3be76dc87478a020f
Gerrit-PatchSet: 4
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits