This is an automated email from the ASF dual-hosted git repository.
narro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 30960f3d6 feat(pipeline): add external notifications for pipeline creation and start (#8431)
30960f3d6 is described below
commit 30960f3d617339a61e4e3313ce0124b5886d6857
Author: NaRro <[email protected]>
AuthorDate: Fri May 9 01:34:07 2025 +0000
feat(pipeline): add external notifications for pipeline creation and start (#8431)
- Send notification when a pipeline is created
- Send notification when a pipeline starts
- Notifications are sent asynchronously to avoid delays
- Error logging is implemented for failed notifications
---
backend/server/services/pipeline.go | 9 +++++++++
backend/server/services/pipeline_helper.go | 8 ++++++++
2 files changed, 17 insertions(+)
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index e5e187ac9..5c5fcca62 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -292,6 +292,14 @@ func dequeuePipeline(runningParallelLabels []string)
(pipeline *models.Pipeline,
if err != nil {
panic(err)
}
+
+ // Notify that the pipeline has started
+ go func(pipelineId uint64) {
+ if notifyErr := NotifyExternal(pipelineId); notifyErr
!= nil {
+ globalPipelineLog.Error(notifyErr, "failed to
send pipeline started notification for pipeline #%d", pipelineId)
+ }
+ }(pipeline.ID)
+
return
}
if tx.IsErrorNotFound(err) {
@@ -363,6 +371,7 @@ func getProjectName(pipeline *models.Pipeline) (string,
errors.Error) {
}
blueprintId := pipeline.BlueprintId
if blueprintId == 0 {
+ // skip get project name if pipeline is not bound to a blueprint
return "", nil
}
dbBlueprint := &models.Blueprint{}
diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go
index 899110f5c..0e62bc6c7 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -110,6 +110,14 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(pipeline *models.Pipelin
// update tasks state
errors.Must(tx.Update(dbPipeline))
dbPipeline.Labels = newPipeline.Labels
+
+ // Notify that the pipeline has been created
+ go func(pipelineId uint64) {
+ if notifyErr := NotifyExternal(pipelineId); notifyErr != nil {
+ globalPipelineLog.Error(notifyErr, "failed to send
pipeline created notification for pipeline #%d", pipelineId)
+ }
+ }(dbPipeline.ID)
+
return dbPipeline, nil
}