This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new bdcd2535 Fix cancelling pipeline doesn't support pending(TASK_CREATED/TASK_RERUN) records (#3846)
bdcd2535 is described below

commit bdcd2535ed25445340bac34c126a434254487091
Author: Klesh Wong <[email protected]>
AuthorDate: Fri Dec 2 22:20:14 2022 +0800

    Fix cancelling pipeline doesn't support pending(TASK_CREATED/TASK_RERUN) records (#3846)
    
    * fix: cancel pipeline doesn't work for pending item
    
    * feat: script for cancelling pipeline
    
    * feat: make PLUGIN env var support multiple plugins
    
    * fix: remove debug code
---
 models/task.go                          |  1 +
 scripts/compile-plugins.sh              | 12 +++++-------
 scripts/pm/framework/pipeline-cancel.sh | 22 ++++++++++++++++++++++
 services/pipeline.go                    | 25 +++++++++++++++++++++++++
 4 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/models/task.go b/models/task.go
index 2f4a85b1..aaca9a99 100644
--- a/models/task.go
+++ b/models/task.go
@@ -31,6 +31,7 @@ const (
        TASK_RUNNING   = "TASK_RUNNING"
        TASK_COMPLETED = "TASK_COMPLETED"
        TASK_FAILED    = "TASK_FAILED"
+       TASK_CANCELLED = "TASK_CANCELLED"
 )
 
 type TaskProgressDetail struct {
diff --git a/scripts/compile-plugins.sh b/scripts/compile-plugins.sh
index f14eb74e..b2ea0f06 100755
--- a/scripts/compile-plugins.sh
+++ b/scripts/compile-plugins.sh
@@ -34,8 +34,7 @@ set -e
 
 echo "Usage: "
 echo "  build all plugins:              $0 [golang build flags...]"
-echo "  build and keep one plugin only: PLUGIN=jira $0 [golang build flags...]"
-echo "  build and keep two plugin only: PLUGIN=jira PLUGIN2=github $0 [golang 
build flags...]"
+echo "  build and keep one plugin only: PLUGIN=github,jira $0 [golang build 
flags...]"
 
 SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
 PLUGIN_SRC_DIR=$SCRIPT_DIR/../plugins
@@ -44,11 +43,10 @@ PLUGIN_OUTPUT_DIR=$SCRIPT_DIR/../bin/plugins
 if [ -z "$PLUGIN" ]; then
     PLUGINS=$(find $PLUGIN_SRC_DIR/* -maxdepth 0 -type d -not -name core -not 
-name helper -not -empty)
 else
-    PLUGINS=$PLUGIN_SRC_DIR/$PLUGIN
-fi
-
-if [ $PLUGIN ] && [ $PLUGIN2 ]; then
-    PLUGINS="$PLUGINS $PLUGIN_SRC_DIR/$PLUGIN2"
+    PLUGINS=
+    for p in $(echo "$PLUGIN" | tr "," "\n"); do
+        PLUGINS="$PLUGINS $PLUGIN_SRC_DIR/$p"
+    done
 fi
 
 rm -rf $PLUGIN_OUTPUT_DIR/*
diff --git a/scripts/pm/framework/pipeline-cancel.sh 
b/scripts/pm/framework/pipeline-cancel.sh
new file mode 100755
index 00000000..1c1f1bdb
--- /dev/null
+++ b/scripts/pm/framework/pipeline-cancel.sh
@@ -0,0 +1,22 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+. "$(dirname $0)/../vars/active-vars.sh"
+ID=${1-17}
+
+curl -sv -XDELETE $LAKE_ENDPOINT/pipelines/$ID | jq
diff --git a/services/pipeline.go b/services/pipeline.go
index 52a05d6f..1ba8f2a7 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -348,6 +348,31 @@ func NotifyExternal(pipelineId uint64) errors.Error {
 
 // CancelPipeline FIXME ...
 func CancelPipeline(pipelineId uint64) errors.Error {
+       // prevent RunPipelineInQueue from consuming pending pipelines
+       cronLocker.Lock()
+       defer cronLocker.Unlock()
+       pipeline := &models.DbPipeline{}
+       err := db.First(pipeline, pipelineId).Error
+       if err != nil {
+               return errors.BadInput.New("pipeline not found")
+       }
+       if pipeline.Status == models.TASK_CREATED || pipeline.Status == 
models.TASK_RERUN {
+               pipeline.Status = models.TASK_CANCELLED
+               result := db.Save(pipeline)
+               if result.Error != nil {
+                       return errors.Default.Wrap(result.Error, "faile to 
update pipeline")
+               }
+               // now, with RunPipelineInQueue being block and target pipeline 
got updated
+               // we should update the related tasks as well
+               result = db.Model(&models.Task{}).
+                       Where("pipeline_id = ?", pipelineId).
+                       Update("status", models.TASK_CANCELLED)
+               if result.Error != nil {
+                       return errors.Default.Wrap(result.Error, "faile to 
update pipeline tasks")
+               }
+               // the target pipeline is pending, no running, no need to 
perform the actual cancel operation
+               return nil
+       }
        if temporalClient != nil {
                return 
errors.Convert(temporalClient.CancelWorkflow(context.Background(), 
getTemporalWorkflowId(pipelineId), ""))
        }

Reply via email to