Nuria has uploaded a new change for review. https://gerrit.wikimedia.org/r/256452
Change subject: Add support for batch production in eventlogging-service ...................................................................... Add support for batch production in eventlogging-service Responses: - 201 if all events are accepted. - 207 if some but not all events are accepted. - 400 if no events are accepted. In case of any errored events, those events will be in the response body as a list of EventError objects. Change-Id: Ifdde80e68832d5b3c34ba4ebefbb9e56ff601feb --- M bin/eventlogging-consumer M bin/eventlogging-devserver M bin/eventlogging-forwarder M bin/eventlogging-multiplexer M bin/eventlogging-processor M bin/eventlogging-reporter M bin/eventlogging-service M eventlogging/schema.py M eventlogging/service.py M tests/test_service.py 10 files changed, 221 insertions(+), 62 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/eventlogging refs/changes/52/256452/1 diff --git a/bin/eventlogging-consumer b/bin/eventlogging-consumer index c0b8f50..cb9a9a6 100755 --- a/bin/eventlogging-consumer +++ b/bin/eventlogging-consumer @@ -23,7 +23,7 @@ from __future__ import unicode_literals import sys -reload(sys) +reload(sys) # noqa sys.setdefaultencoding('utf-8') import argparse diff --git a/bin/eventlogging-devserver b/bin/eventlogging-devserver index 48d57e5..ba7e7b6 100755 --- a/bin/eventlogging-devserver +++ b/bin/eventlogging-devserver @@ -33,7 +33,7 @@ from __future__ import print_function, unicode_literals import sys -reload(sys) +reload(sys) # noqa sys.setdefaultencoding('utf-8') import argparse diff --git a/bin/eventlogging-forwarder b/bin/eventlogging-forwarder index cb9cdfe..86206ad 100755 --- a/bin/eventlogging-forwarder +++ b/bin/eventlogging-forwarder @@ -26,7 +26,7 @@ from __future__ import unicode_literals import sys -reload(sys) +reload(sys) # noqa sys.setdefaultencoding('utf-8') import argparse diff --git a/bin/eventlogging-multiplexer b/bin/eventlogging-multiplexer index f0d225d..a736712 100755 --- a/bin/eventlogging-multiplexer +++ 
b/bin/eventlogging-multiplexer @@ -24,7 +24,7 @@ from __future__ import unicode_literals import sys -reload(sys) +reload(sys) # noqa sys.setdefaultencoding('utf-8') import argparse diff --git a/bin/eventlogging-processor b/bin/eventlogging-processor index be6f901..d6d6b8c 100755 --- a/bin/eventlogging-processor +++ b/bin/eventlogging-processor @@ -32,7 +32,7 @@ from __future__ import unicode_literals import sys -reload(sys) +reload(sys) # noqa sys.setdefaultencoding('utf-8') import argparse diff --git a/bin/eventlogging-reporter b/bin/eventlogging-reporter index 644a50d..ccff01c 100755 --- a/bin/eventlogging-reporter +++ b/bin/eventlogging-reporter @@ -13,7 +13,7 @@ """ import sys -reload(sys) +reload(sys) # noqa sys.setdefaultencoding('utf-8') import argparse @@ -167,4 +167,4 @@ if __name__ == '__main__': - print monitor_pubs(iter_pubs('/etc/eventlogging.d/processors')) + print(monitor_pubs(iter_pubs('/etc/eventlogging.d/processors'))) diff --git a/bin/eventlogging-service b/bin/eventlogging-service index bd688c0..b12a0da 100755 --- a/bin/eventlogging-service +++ b/bin/eventlogging-service @@ -53,6 +53,13 @@ 'processes as there are cores. Default 1.', ) +ap.add_argument( + '--error-output', + default=None, + help='URI of output stream for errored events. ' + 'Errored events are written using the EventError schema.' +) + ap.add_argument('output', nargs='+', help='URIs of output streams.') @@ -82,7 +89,8 @@ ) service = EventLoggingService( - args.output + args.output, + args.error_output ) # Start listening for requests. diff --git a/eventlogging/schema.py b/eventlogging/schema.py index 8636c9a..62cc04f 100644 --- a/eventlogging/schema.py +++ b/eventlogging/schema.py @@ -73,6 +73,7 @@ # SCID of the metadata object which wraps each capsule-style event. CAPSULE_SCID = ('EventCapsule', 10981547) +# TODO: Make new meta style EventError on meta. ERROR_SCID = ('EventError', 14035058) # Schemas retrieved via HTTP or files are cached in this dictionary. 
diff --git a/eventlogging/service.py b/eventlogging/service.py index d54dfd4..d2a4d8e 100644 --- a/eventlogging/service.py +++ b/eventlogging/service.py @@ -21,7 +21,10 @@ from . import ValidationError, SchemaError # these are int __init__.py from .compat import json from .factory import apply_safe, get_writer -from .schema import validate, get_schema, schema_name_from_event +from .schema import ( + validate, get_schema, schema_name_from_event, id_from_event, + create_event_error +) from .topic import ( get_topic_config, scid_for_topic, schema_for_topic, topic_from_event, TopicNotConfigured, TopicNotFound, @@ -45,10 +48,13 @@ until your event is ACKed by Kafka. """ - def __init__(self, writer_uris): + def __init__(self, writer_uris, error_writer_uri=None): """ :param writer_uris: A list of EventLogging writer_uris. Each valid event will be written to each of these writers. + + :param error_writer_uri: If configured, EventErrors will be written + to this writer. """ routes = [ @@ -75,6 +81,13 @@ self.writers[uri] = get_writer(uri) logging.info('Publishing valid JSON events to %s.', uri) + # Errored events will be written to this writer. + if error_writer_uri: + self.error_writer = get_writer(error_writer_uri) + logging.info('Publishing errored events to %s.', error_writer_uri) + else: + self.error_writer = None + def send(self, event): """Send the event to configured eventlogging writers.""" for uri in self.writers.keys(): @@ -93,24 +106,30 @@ self.writers[uri] = w w.send(event) - def process_json( - self, - event_string, - callback=None - ): + def decode_json(self, s, callback=None): """ - Parse the event_string as json and validate it - using the schema configured for this event's topic. - If valid, the event will be sent to all configured writers. + Loads the string into an object. If the top level + object is a dict, it will be wrapped in a list. + The result of this function should be a list of events. 
- If callback is set, then it is called with the - parsed and validated event object, otherwise - the event object is just returned. + :param s: json string """ + events = json.loads(s.decode('utf-8')) + if isinstance(events, dict): + events = [events] + if callback: + callback(events) + else: + return events - # This will add schema and revision to the event - # based on topic config. - event = json.loads(event_string) + def process_event(self, event): + """ + Validate the event using the schema configured for it's topic. + A valid event will be sent to the configured writers. + + Returns True on success, otherwise some Exception will be thrown. + + """ topic = topic_from_event(event) schema_name, revision = scid_for_topic(topic) @@ -129,15 +148,73 @@ encapsulate = True validate(event, encapsulate=encapsulate) - # Send this processed event to all configured writers - # This will block until each writer finishes writing this event. + # This will block until each writer finishes writing + # this event. self.send(event) + return True + + def handle_events(self, events, callback=None): + """ + Calls process_event on each of the events. Any + errors thrown by process_event will be caught, and EventError + objects will be returned describing the error that the offending + event caused. + + :param events: list of event dicts + """ + # This will add schema and revision to the event + # based on topic config. + + event_errors = [] + for event in events: + error_message = None + + try: + self.process_event(event) + + except TopicNotConfigured as e: + error_message = str(e) + + except TopicNotFound as e: + error_message = 'Could not get topic from event %s. %s' % ( + id_from_event(event), e + ) + + except SchemaError as e: + error_message = 'Could not find schema for provided topic ' \ + 'in event %s. %s' % (id_from_event, e) + + except ValidationError as e: + error_message = 'Failed validating event %s of schema ' \ + '%s. 
%s' % ( + id_from_event(event), + schema_name_from_event(event), + e.message + ) + + finally: + # If we encountered an error while processing this event, + # log it and create an EventError that will be returned. + if error_message: + logging.error("Failed processing event: %s", error_message) + event_error = create_event_error( + json.dumps(event), + error_message, + # Should we make different error codes for these? + 'validation', + event + ) + event_errors.append(event_error) + # If error_writer is configured, send this + # EventError to it. + if self.error_writer: + self.error_writer.send(event_error) if callback: - callback(event) + callback(event_errors) else: - return event + return event_errors def start(self, port=8085, num_processes=1): """ @@ -151,49 +228,76 @@ class EventHandler(tornado.web.RequestHandler): + @tornado.gen.coroutine def post(self): """ - event_string json string is read in from POST body - and then asynchronously parsed and validated, and then + events_string json string is read in from POST body. + It can be a single event object or a list of event objects. + They will be asynchronously parsed and validated, and then written to configured EventLogging writers. 'topic' - must be set in the event meta data. + must be set in each event's meta data. + + Reponses: + - 201 if all events are accepted. + - 207 if some but not all events are accepted. + - 400 if no events are accepted. + + In case of any errored events, those events will be in the response + body as a JSON list of the form: + [{'event': {...}, 'error': 'String Error Message'}, ... ] + + # TODO: Use EventError and configure an error writer like + eventlogging-processor? 
""" + response_body = None if self.request.headers['Content-Type'] == 'application/json': try: - body = self.request.body.decode('utf-8') - if body: - event = yield tornado.gen.Task( - self.application.process_json, - body + if self.request.body: + # decode the json string into a list + events = yield tornado.gen.Task( + self.application.decode_json, + self.request.body ) - response_code = 201 - response_text = '%s event on topic %s accepted. ' % ( - schema_name_from_event(event), topic_from_event(event) + # process and validate events + event_errors = yield tornado.gen.Task( + self.application.handle_events, + events ) + events_count = len(events) + event_errors_count = len(event_errors) + + # If all events were accepted, then return 201 + if event_errors_count == 0: + response_code = 201 + response_text = 'All %s events were accepted.' % ( + events_count + ) + else: + # Else if all events failed validation + # return 400 and list of EventErrors. + if events_count == event_errors_count: + response_code = 400 + response_text = ('0 out of %s events were ' + 'accepted.') % events_count + # Else at least 1 event failed validation. + # Return 207 and the list of list of EventErrors. + else: + response_code = 207 + response_text = ('%s out of %s events ' + 'were accepted.') % ( + events_count - event_errors_count, + events_count + ) + response_body = json.dumps(event_errors) else: response_code = 400 response_text = 'Must provide body in request.' + except UnicodeError as e: response_code = 400 response_text = 'UnicodeError while utf-8 decoding ' 'POST body: %s' % e - except TopicNotConfigured as e: - response_code = 404 - response_text = str(e) - except TopicNotFound as e: - response_code = 400 - response_text = 'Must provide topic. %s' % e - except IncorrectSerialization as e: - response_code = 400 - response_text = e.message - except ValidationError as e: - response_code = 400 - response_text = 'Unable to validate event. 
' + e.message - except SchemaError as e: - response_code = 500 - response_text = 'Could not find schema for provided ' - 'topic. %s' % e else: response_code = 400 @@ -205,6 +309,8 @@ logging.error(response_text) self.set_status(response_code, response_text) + if response_body: + self.write(response_body) class TopicHandler(tornado.web.RequestHandler): @@ -223,6 +329,7 @@ class SchemaHandler(tornado.web.RequestHandler): + @tornado.gen.coroutine def get(self, schema_name, revision): """ @@ -251,10 +358,7 @@ class TopicConfigHandler(tornado.web.RequestHandler): + def get(self): self.set_status(200) self.write(get_topic_config()) - - -class IncorrectSerialization(Exception): - pass diff --git a/tests/test_service.py b/tests/test_service.py index f16df0d..9770006 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -46,7 +46,7 @@ self.stop, method="POST", body=json.dumps(event), headers=headers) response = self.wait() - self.assertEqual(404, response.code) + self.assertEqual(400, response.code) def test_post_valid_event_configured_topic(self): """ @@ -87,8 +87,6 @@ body=body, headers=headers) response = self.wait() self.assertEqual(400, response.code) - self.assertTrue("'required_field' is a required property" - in response.reason) def test_post_event_missing_optional_field(self): @@ -102,6 +100,54 @@ response = self.wait() self.assertEqual(201, response.code) + def test_post_event_batch(self): + + headers = {'Content-type': 'application/json'} + valid_eventA = copy.deepcopy(self.event_with_meta) + valid_eventB = copy.deepcopy(self.event_with_meta) + events = [valid_eventA, valid_eventB] + body = json.dumps(events) + self.http_client.fetch(self.get_url('/v1/events'), + self.stop, method="POST", + body=body, headers=headers) + response = self.wait() + self.assertEqual(201, response.code) + + def test_post_event_batch_one_invalid(self): + + headers = {'Content-type': 'application/json'} + valid_eventA = copy.deepcopy(self.event_with_meta) + valid_eventB = 
copy.deepcopy(self.event_with_meta) + # this is supposed to be a string. + valid_eventB['required_field'] = 123 + events = [valid_eventA, valid_eventB] + body = json.dumps(events) + self.http_client.fetch(self.get_url('/v1/events'), + self.stop, method="POST", + body=body, headers=headers) + response = self.wait() + self.assertEqual(207, response.code) + event_errors = json.loads(response.body.decode('utf-8')) + self.assertEqual('validation', event_errors[0]['event']['code']) + + def test_post_event_batch_all_invalid(self): + + headers = {'Content-type': 'application/json'} + valid_eventA = copy.deepcopy(self.event_with_meta) + valid_eventA['required_field'] = 123 + valid_eventB = copy.deepcopy(self.event_with_meta) + valid_eventB['required_field'] = 456 + events = [valid_eventA, valid_eventB] + body = json.dumps(events) + self.http_client.fetch(self.get_url('/v1/events'), + self.stop, method="POST", + body=body, headers=headers) + response = self.wait() + self.assertEqual(400, response.code) + event_errors = json.loads(response.body.decode('utf-8')) + self.assertEqual('validation', event_errors[0]['event']['code']) + self.assertEqual('validation', event_errors[1]['event']['code']) + # Topic Testing def test_list_topics(self): """ -- To view, visit https://gerrit.wikimedia.org/r/256452 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ifdde80e68832d5b3c34ba4ebefbb9e56ff601feb Gerrit-PatchSet: 1 Gerrit-Project: eventlogging Gerrit-Branch: master Gerrit-Owner: Nuria <nu...@wikimedia.org> Gerrit-Reviewer: Ottomata <o...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits