Nuria has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/256452

Change subject: Add support for batch production in eventlogging-service
......................................................................

Add support for batch production in eventlogging-service

Responses:
- 201 if all events are accepted.
- 207 if some but not all events are accepted.
- 400 if no events are accepted.

If any events fail, the response body contains a JSON list with one
EventError object for each failed event.

Change-Id: Ifdde80e68832d5b3c34ba4ebefbb9e56ff601feb
---
M bin/eventlogging-consumer
M bin/eventlogging-devserver
M bin/eventlogging-forwarder
M bin/eventlogging-multiplexer
M bin/eventlogging-processor
M bin/eventlogging-reporter
M bin/eventlogging-service
M eventlogging/schema.py
M eventlogging/service.py
M tests/test_service.py
10 files changed, 221 insertions(+), 62 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/eventlogging refs/changes/52/256452/1

diff --git a/bin/eventlogging-consumer b/bin/eventlogging-consumer
index c0b8f50..cb9a9a6 100755
--- a/bin/eventlogging-consumer
+++ b/bin/eventlogging-consumer
@@ -23,7 +23,7 @@
 from __future__ import unicode_literals
 
 import sys
-reload(sys)
+reload(sys)  # noqa
 sys.setdefaultencoding('utf-8')
 
 import argparse
diff --git a/bin/eventlogging-devserver b/bin/eventlogging-devserver
index 48d57e5..ba7e7b6 100755
--- a/bin/eventlogging-devserver
+++ b/bin/eventlogging-devserver
@@ -33,7 +33,7 @@
 from __future__ import print_function, unicode_literals
 
 import sys
-reload(sys)
+reload(sys)  # noqa
 sys.setdefaultencoding('utf-8')
 
 import argparse
diff --git a/bin/eventlogging-forwarder b/bin/eventlogging-forwarder
index cb9cdfe..86206ad 100755
--- a/bin/eventlogging-forwarder
+++ b/bin/eventlogging-forwarder
@@ -26,7 +26,7 @@
 from __future__ import unicode_literals
 
 import sys
-reload(sys)
+reload(sys)  # noqa
 sys.setdefaultencoding('utf-8')
 
 import argparse
diff --git a/bin/eventlogging-multiplexer b/bin/eventlogging-multiplexer
index f0d225d..a736712 100755
--- a/bin/eventlogging-multiplexer
+++ b/bin/eventlogging-multiplexer
@@ -24,7 +24,7 @@
 from __future__ import unicode_literals
 
 import sys
-reload(sys)
+reload(sys)  # noqa
 sys.setdefaultencoding('utf-8')
 
 import argparse
diff --git a/bin/eventlogging-processor b/bin/eventlogging-processor
index be6f901..d6d6b8c 100755
--- a/bin/eventlogging-processor
+++ b/bin/eventlogging-processor
@@ -32,7 +32,7 @@
 from __future__ import unicode_literals
 
 import sys
-reload(sys)
+reload(sys)  # noqa
 sys.setdefaultencoding('utf-8')
 
 import argparse
diff --git a/bin/eventlogging-reporter b/bin/eventlogging-reporter
index 644a50d..ccff01c 100755
--- a/bin/eventlogging-reporter
+++ b/bin/eventlogging-reporter
@@ -13,7 +13,7 @@
 
 """
 import sys
-reload(sys)
+reload(sys)  # noqa
 sys.setdefaultencoding('utf-8')
 
 import argparse
@@ -167,4 +167,4 @@
 
 
 if __name__ == '__main__':
-    print monitor_pubs(iter_pubs('/etc/eventlogging.d/processors'))
+    print(monitor_pubs(iter_pubs('/etc/eventlogging.d/processors')))
diff --git a/bin/eventlogging-service b/bin/eventlogging-service
index bd688c0..b12a0da 100755
--- a/bin/eventlogging-service
+++ b/bin/eventlogging-service
@@ -53,6 +53,13 @@
     'processes as there are cores.  Default 1.',
 )
 
+ap.add_argument(
+    '--error-output',
+    default=None,
+    help='URI of output stream for errored events. '
+    'Errored events are written using the EventError schema.'
+)
+
 ap.add_argument('output', nargs='+', help='URIs of output streams.')
 
 
@@ -82,7 +89,8 @@
     )
 
     service = EventLoggingService(
-        args.output
+        args.output,
+        args.error_output
     )
 
     # Start listening for requests.
diff --git a/eventlogging/schema.py b/eventlogging/schema.py
index 8636c9a..62cc04f 100644
--- a/eventlogging/schema.py
+++ b/eventlogging/schema.py
@@ -73,6 +73,7 @@
 # SCID of the metadata object which wraps each capsule-style event.
 CAPSULE_SCID = ('EventCapsule', 10981547)
 
+# TODO: Make new meta style EventError on meta.
 ERROR_SCID = ('EventError', 14035058)
 
 # Schemas retrieved via HTTP or files are cached in this dictionary.
diff --git a/eventlogging/service.py b/eventlogging/service.py
index d54dfd4..d2a4d8e 100644
--- a/eventlogging/service.py
+++ b/eventlogging/service.py
@@ -21,7 +21,10 @@
 from . import ValidationError, SchemaError  # these are int __init__.py
 from .compat import json
 from .factory import apply_safe, get_writer
-from .schema import validate, get_schema, schema_name_from_event
+from .schema import (
+    validate, get_schema, schema_name_from_event, id_from_event,
+    create_event_error
+)
 from .topic import (
     get_topic_config, scid_for_topic, schema_for_topic, topic_from_event,
     TopicNotConfigured, TopicNotFound,
@@ -45,10 +48,13 @@
     until your event is ACKed by Kafka.
     """
 
