Colin Watson has proposed merging ~cjwatson/launchpad:stormify-queuedepth-queries into launchpad:master.
Commit message: Convert lp.buildmaster.queuedepth queries to Storm expressions Requested reviews: Launchpad code reviewers (launchpad-reviewers) For more details, see: https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/446807 We generally prefer having less in the way of constructing SQL queries by pasting strings together. This will also make it a little easier to convert `Processor` to Storm. Most of the transformations are reasonably obvious; I switched from `EXTRACT(EPOCH FROM ...)` to `date_part('epoch', ...)` because the latter can be expressed in Storm expressions more conveniently, but https://www.postgresql.org/docs/10/functions-datetime.html says that they're equivalent. -- Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:stormify-queuedepth-queries into launchpad:master.
diff --git a/lib/lp/buildmaster/queuedepth.py b/lib/lp/buildmaster/queuedepth.py index 05adfa1..354d07e 100644 --- a/lib/lp/buildmaster/queuedepth.py +++ b/lib/lp/buildmaster/queuedepth.py @@ -8,13 +8,26 @@ __all__ = [ from collections import defaultdict from datetime import datetime, timedelta, timezone -from storm.expr import Count +from storm.databases.postgres import Case +from storm.expr import ( + And, + Cast, + Count, + Desc, + Func, + Is, + IsNot, + Min, + Not, + Or, + Select, + Sum, +) from lp.buildmaster.enums import BuildQueueStatus from lp.buildmaster.model.builder import Builder, BuilderProcessor from lp.buildmaster.model.buildqueue import BuildQueue from lp.services.database.interfaces import IStore -from lp.services.database.sqlbase import sqlvalues def get_builder_data(): @@ -56,26 +69,29 @@ def get_builder_data(): def get_free_builders_count(processor, virtualized): """How many builders capable of running jobs for the given processor and virtualization combination are idle/free at present?""" - query = """ - SELECT COUNT(id) FROM builder - WHERE - builderok = TRUE AND manual = FALSE - AND id NOT IN ( - SELECT builder FROM BuildQueue WHERE builder IS NOT NULL) - AND virtualized = %s - """ % sqlvalues( - virtualized - ) + clauses = [ + Is(Builder._builderok, True), + Is(Builder.manual, False), + Not( + Builder.id.is_in( + Select( + BuildQueue.builder_id, + where=IsNot(BuildQueue.builder_id, None), + ) + ) + ), + Is(Builder.virtualized, virtualized), + ] if processor is not None: - query += """ - AND id IN ( - SELECT builder FROM BuilderProcessor WHERE processor = %s) - """ % sqlvalues( - processor + clauses.append( + Builder.id.is_in( + Select( + BuilderProcessor.builder_id, + where=(BuilderProcessor.processor == processor), + ) + ) ) - result_set = IStore(BuildQueue).execute(query) - free_builders = result_set.get_one()[0] - return free_builders + return IStore(Builder).find(Builder.id, *clauses).count() def get_head_job_platform(bq): @@ -89,18 +105,15 @@ def get_head_job_platform(bq): platform or None if the JOI is the head job. """ my_platform = (getattr(bq.processor, "id", None), bq.virtualized) - query = """ - SELECT - processor, - virtualized - FROM BuildQueue - WHERE - """ - query += get_pending_jobs_clauses(bq) - query += """ - ORDER BY lastscore DESC, id LIMIT 1 - """ - result = IStore(BuildQueue).execute(query).get_one() + result = ( + IStore(BuildQueue) + .find( + (BuildQueue.processor_id, BuildQueue.virtualized), + *get_pending_jobs_clauses(bq), + ) + .order_by(Desc(BuildQueue.lastscore), BuildQueue.id) + .first() + ) return my_platform if result is None else result @@ -132,84 +145,77 @@ def estimate_time_to_next_builder(bq, now=None): head_job_processor, head_job_virtualized = head_job_platform now = now or datetime.now(timezone.utc) - delay_query = """ - SELECT MIN( - CASE WHEN - EXTRACT(EPOCH FROM - (BuildQueue.estimated_duration - - (((%s AT TIME ZONE 'UTC') - BuildQueue.date_started)))) >= 0 - THEN - EXTRACT(EPOCH FROM - (BuildQueue.estimated_duration - - (((%s AT TIME ZONE 'UTC') - BuildQueue.date_started)))) - ELSE - -- Assume that jobs that have overdrawn their estimated - -- duration time budget will complete within 2 minutes. - -- This is a wild guess but has worked well so far. - -- - -- Please note that this is entirely innocuous i.e. if our - -- guess is off nothing bad will happen but our estimate will - -- not be as good as it could be. - 120 - END) - FROM - BuildQueue, Builder - WHERE - BuildQueue.builder = Builder.id - AND Builder.manual = False - AND Builder.builderok = True - AND BuildQueue.status = %s - AND Builder.virtualized = %s - """ % sqlvalues( - now, now, BuildQueueStatus.RUNNING, head_job_virtualized - ) - + delay_clauses = [ + BuildQueue.builder_id == Builder.id, + Is(Builder.manual, False), + Is(Builder._builderok, True), + BuildQueue.status == BuildQueueStatus.RUNNING, + Is(Builder.virtualized, head_job_virtualized), + ] if head_job_processor is not None: # Only look at builders with specific processor types. - delay_query += """ - AND Builder.id IN ( - SELECT builder FROM BuilderProcessor WHERE processor = %s) - """ % sqlvalues( - head_job_processor + delay_clauses.append( + Builder.id.is_in( + Select( + BuilderProcessor.builder_id, + where=(BuilderProcessor.processor == head_job_processor), + ) + ) + ) + estimated_remaining = Func( + "date_part", + "epoch", + BuildQueue.date_started + BuildQueue.estimated_duration - now, + ) + head_job_delay = ( + IStore(BuildQueue) + .find( + Min( + Case( + cases=((estimated_remaining >= 0, estimated_remaining),), + # Assume that jobs that have overdrawn their estimated + # duration time budget will complete within 2 minutes. + # This is a wild guess but has worked well so far. + # + # Please note that this is entirely innocuous, i.e. if + # our guess is off nothing bad will happen but our + # estimate will not be as good as it could be. + default=120, + ) + ), + *delay_clauses, ) + .one() + ) - result_set = IStore(BuildQueue).execute(delay_query) - head_job_delay = result_set.get_one()[0] return 0 if head_job_delay is None else int(head_job_delay) def get_pending_jobs_clauses(bq): - """WHERE clauses for pending job queries, used for dipatch time - estimation.""" - clauses = """ - BuildQueue.status = %s - AND ( - -- The score must be either above my score or the - -- job must be older than me in cases where the - -- score is equal. - BuildQueue.lastscore > %s OR - (BuildQueue.lastscore = %s AND BuildQueue.id < %s)) - AND buildqueue.virtualized = %s - """ % sqlvalues( - BuildQueueStatus.WAITING, - bq.lastscore, - bq.lastscore, - bq.id, - bq.virtualized, - ) - processor_clause = """ - AND ( - -- The processor values either match or the candidate - -- job is processor-independent. - buildqueue.processor = %s OR - buildqueue.processor IS NULL) - """ % sqlvalues( - bq.processor - ) + """Storm clauses for pending job queries, used for dispatch time + estimation. + """ + clauses = [ + BuildQueue.status == BuildQueueStatus.WAITING, + # Either the score must be above my score, or the job must be older + # than me in cases where the score is equal. + Or( + BuildQueue.lastscore > bq.lastscore, + And(BuildQueue.lastscore == bq.lastscore, BuildQueue.id < bq.id), + ), + Is(BuildQueue.virtualized, bq.virtualized), + ] # We don't care about processors if the estimation is for a # processor-independent job. if bq.processor is not None: - clauses += processor_clause + # Either the processor values match, or the candidate job is + # processor-independent. + clauses.append( + Or( + BuildQueue.processor == bq.processor, + Is(BuildQueue.processor_id, None), + ) + ) return clauses @@ -247,23 +253,26 @@ def estimate_job_delay(bq, builder_stats): return a == b my_platform = (getattr(bq.processor, "id", None), bq.virtualized) - query = """ - SELECT - BuildQueue.processor, - BuildQueue.virtualized, - COUNT(BuildQueue.id), - CAST(EXTRACT( - EPOCH FROM - SUM(BuildQueue.estimated_duration)) AS INTEGER) - FROM BuildQueue - WHERE - """ - query += get_pending_jobs_clauses(bq) - query += """ - GROUP BY BuildQueue.processor, BuildQueue.virtualized - """ - - delays_by_platform = IStore(BuildQueue).execute(query).get_all() + delays_by_platform = ( + IStore(BuildQueue) + .find( + ( + BuildQueue.processor_id, + BuildQueue.virtualized, + Count(BuildQueue.id), + Cast( + Func( + "date_part", + "epoch", + Sum(BuildQueue.estimated_duration), + ), + "integer", + ), + ), + *get_pending_jobs_clauses(bq), + ) + .group_by(BuildQueue.processor_id, BuildQueue.virtualized) + ) # This will be used to capture per-platform delay totals. delays = defaultdict(int)
_______________________________________________ Mailing list: https://launchpad.net/~launchpad-reviewers Post to : launchpad-reviewers@lists.launchpad.net Unsubscribe : https://launchpad.net/~launchpad-reviewers More help : https://help.launchpad.net/ListHelp