Ottomata has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/358062 )

Change subject: Make eventlogging-consumer support a local schema repo, and teach jrm.py how to serialize arrays
......................................................................


Make eventlogging-consumer support a local schema repo, and teach jrm.py how to serialize arrays

This will allow us to use eventlogging-consumer to consume from eventbus topics and insert into MySQL.

Bug: T150369
Change-Id: I78dcc351f737286e5253cec3be76dc87478a020f
---
M bin/eventlogging-consumer
M eventlogging/jrm.py
M tests/fixtures.py
M tests/test_jrm.py
4 files changed, 59 insertions(+), 17 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved
  Nuria: Verified; Looks good to me, approved



diff --git a/bin/eventlogging-consumer b/bin/eventlogging-consumer
index 7734038..9df374a 100755
--- a/bin/eventlogging-consumer
+++ b/bin/eventlogging-consumer
@@ -31,7 +31,6 @@
 
 import eventlogging
 
-
 eventlogging.setup_logging()
 
 ap = argparse.ArgumentParser(description='EventLogger',
@@ -40,11 +39,19 @@
 ap.add_argument('output', help='URI of output stream', default='stdout://')
 ap.add_argument('--no-plugins', help='run without loading plug-ins',
                 action='store_false', dest='load_plugins')
+
+ap.add_argument(
+    '--schemas-path',
+    help='Path to local schema repository',
+)
 ap.set_defaults(load_plugins=True)
 args = ap.parse_args()
 
 if args.load_plugins:
     eventlogging.load_plugins()
 
+if args.schemas_path:
+    eventlogging.schema.load_local_schemas(args.schemas_path)
+
 logging.info('Driving %s -> %s..', args.input, args.output)
 eventlogging.drive(args.input, args.output)
diff --git a/eventlogging/jrm.py b/eventlogging/jrm.py
index 15a7e0b..fa93e51 100644
--- a/eventlogging/jrm.py
+++ b/eventlogging/jrm.py
@@ -15,10 +15,11 @@
 import logging
 import _mysql
 import os
+import re
 import sqlalchemy
 import time
 
-from .compat import items
+from .compat import items, json
 from .schema import get_schema
 from .utils import flatten
 
@@ -30,9 +31,15 @@
 # timestamps. See `<https://www.mediawiki.org/wiki/Manual:Timestamp>`_.
 MEDIAWIKI_TIMESTAMP = '%Y%m%d%H%M%S'
 
-# Format string for table names. Interpolates a `SCID` -- i.e., a tuple
-# of (schema_name, revision_id).
-TABLE_NAME_FORMAT = '%s_%s'
+
+
+def scid_to_table_name(scid):
+    """Convert an scid to a SQL table name."""
+    return '{}_{}'.format(
+        re.sub('[^A-Za-z0-9]+', '_', scid[0]),
+        scid[1]
+    )
+
 
 # An iterable of properties that should not be stored in the database.
 NO_DB_PROPERTIES = (
@@ -48,6 +55,14 @@
         )
     }
 }
+
+# Maximum length for string and string-like types. Because InnoDB limits index
+# columns to 767 bytes, the maximum length for a utf8mb4 column (which
+# reserves up to four bytes per character) is 191 (191 * 4 = 764).
+STRING_MAX_LEN = 1024
+
+# Default table column definition, to be overridden by mappers below.
+COLUMN_DEFAULTS = {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)}
 
 
 class MediaWikiTimestamp(sqlalchemy.TypeDecorator):
@@ -73,13 +88,22 @@
         return datetime.datetime.strptime(value, MEDIAWIKI_TIMESTAMP)
 
 
-# Maximum length for string and string-like types. Because InnoDB limits index
-# columns to 767 bytes, the maximum length for a utf8mb4 column (which
-# reserves up to four bytes per character) is 191 (191 * 4 = 764).
-STRING_MAX_LEN = 1024
+class JsonSerde(sqlalchemy.TypeDecorator):
+    """A :class:`sqlalchemy.TypeDecorator` for converting to and from JSON strings."""
 
-# Default table column definition, to be overridden by mappers below.
-COLUMN_DEFAULTS = {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)}
+    impl = sqlalchemy.Unicode(STRING_MAX_LEN)
+
+    def process_bind_param(self, value, dialect=None):
+        """Convert the value to a JSON string"""
+        value = json.dumps(value)
+        if hasattr(value, 'decode'):
+            value = value.decode('utf-8')
+        return value
+
+    def process_result_value(self, value, dialect=None):
+        """Convert a JSON string into a Python object"""
+        return json.loads(value)
+
 
 # Mapping of JSON Schema attributes to valid values. Each value maps to
 # a dictionary of options. The options are compounded into a single
