QChris has uploaded a new change for review.
https://gerrit.wikimedia.org/r/155003
Change subject: Stop scheduling new recurrent runs if databases lag
......................................................................
Stop scheduling new recurrent runs if databases lag
We scheduled recurrent runs regardless of the database's replication
lag. But if there is a replication lag, this would falsify the results
of the runs. Hence, we now don't schedule new recurrent runs if any of
the databases in REPLICATION_LAG_MW_PROJECTS show replication lag.
Those recurrent runs will automatically get scheduled on the next
scheduler run once the database replication caught up.
Given the database users we connect with, we cannot use MariaDB's
built-in measures to detect lag. Instead, we check for the most recent edit.
If this edit is older than REPLICATION_LAG_THRESHOLD hours, we
consider the database lagged, otherwise, we assume the replication lag
is fine.
This change does not address replication lag issues for non-recurrent
runs.
Bug: 68507
Change-Id: I31974426ed55f4719b764885e550d98c8c525e4c
---
A tests/stubs/__init__.py
A tests/stubs/always_lagging_replication_lag_service_stub.py
A tests/test_api/test_replication_lag_service.py
A tests/test_api/test_replication_lag_service_factory.py
M tests/test_models/test_run_report.py
M wikimetrics/api/__init__.py
A wikimetrics/api/factories.py
A wikimetrics/api/replication_lag.py
M wikimetrics/config/db_config.yaml
M wikimetrics/schedules/daily.py
10 files changed, 312 insertions(+), 1 deletion(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/wikimetrics
refs/changes/03/155003/1
diff --git a/tests/stubs/__init__.py b/tests/stubs/__init__.py
new file mode 100644
index 0000000..cf268fd
--- /dev/null
+++ b/tests/stubs/__init__.py
@@ -0,0 +1 @@
+from always_lagging_replication_lag_service_stub import
AlwaysLaggingReplicationLagServiceStub
diff --git a/tests/stubs/always_lagging_replication_lag_service_stub.py
b/tests/stubs/always_lagging_replication_lag_service_stub.py
new file mode 100644
index 0000000..62c7d3e
--- /dev/null
+++ b/tests/stubs/always_lagging_replication_lag_service_stub.py
@@ -0,0 +1,7 @@
from wikimetrics.api import ReplicationLagService


class AlwaysLaggingReplicationLagServiceStub(ReplicationLagService):
    """Test double for ReplicationLagService that reports lag unconditionally"""
    def is_any_lagged(self):
        # Pretend every configured database is behind replication.
        return True
diff --git a/tests/test_api/test_replication_lag_service.py
b/tests/test_api/test_replication_lag_service.py
new file mode 100644
index 0000000..2c07271
--- /dev/null
+++ b/tests/test_api/test_replication_lag_service.py
@@ -0,0 +1,129 @@
import unittest
from datetime import datetime, timedelta


from wikimetrics.api import ReplicationLagService
from wikimetrics.models import Page, Revision, MediawikiUser

from tests.fixtures import DatabaseTest, mediawiki_project, \
    second_mediawiki_project


class ReplicationLagServiceTest(DatabaseTest):
    """Test case for the database replication lag detection service"""

    def _add_edit(self, hour_offset, name='foo', mw_session=None):
        """Insert a single edit made hour_offset hours in the past"""
        session = mw_session or self.mwSession
        self.create_test_cohort(
            name=name,
            editor_count=1,
            revisions_per_editor=1,
            revision_timestamps=datetime.now() - timedelta(hours=hour_offset),
            revision_lengths=4711,
            mw_session=session,
        )

    def test_any_lagged_without_wiki(self):
        lag_service = ReplicationLagService(mw_projects=[])

        self.assertFalse(lag_service.is_any_lagged())

    def test_any_lagged_single_wiki_without_lag(self):
        self._add_edit(hour_offset=2)

        lag_service = ReplicationLagService(mw_projects=[mediawiki_project])

        self.assertFalse(lag_service.is_any_lagged())

    def test_any_lagged_single_wiki_without_lag_but_older_revisions(self):
        # A recent edit must win even when older revisions exist as well.
        self._add_edit(hour_offset=4, name='foo')
        self._add_edit(hour_offset=2, name='bar')

        lag_service = ReplicationLagService(mw_projects=[mediawiki_project])

        self.assertFalse(lag_service.is_any_lagged())

    def test_any_lagged_single_wiki_with_lag(self):
        self._add_edit(hour_offset=4)

        lag_service = ReplicationLagService(mw_projects=[mediawiki_project])

        self.assertTrue(lag_service.is_any_lagged())

    def test_any_lagged_single_wiki_large_threshold_without_lag(self):
        self._add_edit(hour_offset=29)

        lag_service = ReplicationLagService(
            lag_threshold=timedelta(hours=30),
            mw_projects=[mediawiki_project],
        )

        self.assertFalse(lag_service.is_any_lagged())

    def test_any_lagged_single_wiki_large_threshold_with_lag(self):
        self._add_edit(hour_offset=31)

        lag_service = ReplicationLagService(
            lag_threshold=timedelta(hours=30),
            mw_projects=[mediawiki_project],
        )

        self.assertTrue(lag_service.is_any_lagged())

    def test_any_lagged_two_wikis_both_without_lag(self):
        self._add_edit(hour_offset=1, mw_session=self.mwSession)   # wiki
        self._add_edit(hour_offset=2, mw_session=self.mwSession2)  # wiki2

        lag_service = ReplicationLagService(
            mw_projects=[mediawiki_project, second_mediawiki_project]
        )

        self.assertFalse(lag_service.is_any_lagged())

    def test_any_lagged_two_wikis_both_with_lag(self):
        self._add_edit(hour_offset=4, mw_session=self.mwSession)   # wiki
        self._add_edit(hour_offset=4, mw_session=self.mwSession2)  # wiki2

        lag_service = ReplicationLagService(
            mw_projects=[mediawiki_project, second_mediawiki_project]
        )

        self.assertTrue(lag_service.is_any_lagged())

    def test_any_lagged_two_wikis_first_with_lag(self):
        self._add_edit(hour_offset=4, mw_session=self.mwSession)   # wiki
        self._add_edit(hour_offset=1, mw_session=self.mwSession2)  # wiki2

        lag_service = ReplicationLagService(
            mw_projects=[mediawiki_project, second_mediawiki_project]
        )

        self.assertTrue(lag_service.is_any_lagged())

    def test_any_lagged_two_wikis_second_with_lag(self):
        self._add_edit(hour_offset=1, mw_session=self.mwSession)   # wiki
        self._add_edit(hour_offset=4, mw_session=self.mwSession2)  # wiki2

        lag_service = ReplicationLagService(
            mw_projects=[mediawiki_project, second_mediawiki_project]
        )

        self.assertTrue(lag_service.is_any_lagged())
diff --git a/tests/test_api/test_replication_lag_service_factory.py
b/tests/test_api/test_replication_lag_service_factory.py
new file mode 100644
index 0000000..0d60c8a
--- /dev/null
+++ b/tests/test_api/test_replication_lag_service_factory.py
@@ -0,0 +1,39 @@
import unittest
from mock import Mock, NonCallableMock, PropertyMock

from wikimetrics.api import ReplicationLagServiceFactory


class ReplicationLagServiceFactoryClassMethodsTest(unittest.TestCase):
    """Test case for the database replication lag detection service factory"""

    def setUp(self):
        super(ReplicationLagServiceFactoryClassMethodsTest, self).setUp()
        # Remember the previously configured class so tearDown can restore
        # the factory to its pristine state after each test.
        self.old_service_class = \
            ReplicationLagServiceFactory.force_service_class(Mock)

    def tearDown(self):
        ReplicationLagServiceFactory.force_service_class(self.old_service_class)
        super(ReplicationLagServiceFactoryClassMethodsTest, self).tearDown()

    def test_create_plain(self):
        ReplicationLagServiceFactory.force_service_class(Mock)
        service = ReplicationLagServiceFactory.create()
        self.assertIsInstance(service, Mock)

    def test_force_service_class(self):
        # We test for two different mock types. We do not care which ones, they
        # just need to provide different names, so we can ensure that the
        # factory picks them up.
        # Note: assertEqual is used instead of the deprecated assertEquals
        # alias.

        ReplicationLagServiceFactory.force_service_class(NonCallableMock)
        service = ReplicationLagServiceFactory.create()
        self.assertEqual(type(service).__name__, "NonCallableMock")

        old_class = \
            ReplicationLagServiceFactory.force_service_class(PropertyMock)
        self.assertEqual(old_class.__name__, "NonCallableMock")

        service = ReplicationLagServiceFactory.create()
        self.assertEqual(type(service).__name__, "PropertyMock")

        old_class = \
            ReplicationLagServiceFactory.force_service_class(NonCallableMock)
        self.assertEqual(old_class.__name__, "PropertyMock")
diff --git a/tests/test_models/test_run_report.py
b/tests/test_models/test_run_report.py
index 8263295..b9f9a75 100644
--- a/tests/test_models/test_run_report.py
+++ b/tests/test_models/test_run_report.py
@@ -6,7 +6,9 @@
from mock import MagicMock
from celery.exceptions import SoftTimeLimitExceeded
+from tests.stubs import AlwaysLaggingReplicationLagServiceStub
from tests.fixtures import QueueDatabaseTest, DatabaseTest
+from wikimetrics.api import ReplicationLagServiceFactory
from wikimetrics.models import RunReport, ReportStore, WikiUserStore,
CohortWikiUserStore
from wikimetrics.exceptions import InvalidCohort
from wikimetrics.metrics import metric_classes
@@ -521,7 +523,7 @@
QueueDatabaseTest.setUp(self)
self.common_cohort_1()
- def test_scheduler(self):
+ def inject_and_fetch_recurrent_run(self):
parameters = {
'name': 'Edits - test',
'cohort': {
@@ -552,9 +554,26 @@
.filter(ReportStore.recurrent_parent_id == jr.persistent_id) \
.all()
+ return recurrent_runs
+
+ def test_scheduler(self):
+ recurrent_runs = self.inject_and_fetch_recurrent_run()
+
# make sure we have one and no more than one recurrent run
assert_equals(len(recurrent_runs), 1)
+ def test_scheduler_with_lag(self):
+ old_service_class = ReplicationLagServiceFactory.force_service_class(
+ AlwaysLaggingReplicationLagServiceStub
+ )
+ try:
+ recurrent_runs = self.inject_and_fetch_recurrent_run()
+
+ # make sure we no recurrent run has been scheduled
+ assert_equals(len(recurrent_runs), 0)
+ finally:
+ ReplicationLagServiceFactory.force_service_class(old_service_class)
+
def test_user_id_assigned_properly(self):
parameters = {
'name': 'Bytes - test',
diff --git a/wikimetrics/api/__init__.py b/wikimetrics/api/__init__.py
index b3e735e..a00cefe 100644
--- a/wikimetrics/api/__init__.py
+++ b/wikimetrics/api/__init__.py
@@ -1,6 +1,8 @@
from file_manager import *
from cohorts import *
from tags import *
+from replication_lag import *
+from factories import *
from batch import *
diff --git a/wikimetrics/api/factories.py b/wikimetrics/api/factories.py
new file mode 100644
index 0000000..b6215bc
--- /dev/null
+++ b/wikimetrics/api/factories.py
@@ -0,0 +1,49 @@
+from datetime import datetime, timedelta
+
+from wikimetrics.configurables import db
+from wikimetrics.api import ReplicationLagService
+
+
class ReplicationLagServiceFactory():
    """
    Factory that builds ReplicationLagService instances.

    Instances created through this factory get configured from settings in
    the database configuration. The following values get used:
    REPLICATION_LAG_MW_PROJECTS -- list of projects to check. (default: [])
    REPLICATION_LAG_THRESHOLD -- int. Consider databases lagged if they
        are more than that many hours behind. (default: 3)
    """

    # Class handed out by create(); tests may swap it via
    # force_service_class.
    _service_class = ReplicationLagService

    @classmethod
    def force_service_class(cls, service_class):
        """
        Swap the class instantiated by this factory.

        Keyword arguments:
        service_class -- The class to instantiate upon create. This class'
            constructor has to take at least the following arguments
            mw_projects -- list of MediaWiki projects to check for lag
            lag_threshold -- time period. If a database is more than this
                behind, it is considered lagging.

        Returns the previously set service_class.
        """
        previous_class, cls._service_class = cls._service_class, service_class
        return previous_class

    @classmethod
    def create(cls):
        """Create a new instance configured from the database settings"""
        threshold_hours = db.config.get('REPLICATION_LAG_THRESHOLD', 3)

        return cls._service_class(
            mw_projects=db.config.get('REPLICATION_LAG_MW_PROJECTS', []),
            lag_threshold=timedelta(hours=threshold_hours),
        )
diff --git a/wikimetrics/api/replication_lag.py
b/wikimetrics/api/replication_lag.py
new file mode 100644
index 0000000..0a8ed16
--- /dev/null
+++ b/wikimetrics/api/replication_lag.py
@@ -0,0 +1,54 @@
+from datetime import datetime, timedelta
+
+from wikimetrics.configurables import db
+from wikimetrics.models import Revision
+
+
class ReplicationLagService():
    """
    Service to check replication lag of databases.

    Per the change description, the database users we connect with cannot
    use MariaDB's built-in lag measures, so lag is approximated by the age
    of the most recent edit: a wiki whose newest revision is older than the
    configured threshold is considered lagged.
    """

    def __init__(self, mw_projects=None, lag_threshold=timedelta(hours=3)):
        """
        Construct service to check database replication lag

        Keyword arguments:
        mw_projects -- List of MediaWiki projects to check against replication
            lag. (default: [])
        lag_threshold -- time period. If the most recent edit in a wiki is
            older than this time period, the wiki's database is considered
            to be lagging. (default: 3 hours)
        """
        # Default to None instead of [] to avoid sharing one mutable list
        # between instances (mutable default argument pitfall).
        self._mw_projects = [] if mw_projects is None else mw_projects
        self._lag_threshold = lag_threshold

    def _is_mw_project_lagged(self, mw_project):
        """
        Determines whether or not the given wiki is considered lagged.

        Keyword arguments:
        mw_project -- Name of the wiki to check
        """
        session = db.get_mw_session(mw_project)
        try:
            # The newest revision's timestamp serves as a proxy for how far
            # behind replication is.
            timestamp = session.query(Revision)\
                .order_by(Revision.rev_timestamp.desc())\
                .limit(1)\
                .one()\
                .rev_timestamp
        finally:
            session.close()

        # Guard against drivers handing back the timestamp as a raw string;
        # isinstance (instead of a `type(...) is` check) also accepts
        # datetime subclasses.
        assert isinstance(timestamp, datetime)

        return timestamp < datetime.utcnow() - self._lag_threshold

    def is_any_lagged(self):
        """
        Determines whether or not any of the configured projects are
        considered lagged. Short-circuits on the first lagged project.
        """
        return any(
            self._is_mw_project_lagged(mw_project)
            for mw_project in self._mw_projects
        )
diff --git a/wikimetrics/config/db_config.yaml
b/wikimetrics/config/db_config.yaml
index acfed7c..1779181 100644
--- a/wikimetrics/config/db_config.yaml
+++ b/wikimetrics/config/db_config.yaml
@@ -6,3 +6,5 @@
MEDIAWIKI_POOL_SIZE : 32
DEBUG : True
REVISION_TABLENAME : 'revision_userindex'
+REPLICATION_LAG_MW_PROJECTS : [] # empty, so inactive test wikis don't
block us
+REPLICATION_LAG_THRESHOLD : 3 # (measured in hours)
diff --git a/wikimetrics/schedules/daily.py b/wikimetrics/schedules/daily.py
index 943cae9..f1c7341 100644
--- a/wikimetrics/schedules/daily.py
+++ b/wikimetrics/schedules/daily.py
@@ -3,6 +3,7 @@
from celery import group, chain
from celery.utils.log import get_task_logger
+from wikimetrics.api import ReplicationLagServiceFactory
from wikimetrics.configurables import queue
from wikimetrics.utils import chunk
@@ -15,6 +16,14 @@
from wikimetrics.configurables import db
from wikimetrics.models import ReportStore, RunReport
+ replication_lag_service = ReplicationLagServiceFactory.create()
+ if replication_lag_service.is_any_lagged():
+ task_logger.warning(
+ 'Replication lag detected. '
+ 'Hence, skipping creating new recurring reports.'
+ )
+ return
+
try:
session = db.get_session()
query = session.query(ReportStore) \
--
To view, visit https://gerrit.wikimedia.org/r/155003
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I31974426ed55f4719b764885e550d98c8c525e4c
Gerrit-PatchSet: 1
Gerrit-Project: analytics/wikimetrics
Gerrit-Branch: master
Gerrit-Owner: QChris <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits