Milimetric has submitted this change and it was merged. Change subject: Removing usage of celery chains from report scheduling ......................................................................
Removing usage of celery chains from report scheduling Removing usage of chains had additional benefits like being able to use CELERY_ALWAYS_EAGER on parallelism tests and being able to remove sleeps and recursion limits. Goes with corresponding puppet change: https://gerrit.wikimedia.org/r/153390 Tested rolling active editor on eswiki on dev. Wrote some benchmarks on the wiki below; please be aware that numbers vary greatly — what is important is the order of magnitude. http://www.mediawiki.org/wiki/Analytics/Editor_Engagement_Vital_Signs/Backfilling Bug: 68840 Change-Id: Ie4432d72a600450203395052e2bdfab348285882 --- M tests/manual/parallel_reports.py M wikimetrics/config/queue_config.yaml M wikimetrics/schedules/daily.py 3 files changed, 7 insertions(+), 35 deletions(-) Approvals: Milimetric: Looks good to me, approved diff --git a/tests/manual/parallel_reports.py b/tests/manual/parallel_reports.py index 446040e..1ec598d 100644 --- a/tests/manual/parallel_reports.py +++ b/tests/manual/parallel_reports.py @@ -29,25 +29,16 @@ # total_runs = 200 and parallelism = 2 # no longer causes maximum recursion # - # sleep = 10 was enough time on Dan's machine to make all cases work self.total_runs = 200 - self.parallelism = 2 self.sleep = 10 # crank up the queue parallel report configuration - self.save_parallelism = queue.conf['MAX_PARALLEL_PER_RUN'] self.save_instances = queue.conf['MAX_INSTANCES_PER_RECURRENT_REPORT'] - self.save_eager = queue.conf['CELERY_ALWAYS_EAGER'] - queue.conf['MAX_PARALLEL_PER_RUN'] = self.parallelism queue.conf['MAX_INSTANCES_PER_RECURRENT_REPORT'] = self.total_runs - queue.conf['CELERY_ALWAYS_EAGER'] = False def tearDown(self): - # re-enable the scheduler after these tests - queue.conf['MAX_PARALLEL_PER_RUN'] = self.save_parallelism queue.conf['MAX_INSTANCES_PER_RECURRENT_REPORT'] = self.save_instances - queue.conf['CELERY_ALWAYS_EAGER'] = self.save_eager DatabaseTest.tearDown(self) @attr('manual') @@ -82,8 +73,6 @@ # executing directly the code that 
will be run by the scheduler recurring_reports() - - time.sleep(self.sleep) recurrent_runs = self.session.query(ReportStore) \ .filter(ReportStore.recurrent_parent_id == jr.persistent_id) \ diff --git a/wikimetrics/config/queue_config.yaml b/wikimetrics/config/queue_config.yaml index d79092a..20b51c2 100644 --- a/wikimetrics/config/queue_config.yaml +++ b/wikimetrics/config/queue_config.yaml @@ -4,12 +4,11 @@ CELERY_TASK_RESULT_EXPIRES : 3600 CELERY_DISABLE_RATE_LIMITS : True CELERY_STORE_ERRORS_EVEN_IF_IGNORED : True -CELERYD_CONCURRENCY : 16 +CELERYD_CONCURRENCY : 10 CELERYD_TASK_TIME_LIMIT : 3630 CELERYD_TASK_SOFT_TIME_LIMIT : 3600 DEBUG : True LOG_LEVEL : 'DEBUG' -MAX_PARALLEL_PER_RUN : 10 MAX_INSTANCES_PER_RECURRENT_REPORT : 100 CELERY_BEAT_DATAFILE : './generated/scheduled_tasks' CELERY_BEAT_PIDFILE : './generated/celerybeat.pid' diff --git a/wikimetrics/schedules/daily.py b/wikimetrics/schedules/daily.py index 7dd01cb..943cae9 100644 --- a/wikimetrics/schedules/daily.py +++ b/wikimetrics/schedules/daily.py @@ -10,15 +10,6 @@ task_logger = get_task_logger(__name__) -""" -TODO: we tried to set the recursion limit more dynamically and reset it when done, - but apparently sys.setrecursionlimit has to happen at import time. So we can - either disprove that theory or stop depending on pickle for serialization to - remove this nasty hack. 
-""" -sys.setrecursionlimit(100000) - - @queue.task() def recurring_reports(report_id=None): from wikimetrics.configurables import db @@ -32,8 +23,6 @@ if report_id is not None: query = query.filter(ReportStore.id == report_id) - parallelism = int(queue.conf.get('MAX_PARALLEL_PER_RUN')) - new_report_runs = [] for report in query.all(): try: task_logger.info('Running recurring report "{0}"'.format(report)) @@ -41,24 +30,19 @@ kwargs = dict() if no_more_than: kwargs['no_more_than'] = no_more_than - new_report_runs += list(RunReport.create_reports_for_missed_days( + + days_to_run = RunReport.create_reports_for_missed_days( report, session, **kwargs - )) + ) + for day_to_run in days_to_run: + day_to_run.task.delay(day_to_run) + except Exception: task_logger.error('Problem running recurring report "{}": {}'.format( report, traceback.format_exc() )) - - # WARNING regarding testing this code: - # Wet set CELERY_ALWAYS_EAGER on the queue instance - # to run tasks synchronously whith test process. - # Looks like celery chains do not work with the 'eager' setting. 
- # Tests exit 'too fast' without having executed the chain code - groups = chunk(new_report_runs, parallelism) - chain_of_groups = chain([group([r.task.s(r) for r in g]) for g in groups]) - chain_of_groups.delay() except Exception: task_logger.error('Problem running recurring reports: {}'.format( -- To view, visit https://gerrit.wikimedia.org/r/150475 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ie4432d72a600450203395052e2bdfab348285882 Gerrit-PatchSet: 9 Gerrit-Project: analytics/wikimetrics Gerrit-Branch: master Gerrit-Owner: Milimetric <[email protected]> Gerrit-Reviewer: Milimetric <[email protected]> Gerrit-Reviewer: Nuria <[email protected]> Gerrit-Reviewer: QChris <[email protected]> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
