This is an automated email from the ASF dual-hosted git repository.
narro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 6e224fa41 fix: high priority pipelines could be starved by `parallel/`
label (#8568)
6e224fa41 is described below
commit 6e224fa41ce5abcfe3cc4567531a6ef878ff423e
Author: Klesh Wong <[email protected]>
AuthorDate: Mon Sep 22 08:47:36 2025 +0800
fix: high priority pipelines could be starved by `parallel/` label (#8568)
* fix: high priority pipelines could be starved by `parallel/` label
* fix: failed on mysql
---
backend/server/services/pipeline.go | 19 ++++++++++++++++---
1 file changed, 16 insertions(+), 3 deletions(-)
diff --git a/backend/server/services/pipeline.go
b/backend/server/services/pipeline.go
index 78e7df085..f6c770f9c 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -261,8 +261,21 @@ func dequeuePipeline(runningParallelLabels []string)
(pipeline *models.Pipeline,
}))
// prepare query to find an appropriate pipeline to execute
pipeline = &models.Pipeline{}
+ // 1. find out the current highest priority in the queue
+ top_priority := 0
+ var top_priorities []int
+ where_status := dal.Where("status IN ?", []string{models.TASK_CREATED,
models.TASK_RERUN, models.TASK_RESUME})
+ err = tx.Pluck("priority", &top_priorities, dal.From(pipeline),
where_status, dal.Orderby("priority DESC"), dal.Limit(1))
+ if err != nil {
+ panic(err)
+ }
+ if len(top_priorities) > 0 {
+ top_priority = top_priorities[0]
+ }
+ // 2. pick the earlier runnable pipeline with the highest priority
err = tx.First(pipeline,
- dal.Where("status IN ?", []string{models.TASK_CREATED,
models.TASK_RERUN, models.TASK_RESUME}),
+ where_status,
+ dal.Where("priority = ?", top_priority),
dal.Join(
`left join _devlake_pipeline_labels ON
_devlake_pipeline_labels.pipeline_id =
_devlake_pipelines.id AND
@@ -270,10 +283,10 @@ func dequeuePipeline(runningParallelLabels []string)
(pipeline *models.Pipeline,
_devlake_pipeline_labels.name in ?`,
runningParallelLabels,
),
- dal.Groupby("priority, id"),
+ dal.Groupby("id"),
dal.Having("count(_devlake_pipeline_labels.name)=0"),
dal.Select("id"),
- dal.Orderby("priority DESC, id ASC"),
+ dal.Orderby("id ASC"),
dal.Limit(1),
)
if err == nil {