@@ -97,6 +121,8 @@
         'integer': {'type_': sqlalchemy.BigInteger},
         'number': {'type_': sqlalchemy.Float},
         'string': {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)},
+        # Encode arrays as JSON strings.
+        'array': {'type_': JsonSerde},
     }),
     ('format', {
         'utc-millisec': {'type_': MediaWikiTimestamp, 'index': True},
@@ -152,8 +178,9 @@
     #       |                 |         +-------------+------------+
     #       +-----------------+-------->| Return table description |
     #                                   +--------------------------+
+    table_name = scid_to_table_name(scid)
     try:
-        return meta.tables[TABLE_NAME_FORMAT % scid]
+        return meta.tables[table_name]
     except KeyError:
         return declare_table(meta, scid, should_encapsulate)
 
@@ -166,7 +193,7 @@
     columns = schema_mapper(schema)
 
     table_options = ENGINE_TABLE_OPTIONS.get(meta.bind.name, {})
-    table_name = TABLE_NAME_FORMAT % scid
+    table_name = scid_to_table_name(scid)
 
     table = sqlalchemy.Table(table_name, meta, *columns, **table_options)
     table.create(checkfirst=True)
diff --git a/tests/fixtures.py b/tests/fixtures.py
index fe9bea1..a117ff0 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -149,6 +149,12 @@
                     'type': 'string',
                     'required': True
                 },
+                'list': {
+                    'type': 'array',
+                    'items': {
+                        'type': 'string'
+                    }
+                },
                 'nested': {
                     'type': 'object',
                     'properties': {
@@ -202,6 +208,7 @@
 _event = {
     'event': {
         'value': '☆ 彡',
+        'list': ['a','☆ 彡'],
         'nested': {
             'deeplyNested': {
                 'pi': 3.14159
diff --git a/tests/test_jrm.py b/tests/test_jrm.py
index 2b69ee9..98f1d20 100644
--- a/tests/test_jrm.py
+++ b/tests/test_jrm.py
@@ -19,7 +19,7 @@
 from .fixtures import (
     DatabaseTestMixin, TEST_SCHEMA_SCID, TEST_META_SCHEMA_SCID
 )
-from eventlogging.jrm import TABLE_NAME_FORMAT
+from eventlogging.jrm import scid_to_table_name
 
 
 class JrmTestCase(DatabaseTestMixin, unittest.TestCase):
@@ -31,7 +31,7 @@
         table generated."""
         eventlogging.store_sql_events(
             self.meta, TEST_SCHEMA_SCID, [self.event])
-        table_name = TABLE_NAME_FORMAT % TEST_SCHEMA_SCID
+        table_name = scid_to_table_name(TEST_SCHEMA_SCID)
         self.assertIn(table_name, self.meta.tables)
         table = self.meta.tables[table_name]
         # is the table on the db  and does it have the right data?
@@ -47,7 +47,7 @@
         suitable table generated."""
         eventlogging.store_sql_events(
             self.meta, TEST_META_SCHEMA_SCID, [self.event_with_meta])
-        table_name = TABLE_NAME_FORMAT % TEST_META_SCHEMA_SCID
+        table_name = scid_to_table_name(TEST_META_SCHEMA_SCID)
         self.assertIn(table_name, self.meta.tables)
         table = self.meta.tables[table_name]
         # is the table on the db  and does it have the right data?
@@ -79,12 +79,13 @@
         self.assertEqual(flat['event_nested_deeplyNested_pi'], 3.14159)
 
     def test_encoding(self):
-        """Timestamps and unicode strings are correctly encoded."""
+        """Timestamps, unicode strings, and arrays are correctly encoded."""
         eventlogging.jrm.store_sql_events(
             self.meta, TEST_SCHEMA_SCID, [self.event])
         table = eventlogging.jrm.get_table(self.meta, TEST_SCHEMA_SCID)
         row = table.select().execute().fetchone()
         self.assertEqual(row['event_value'], '☆ 彡')
+        self.assertEqual(row['event_list'], ['a', '☆ 彡'])
         self.assertEqual(row['uuid'], 'babb66f34a0a5de3be0c6513088be33e')
         self.assertEqual(
             row['timestamp'],

-- 
To view, visit https://gerrit.wikimedia.org/r/358062
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I78dcc351f737286e5253cec3be76dc87478a020f
Gerrit-PatchSet: 4
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to