-    def __init__(self, writer_uris):
+    def __init__(self, writer_uris, error_writer_uri=None):
         """
         :param writer_uris: A list of EventLogging writer_uris.  Each valid
         event will be written to each of these writers.
+
+        :param error_writer_uri: If configured, EventErrors will be written
+        to this writer.
         """
 
         routes = [
@@ -75,6 +81,13 @@
             self.writers[uri] = get_writer(uri)
             logging.info('Publishing valid JSON events to %s.', uri)
 
+        # Errored events will be written to this writer.
+        if error_writer_uri:
+            self.error_writer = get_writer(error_writer_uri)
+            logging.info('Publishing errored events to %s.', error_writer_uri)
+        else:
+            self.error_writer = None
+
     def send(self, event):
         """Send the event to configured eventlogging writers."""
         for uri in self.writers.keys():
@@ -93,24 +106,30 @@
                 self.writers[uri] = w
                 w.send(event)
 
-    def process_json(
-        self,
-        event_string,
-        callback=None
-    ):
+    def decode_json(self, s, callback=None):
         """
-        Parse the event_string as json and validate it
-        using the schema configured for this event's topic.
-        If valid, the event will be sent to all configured writers.
+        Decode the utf-8 JSON string s.  A top level object (dict) is
+        wrapped in a one-element list; any other value is unchanged.
+        The result is passed to callback if given, otherwise returned.
 
-        If callback is set, then it is called with the
-        parsed and validated event object, otherwise
-        the event object is just returned.
+        :param s: utf-8 encoded JSON string, expected to hold event(s).
         """
+        events = json.loads(s.decode('utf-8'))
+        if isinstance(events, dict):
+            events = [events]
+        if callback:
+            callback(events)
+        else:
+            return events
 
-        # This will add schema and revision to the event
-        # based on topic config.
-        event = json.loads(event_string)
+    def process_event(self, event):
+        """
+        Validate the event using the schema configured for its topic.
+        A valid event will be sent to all configured writers.
+
+        Returns True on success; otherwise an exception is raised (e.g.
+        TopicNotFound, TopicNotConfigured, SchemaError, ValidationError).
+        """
         topic = topic_from_event(event)
         schema_name, revision = scid_for_topic(topic)
 
@@ -129,15 +148,73 @@
             encapsulate = True
 
         validate(event, encapsulate=encapsulate)
-
         # Send this processed event to all configured writers
-        # This will block until each writer finishes writing this event.
+        # This will block until each writer finishes writing
+        # this event.
         self.send(event)
+        return True
+
+    def handle_events(self, events, callback=None):
+        """
+        Call process_event on each event.  TopicNotConfigured,
+        TopicNotFound, SchemaError and ValidationError are caught and an
+        EventError describing the failure is collected for that event.
+
+        :param events: list of event dicts
+        :returns: list of EventErrors, passed to callback if one is given.
+        """
+        # process_event looks up the schema and revision for each event
+        # from its topic config before validating and sending it.
+
+        event_errors = []
+        for event in events:
+            error_message = None
+
+            try:
+                self.process_event(event)
+
+            except TopicNotConfigured as e:
+                error_message = str(e)
+
+            except TopicNotFound as e:
+                error_message = 'Could not get topic from event %s. %s' % (
+                    id_from_event(event), e
+                )
+
+            except SchemaError as e:
+                error_message = 'Could not find schema for provided topic ' \
+                    'in event %s. %s' % (id_from_event(event), e)
+
+            except ValidationError as e:
+                error_message = 'Failed validating event %s of schema ' \
+                    '%s. %s' % (
+                        id_from_event(event),
+                        schema_name_from_event(event),
+                        e.message
+                    )
+
+            # If one of the handlers above recorded an error, log it and
+            # create an EventError describing it.  Compare against None so
+            # that an exception with an empty message is still reported.
+            if error_message is not None:
+                logging.error("Failed processing event: %s", error_message)
+                event_error = create_event_error(
+                    json.dumps(event),
+                    error_message,
+                    # Should we make different error codes for these?
+                    'validation',
+                    event
+                )
+                event_errors.append(event_error)
+                # If error_writer is configured, send this
+                # EventError to it.
+                if self.error_writer:
+                    self.error_writer.send(event_error)
 
         if callback:
-            callback(event)
+            callback(event_errors)
         else:
-            return event
+            return event_errors
 
     def start(self, port=8085, num_processes=1):
         """
@@ -151,49 +228,76 @@
 
 
 class EventHandler(tornado.web.RequestHandler):
+
     @tornado.gen.coroutine
     def post(self):
         """
-        event_string json string is read in from POST body
-        and then asynchronously parsed and validated, and then
+        The POST body is read in as a JSON string.
+        It can be a single event object or a list of event objects.
+        They will be asynchronously parsed and validated, and then
         written to configured EventLogging writers.  'topic'
-        must be set in the event meta data.
+        must be set in each event's meta data.
+
+        Responses:
+        - 201 if all events are accepted.
+        - 207 if some but not all events are accepted.
+        - 400 if no events are accepted.
+
+        In case of any errored events, the response body will be a JSON
+        list with one EventError object (built by create_event_error)
+        per rejected event.
+
+        # TODO: Return 400 for a malformed JSON body?  A ValueError from
+          decode_json is not caught below.
         """
+        response_body = None
         if self.request.headers['Content-Type'] == 'application/json':
             try:
-                body = self.request.body.decode('utf-8')
-                if body:
-                    event = yield tornado.gen.Task(
-                        self.application.process_json,
-                        body
+                if self.request.body:
+                    # decode the json string into a list
+                    events = yield tornado.gen.Task(
+                        self.application.decode_json,
+                        self.request.body
                     )
-                    response_code = 201
-                    response_text = '%s event on topic %s accepted. ' % (
-                        schema_name_from_event(event), topic_from_event(event)
+                    # process and validate events
+                    event_errors = yield tornado.gen.Task(
+                        self.application.handle_events,
+                        events
                     )
+                    events_count = len(events)
+                    event_errors_count = len(event_errors)
+
+                    # If all events were accepted, then return 201
+                    if event_errors_count == 0:
+                        response_code = 201
+                        response_text = 'All %s events were accepted.' % (
+                            events_count
+                        )
+                    else:
+                        # Else if every event failed, return 400
+                        # and the list of EventErrors.
+                        if events_count == event_errors_count:
+                            response_code = 400
+                            response_text = ('0 out of %s events were '
+                                             'accepted.') % events_count
+                        # Else at least 1 event failed.
+                        # Return 207 and the list of EventErrors.
+                        else:
+                            response_code = 207
+                            response_text = ('%s out of %s events '
+                                             'were accepted.') % (
+                                events_count - event_errors_count,
+                                events_count
+                            )
+                        response_body = json.dumps(event_errors)
                 else:
                     response_code = 400
                     response_text = 'Must provide body in request.'
+            # FIXME: below, 'POST body: %s' % e is not part of response_text.
             except UnicodeError as e:
                 response_code = 400
                 response_text = 'UnicodeError while utf-8 decoding '
                 'POST body: %s' % e
-            except TopicNotConfigured as e:
-                response_code = 404
-                response_text = str(e)
-            except TopicNotFound as e:
-                response_code = 400
-                response_text = 'Must provide topic. %s' % e
-            except IncorrectSerialization as e:
-                response_code = 400
-                response_text = e.message
-            except ValidationError as e:
-                response_code = 400
-                response_text = 'Unable to validate event. ' + e.message
-            except SchemaError as e:
-                response_code = 500
-                response_text = 'Could not find schema for provided '
-                'topic. %s' % e
 
         else:
             response_code = 400
@@ -205,6 +309,8 @@
             logging.error(response_text)
 
         self.set_status(response_code, response_text)
+        if response_body:
+            self.write(response_body)
 
 
 class TopicHandler(tornado.web.RequestHandler):
@@ -223,6 +329,7 @@
 
 
 class SchemaHandler(tornado.web.RequestHandler):
+
     @tornado.gen.coroutine
     def get(self, schema_name, revision):
         """
@@ -251,10 +358,7 @@
 
 
 class TopicConfigHandler(tornado.web.RequestHandler):
+
     def get(self):
         self.set_status(200)
         self.write(get_topic_config())
-
-
-class IncorrectSerialization(Exception):
-    pass
diff --git a/tests/test_service.py b/tests/test_service.py
index f16df0d..9770006 100644
--- a/tests/test_service.py
+++ b/tests/test_service.py
@@ -46,7 +46,7 @@
                                self.stop, method="POST",
                                body=json.dumps(event), headers=headers)
         response = self.wait()
