Mforns has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/258217

Change subject: Fix bad push/pop from event batch deque
......................................................................

Fix bad push/pop from event batch deque

Former code was pushing/popping event batches to the same end
of the deque. Also, changed the batch size and the deque size
to avoid out-of-memory errors when backfilling or when having
kafka hiccups, now that we have 4 mysql consumers in parallel

Bug: T120209
Change-Id: I086753a504462396520f768934b7c038f1874bd6
---
M eventlogging/handlers.py
M eventlogging/jrm.py
M tests/test_jrm.py
3 files changed, 12 insertions(+), 11 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/eventlogging 
refs/changes/17/258217/1

diff --git a/eventlogging/handlers.py b/eventlogging/handlers.py
index a7c8993..79bf236 100644
--- a/eventlogging/handlers.py
+++ b/eventlogging/handlers.py
@@ -304,10 +304,10 @@
             # that the connection is alive, and reconnect if necessary.
             dbapi_connection.ping(True)
     try:
-        batch_size = 5000
+        batch_size = 3000
         batch_time = 300  # in seconds
         # Max number of batches pending insertion.
-        queue_size = 1000
+        queue_size = 100
         sleep_seconds = 5
         # Link the main thread to the worker thread so we
         # don't keep filling the queue if the worker died.
diff --git a/eventlogging/jrm.py b/eventlogging/jrm.py
index 20234f4..27c6d12 100644
--- a/eventlogging/jrm.py
+++ b/eventlogging/jrm.py
@@ -242,7 +242,7 @@
         insert = _insert_sequential
 
     while len(events_batch) > 0:
-        scid, scid_events = events_batch.pop()
+        scid, scid_events = events_batch.popleft()
         prepared_events = [prepare(e) for e in scid_events]
         # TODO: Avoid breaking the inserts down by same set of fields,
         # instead force a default NULL, 0 or '' value for optional fields.
diff --git a/tests/test_jrm.py b/tests/test_jrm.py
index 75cfc60..5b7d408 100644
--- a/tests/test_jrm.py
+++ b/tests/test_jrm.py
@@ -7,6 +7,7 @@
 
 """
 from __future__ import unicode_literals
+from collections import deque
 
 import datetime
 import itertools
@@ -26,7 +27,7 @@
         """If an attempt is made to store an event for which no table
         exists, the schema is automatically retrieved and a suitable
         table generated."""
-        events_batch = [(TEST_SCHEMA_SCID, [self.event])]
+        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
         eventlogging.store_sql_events(self.meta, events_batch)
         self.assertIn('TestSchema_123', self.meta.tables)
         table = self.meta.tables['TestSchema_123']
@@ -61,7 +62,7 @@
 
     def test_encoding(self):
         """Timestamps and unicode strings are correctly encoded."""
-        events_batch = [(TEST_SCHEMA_SCID, [self.event])]
+        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
         eventlogging.jrm.store_sql_events(self.meta, events_batch)
         table = eventlogging.jrm.get_table(self.meta, TEST_SCHEMA_SCID)
         row = table.select().execute().fetchone()
@@ -75,7 +76,7 @@
     def test_reflection(self):
         """Tables which exist in the database but not in the MetaData cache are
         correctly reflected."""
-        events_batch = [(TEST_SCHEMA_SCID, [self.event])]
+        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
         eventlogging.store_sql_events(self.meta, events_batch)
 
         # Tell Python to forget everything it knows about this database
@@ -91,14 +92,14 @@
         # The ``checkfirst`` arg to :func:`sqlalchemy.Table.create`
         # will ensure that we don't attempt to CREATE TABLE on the
         # already-existing table:
-        events_batch = [(TEST_SCHEMA_SCID, [self.event])]
+        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
         eventlogging.store_sql_events(self.meta, events_batch, True)
         self.assertIn('TestSchema_123', self.meta.tables)
 
     def test_happy_case_insert_more_than_one_event(self):
         """Insert more than one event on database using batch_write"""
         another_event = next(self.event_generator)
-        events_batch = [(TEST_SCHEMA_SCID, [another_event, self.event])]
+        events_batch = deque([(TEST_SCHEMA_SCID, [another_event, self.event])])
         eventlogging.store_sql_events(self.meta, events_batch)
         table = self.meta.tables['TestSchema_123']
         # is the table on the db  and does it have the right data?
@@ -114,12 +115,12 @@
         insert the other items.
         """
         # insert event
-        events_batch = [(TEST_SCHEMA_SCID, [self.event])]
+        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
         eventlogging.jrm.store_sql_events(self.meta, events_batch)
         # now try to insert list of events in which this event is included
         another_event = next(self.event_generator)
         event_list = [another_event, self.event]
-        events_batch = [(TEST_SCHEMA_SCID, event_list)]
+        events_batch = deque([(TEST_SCHEMA_SCID, event_list)])
         eventlogging.store_sql_events(self.meta, events_batch, replace=True)
 
         # we should still have to insert the other record though
@@ -158,7 +159,7 @@
         set of optional fields are inserted correctly"""
         another_event = next(self.event_generator)
         # ensure both events get inserted?
-        events_batch = [(TEST_SCHEMA_SCID, [another_event, self.event])]
+        events_batch = deque([(TEST_SCHEMA_SCID, [another_event, self.event])])
         eventlogging.store_sql_events(self.meta, events_batch)
         table = self.meta.tables['TestSchema_123']
         # is the table on the db  and does it have the right data?

-- 
To view, visit https://gerrit.wikimedia.org/r/258217
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I086753a504462396520f768934b7c038f1874bd6
Gerrit-PatchSet: 1
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to