This is an automated email from the ASF dual-hosted git repository.

narro pushed a commit to branch feat-8408
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit c502e5f62b015a7a02b506a3534194e326f207bf
Author: narro wizard <[email protected]>
AuthorDate: Sun Apr 27 01:42:13 2025 +0000

    feat(pipeline): add external notifications for pipeline creation and start
    
    - Send notification when a pipeline is created
    - Send notification when a pipeline starts
    - Notifications are sent asynchronously to avoid delays
    - Error logging is implemented for failed notifications
---
 backend/server/services/pipeline.go        | 9 +++++++++
 backend/server/services/pipeline_helper.go | 8 ++++++++
 2 files changed, 17 insertions(+)

diff --git a/backend/server/services/pipeline.go 
b/backend/server/services/pipeline.go
index e5e187ac9..5c5fcca62 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -292,6 +292,14 @@ func dequeuePipeline(runningParallelLabels []string) 
(pipeline *models.Pipeline,
                if err != nil {
                        panic(err)
                }
+
+               // Notify that the pipeline has started
+               go func(pipelineId uint64) {
+                       if notifyErr := NotifyExternal(pipelineId); notifyErr 
!= nil {
+                               globalPipelineLog.Error(notifyErr, "failed to 
send pipeline started notification for pipeline #%d", pipelineId)
+                       }
+               }(pipeline.ID)
+
                return
        }
        if tx.IsErrorNotFound(err) {
@@ -363,6 +371,7 @@ func getProjectName(pipeline *models.Pipeline) (string, 
errors.Error) {
        }
        blueprintId := pipeline.BlueprintId
        if blueprintId == 0 {
+               // skip get project name if pipeline is not bound to a blueprint
                return "", nil
        }
        dbBlueprint := &models.Blueprint{}
diff --git a/backend/server/services/pipeline_helper.go 
b/backend/server/services/pipeline_helper.go
index 899110f5c..0e62bc6c7 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -110,6 +110,14 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(pipeline *models.Pipelin
        // update tasks state
        errors.Must(tx.Update(dbPipeline))
        dbPipeline.Labels = newPipeline.Labels
+
+       // Notify that the pipeline has been created
+       go func(pipelineId uint64) {
+               if notifyErr := NotifyExternal(pipelineId); notifyErr != nil {
+                       globalPipelineLog.Error(notifyErr, "failed to send 
pipeline created notification for pipeline #%d", pipelineId)
+               }
+       }(dbPipeline.ID)
+
        return dbPipeline, nil
 }
 

Reply via email to