-        self.assertEqual(404, response.code)
+        self.assertEqual(400, response.code)
 
     def test_post_valid_event_configured_topic(self):
         """
@@ -87,8 +87,6 @@
                                body=body, headers=headers)
         response = self.wait()
         self.assertEqual(400, response.code)
-        self.assertTrue("'required_field' is a required property"
-                        in response.reason)
 
     def test_post_event_missing_optional_field(self):
 
@@ -102,6 +100,54 @@
         response = self.wait()
         self.assertEqual(201, response.code)
 
+    def test_post_event_batch(self):
+
+        headers = {'Content-type': 'application/json'}
+        valid_eventA = copy.deepcopy(self.event_with_meta)
+        valid_eventB = copy.deepcopy(self.event_with_meta)
+        events = [valid_eventA, valid_eventB]
+        body = json.dumps(events)
+        self.http_client.fetch(self.get_url('/v1/events'),
+                               self.stop, method="POST",
+                               body=body, headers=headers)
+        response = self.wait()
+        self.assertEqual(201, response.code)
+
+    def test_post_event_batch_one_invalid(self):
+
+        headers = {'Content-type': 'application/json'}
+        valid_event = copy.deepcopy(self.event_with_meta)
+        invalid_event = copy.deepcopy(self.event_with_meta)
+        # this is supposed to be a string.
+        invalid_event['required_field'] = 123
+        body = json.dumps([valid_event, invalid_event])
+        self.http_client.fetch(self.get_url('/v1/events'),
+                               self.stop, method="POST",
+                               body=body, headers=headers)
+        response = self.wait()
+        self.assertEqual(207, response.code)
+        event_errors = json.loads(response.body.decode('utf-8'))
+        self.assertEqual(1, len(event_errors))
+        self.assertEqual('validation', event_errors[0]['event']['code'])
+
+    def test_post_event_batch_all_invalid(self):
+
+        headers = {'Content-type': 'application/json'}
+        invalid_eventA = copy.deepcopy(self.event_with_meta)
+        invalid_eventA['required_field'] = 123
+        invalid_eventB = copy.deepcopy(self.event_with_meta)
+        invalid_eventB['required_field'] = 456
+        body = json.dumps([invalid_eventA, invalid_eventB])
+        self.http_client.fetch(self.get_url('/v1/events'),
+                               self.stop, method="POST",
+                               body=body, headers=headers)
+        response = self.wait()
+        self.assertEqual(400, response.code)
+        event_errors = json.loads(response.body.decode('utf-8'))
+        self.assertEqual(2, len(event_errors))
+        self.assertEqual('validation', event_errors[0]['event']['code'])
+        self.assertEqual('validation', event_errors[1]['event']['code'])
+
     # Topic Testing
     def test_list_topics(self):
         """

-- 
To view, visit https://gerrit.wikimedia.org/r/256452
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifdde80e68832d5b3c34ba4ebefbb9e56ff601feb
Gerrit-PatchSet: 1
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Nuria <nu...@wikimedia.org>
Gerrit-Reviewer: Ottomata <o...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to