This is an automated email from the ASF dual-hosted git repository.
likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new bdcd2535 Fix cancelling pipeline doesn't support
pending(TASK_CREATED/TASK_RERUN) records (#3846)
bdcd2535 is described below
commit bdcd2535ed25445340bac34c126a434254487091
Author: Klesh Wong <[email protected]>
AuthorDate: Fri Dec 2 22:20:14 2022 +0800
Fix cancelling pipeline doesn't support pending(TASK_CREATED/TASK_RERUN)
records (#3846)
* fix: cancel pipeline doesn't work for pending item
* feat: script for cancelling pipeline
* feat: make PLUGIN env var support multiple plugins
* fix: remove debug code
---
models/task.go | 1 +
scripts/compile-plugins.sh | 12 +++++-------
scripts/pm/framework/pipeline-cancel.sh | 22 ++++++++++++++++++++++
services/pipeline.go | 25 +++++++++++++++++++++++++
4 files changed, 53 insertions(+), 7 deletions(-)
diff --git a/models/task.go b/models/task.go
index 2f4a85b1..aaca9a99 100644
--- a/models/task.go
+++ b/models/task.go
@@ -31,6 +31,7 @@ const (
TASK_RUNNING = "TASK_RUNNING"
TASK_COMPLETED = "TASK_COMPLETED"
TASK_FAILED = "TASK_FAILED"
+ TASK_CANCELLED = "TASK_CANCELLED"
)
type TaskProgressDetail struct {
diff --git a/scripts/compile-plugins.sh b/scripts/compile-plugins.sh
index f14eb74e..b2ea0f06 100755
--- a/scripts/compile-plugins.sh
+++ b/scripts/compile-plugins.sh
@@ -34,8 +34,7 @@ set -e
echo "Usage: "
echo " build all plugins: $0 [golang build flags...]"
-echo " build and keep one plugin only: PLUGIN=jira $0 [golang build flags...]"
-echo " build and keep two plugin only: PLUGIN=jira PLUGIN2=github $0 [golang
build flags...]"
+echo " build and keep one plugin only: PLUGIN=github,jira $0 [golang build
flags...]"
SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
PLUGIN_SRC_DIR=$SCRIPT_DIR/../plugins
@@ -44,11 +43,10 @@ PLUGIN_OUTPUT_DIR=$SCRIPT_DIR/../bin/plugins
if [ -z "$PLUGIN" ]; then
PLUGINS=$(find $PLUGIN_SRC_DIR/* -maxdepth 0 -type d -not -name core -not
-name helper -not -empty)
else
- PLUGINS=$PLUGIN_SRC_DIR/$PLUGIN
-fi
-
-if [ $PLUGIN ] && [ $PLUGIN2 ]; then
- PLUGINS="$PLUGINS $PLUGIN_SRC_DIR/$PLUGIN2"
+ PLUGINS=
+ for p in $(echo "$PLUGIN" | tr "," "\n"); do
+ PLUGINS="$PLUGINS $PLUGIN_SRC_DIR/$p"
+ done
fi
rm -rf $PLUGIN_OUTPUT_DIR/*
diff --git a/scripts/pm/framework/pipeline-cancel.sh
b/scripts/pm/framework/pipeline-cancel.sh
new file mode 100755
index 00000000..1c1f1bdb
--- /dev/null
+++ b/scripts/pm/framework/pipeline-cancel.sh
@@ -0,0 +1,22 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+. "$(dirname $0)/../vars/active-vars.sh"
+ID=${1-17}
+
+curl -sv -XDELETE $LAKE_ENDPOINT/pipelines/$ID | jq
diff --git a/services/pipeline.go b/services/pipeline.go
index 52a05d6f..1ba8f2a7 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -348,6 +348,31 @@ func NotifyExternal(pipelineId uint64) errors.Error {
// CancelPipeline FIXME ...
func CancelPipeline(pipelineId uint64) errors.Error {
+ // prevent RunPipelineInQueue from consuming pending pipelines
+ cronLocker.Lock()
+ defer cronLocker.Unlock()
+ pipeline := &models.DbPipeline{}
+ err := db.First(pipeline, pipelineId).Error
+ if err != nil {
+ return errors.BadInput.New("pipeline not found")
+ }
+ if pipeline.Status == models.TASK_CREATED || pipeline.Status ==
models.TASK_RERUN {
+ pipeline.Status = models.TASK_CANCELLED
+ result := db.Save(pipeline)
+ if result.Error != nil {
+ return errors.Default.Wrap(result.Error, "faile to
update pipeline")
+ }
+ // now, with RunPipelineInQueue being block and target pipeline
got updated
+ // we should update the related tasks as well
+ result = db.Model(&models.Task{}).
+ Where("pipeline_id = ?", pipelineId).
+ Update("status", models.TASK_CANCELLED)
+ if result.Error != nil {
+ return errors.Default.Wrap(result.Error, "faile to
update pipeline tasks")
+ }
+ // the target pipeline is pending, no running, no need to
perform the actual cancel operation
+ return nil
+ }
if temporalClient != nil {
return
errors.Convert(temporalClient.CancelWorkflow(context.Background(),
getTemporalWorkflowId(pipelineId), ""))
}