This is an automated email from the ASF dual-hosted git repository.
narro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 7b7875047 feat: pipeline scheduler supports priority (#8534)
7b7875047 is described below
commit 7b7875047b58e51197f85df2fcc26286355529d4
Author: Klesh Wong <[email protected]>
AuthorDate: Thu Aug 14 10:31:23 2025 +0800
feat: pipeline scheduler supports priority (#8534)
---
backend/core/models/blueprint.go | 1 +
.../20250813_add_pipeline_priority.go | 57 ++++++++++++++++++++++
backend/core/models/migrationscripts/register.go | 1 +
backend/core/models/pipeline.go | 2 +
backend/plugins/org/impl/impl.go | 1 +
backend/plugins/org/tasks/sleep.go | 40 +++++++++++++++
backend/plugins/org/tasks/task_data.go | 1 +
backend/server/services/blueprint.go | 1 +
backend/server/services/pipeline.go | 2 +-
backend/server/services/pipeline_helper.go | 1 +
config-ui/.yarnrc.yml | 2 +
config-ui/package.json | 4 ++
12 files changed, 112 insertions(+), 1 deletion(-)
diff --git a/backend/core/models/blueprint.go b/backend/core/models/blueprint.go
index f30adec9f..a9b86d635 100644
--- a/backend/core/models/blueprint.go
+++ b/backend/core/models/blueprint.go
@@ -41,6 +41,7 @@ type Blueprint struct {
AfterPlan PipelinePlan `json:"afterPlan" gorm:"serializer:encdec"`
Labels []string `json:"labels" gorm:"-"`
Connections []*BlueprintConnection `json:"connections" gorm:"-"`
+ Priority int `json:"priority"` // greater is higher
SyncPolicy `gorm:"embedded"`
common.Model `swaggerignore:"true"`
}
diff --git a/backend/core/models/migrationscripts/20250813_add_pipeline_priority.go b/backend/core/models/migrationscripts/20250813_add_pipeline_priority.go
new file mode 100644
index 000000000..8697fb6a3
--- /dev/null
+++ b/backend/core/models/migrationscripts/20250813_add_pipeline_priority.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+ "github.com/apache/incubator-devlake/core/context"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
+)
+
+var _ plugin.MigrationScript = (*addPipelinePriority)(nil)
+
+type addPipelinePriority struct{}
+
+type blueprint20250813 struct {
+ Priority int `json:"priority"`
+}
+
+func (blueprint20250813) TableName() string {
+ return "_devlake_blueprints"
+}
+
+type pipeline20250813 struct {
+ Priority int `json:"priority"`
+}
+
+func (pipeline20250813) TableName() string {
+ return "_devlake_pipelines"
+}
+
+func (script *addPipelinePriority) Up(basicRes context.BasicRes) errors.Error {
+ return migrationhelper.AutoMigrateTables(basicRes, new(blueprint20250813), new(pipeline20250813))
+}
+
+func (*addPipelinePriority) Version() uint64 {
+ return 20250813151534
+}
+
+func (*addPipelinePriority) Name() string {
+ return "add priority to blueprints and pipelines"
+}
diff --git a/backend/core/models/migrationscripts/register.go b/backend/core/models/migrationscripts/register.go
index 58c773f46..c14ae8740 100644
--- a/backend/core/models/migrationscripts/register.go
+++ b/backend/core/models/migrationscripts/register.go
@@ -139,5 +139,6 @@ func All() []plugin.MigrationScript {
new(increaseCqIssueComponentLength),
new(extendFieldSizeForCq),
new(addIssueFixVerion),
+ new(addPipelinePriority),
}
}
diff --git a/backend/core/models/pipeline.go b/backend/core/models/pipeline.go
index 2cdc3413f..f9613dd0e 100644
--- a/backend/core/models/pipeline.go
+++ b/backend/core/models/pipeline.go
@@ -66,6 +66,7 @@ type Pipeline struct {
SpentSeconds int `json:"spentSeconds"`
Stage int `json:"stage"`
Labels []string `json:"labels" gorm:"-"`
+ Priority int `json:"priority"` // greater is higher
SyncPolicy `gorm:"embedded"`
}
@@ -75,6 +76,7 @@ type NewPipeline struct {
Name string `json:"name"`
Plan PipelinePlan `json:"plan" swaggertype:"array,string" example:"please check api /pipelines/<PLUGIN_NAME>/pipeline-plan"`
Labels []string `json:"labels"`
+ Priority int `json:"priority"` // greater is higher
BlueprintId uint64
SyncPolicy `gorm:"embedded"`
}
diff --git a/backend/plugins/org/impl/impl.go b/backend/plugins/org/impl/impl.go
index cd4ffa7fd..e68257eec 100644
--- a/backend/plugins/org/impl/impl.go
+++ b/backend/plugins/org/impl/impl.go
@@ -61,6 +61,7 @@ func (p Org) SubTaskMetas() []plugin.SubTaskMeta {
return []plugin.SubTaskMeta{
tasks.ConnectUserAccountsExactMeta,
tasks.SetProjectMappingMeta,
+ tasks.SleepMeta,
}
}
diff --git a/backend/plugins/org/tasks/sleep.go b/backend/plugins/org/tasks/sleep.go
new file mode 100644
index 000000000..43c887b1c
--- /dev/null
+++ b/backend/plugins/org/tasks/sleep.go
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "time"
+
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+)
+
+var SleepMeta = plugin.SubTaskMeta{
+ Name: "sleep",
+ EntryPoint: Sleep,
+ EnabledByDefault: false,
+ Description: "for debugging only",
+ DomainTypes: []string{plugin.DOMAIN_TYPE_CROSS},
+}
+
+// Sleep pauses the subtask for Options.SleepSeconds seconds; for debugging only
+func Sleep(taskCtx plugin.SubTaskContext) errors.Error {
+ data := taskCtx.GetData().(*TaskData)
+ time.Sleep(time.Duration(data.Options.SleepSeconds) * time.Second)
+ return nil
+}
diff --git a/backend/plugins/org/tasks/task_data.go b/backend/plugins/org/tasks/task_data.go
index ca2a9b300..e3b3cae1c 100644
--- a/backend/plugins/org/tasks/task_data.go
+++ b/backend/plugins/org/tasks/task_data.go
@@ -22,6 +22,7 @@ import "github.com/apache/incubator-devlake/core/plugin"
type Options struct {
ConnectionId uint64 `json:"connectionId"`
ProjectMappings []ProjectMapping `json:"projectMappings"`
+ SleepSeconds uint64 `json:"sleepSeconds"`
}
// ProjectMapping represents the relations between project and scopes
diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go
index 470f8bcdd..91bab6934 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -327,6 +327,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint, syncPolicy *models.S
newPipeline.Name = blueprint.Name
newPipeline.BlueprintId = blueprint.ID
newPipeline.Labels = blueprint.Labels
+ newPipeline.Priority = blueprint.Priority
newPipeline.SyncPolicy = blueprint.SyncPolicy
// if the plan is empty, we should not create the pipeline
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index 413ba3db6..0efccbc3d 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -273,7 +273,7 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline,
dal.Groupby("id"),
dal.Having("count(_devlake_pipeline_labels.name)=0"),
dal.Select("id"),
- dal.Orderby("id ASC"),
+ dal.Orderby("priority DESC, id ASC"),
dal.Limit(1),
)
if err == nil {
diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go
index 0e62bc6c7..d7da0c307 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -67,6 +67,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin
Message: "",
SpentSeconds: 0,
Plan: newPipeline.Plan,
+ Priority: newPipeline.Priority,
SyncPolicy: newPipeline.SyncPolicy,
}
if newPipeline.BlueprintId != 0 {
diff --git a/config-ui/.yarnrc.yml b/config-ui/.yarnrc.yml
index 50f1cf9de..9b0fb2d49 100644
--- a/config-ui/.yarnrc.yml
+++ b/config-ui/.yarnrc.yml
@@ -16,4 +16,6 @@
#
nodeLinker: node-modules
+npmRegistryServer: "https://registry.npmmirror.com"
+
yarnPath: .yarn/releases/yarn-3.4.1.cjs
diff --git a/config-ui/package.json b/config-ui/package.json
index 99d10bf19..c10c1c9d9 100644
--- a/config-ui/package.json
+++ b/config-ui/package.json
@@ -69,5 +69,9 @@
"typescript": "^5.1.6",
"vite": "^5.1.4",
"vite-plugin-svgr": "^4.2.0"
+ },
+ "volta": {
+ "node": "18.20.8",
+ "yarn": "3.4.1"
}
}