This is an automated email from the ASF dual-hosted git repository.
ricardozanini pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-tools.git
The following commit(s) were added to refs/heads/main by this push:
new f0ea73c3c95 kie-tools-2910: Update workflow metadata upon deployment
events (#2911)
f0ea73c3c95 is described below
commit f0ea73c3c95e5546be5aa5ddfb83c27fc14a8558
Author: Walter Medvedeo <[email protected]>
AuthorDate: Tue Mar 25 16:48:20 2025 +0100
kie-tools-2910: Update workflow metadata upon deployment events (#2911)
---
.../api/v1alpha08/sonataflow_types.go | 8 ++
.../api/v1alpha08/zz_generated.deepcopy.go | 8 ++
packages/sonataflow-operator/cmd/main.go | 7 ++
.../sonataflow-operator.clusterserviceversion.yaml | 8 ++
packages/sonataflow-operator/go.mod | 2 +-
.../module.yaml | 4 +
.../internal/controller/builder/builder.go | 2 +-
.../builder/kogitoserverlessbuild_manager.go | 2 +-
.../controller/eventing/workflowdef_events.go | 73 +++++++++++
.../internal/controller/knative/knative.go | 24 +++-
.../internal/controller/platform/platform.go | 42 ++++---
.../controller/platform/services/services.go | 5 +-
.../profiles/common/constants/reconcile.go | 11 +-
.../profiles/common/constants/workflows.go | 30 ++---
.../profiles/common/properties/managed.go | 24 +++-
.../controller/profiles/common/reconciler.go | 2 +-
.../internal/controller/profiles/dev/states_dev.go | 2 +-
.../profiles/preview/deployment_handler.go | 85 ++++++++++++-
.../profiles/preview/deployment_handler_test.go | 4 +
.../controller/profiles/preview/states_preview.go | 4 +-
.../internal/controller/sonataflow_controller.go | 136 +++++++++++++++++++--
.../internal/controller/workflowdef/events.go | 53 ++++++++
.../sonataflow-operator/internal/manager/worker.go | 77 ++++++++++++
packages/sonataflow-operator/operator.yaml | 10 ++
packages/sonataflow-operator/utils/events.go | 65 ++++++++++
25 files changed, 627 insertions(+), 61 deletions(-)
diff --git a/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
b/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
index 7b6091eaf3e..f43d8a11bd2 100644
--- a/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
+++ b/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
@@ -202,6 +202,14 @@ type SonataFlowStatus struct {
Triggers []SonataFlowTriggerRef `json:"triggers,omitempty"`
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="flowRevision"
FlowCRC uint32 `json:"flowCRC,omitempty"`
+
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="finalizerAttempts"
+ FinalizerAttempts int `json:"finalizerAttempts,omitempty"`
+
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="finalizerSucceed"
+ FinalizerSucceed bool `json:"finalizerSucceed,omitempty"`
+
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="lastTimeFinalizerAttempt"
+ LastTimeFinalizerAttempt *metav1.Time
`json:"lastTimeFinalizerAttempt,omitempty"`
+
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="lastTimeStatusNotified"
+ LastTimeStatusNotified *metav1.Time
`json:"lastTimeStatusNotified,omitempty"`
}
// SonataFlowTriggerRef defines a trigger created for the SonataFlow.
diff --git
a/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
b/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
index 2468c5a57fd..22faee96338 100644
--- a/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
+++ b/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
@@ -1470,6 +1470,14 @@ func (in *SonataFlowStatus) DeepCopyInto(out
*SonataFlowStatus) {
*out = make([]SonataFlowTriggerRef, len(*in))
copy(*out, *in)
}
+ if in.LastTimeFinalizerAttempt != nil {
+ in, out := &in.LastTimeFinalizerAttempt,
&out.LastTimeFinalizerAttempt
+ *out = (*in).DeepCopy()
+ }
+ if in.LastTimeStatusNotified != nil {
+ in, out := &in.LastTimeStatusNotified,
&out.LastTimeStatusNotified
+ *out = (*in).DeepCopy()
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new SonataFlowStatus.
diff --git a/packages/sonataflow-operator/cmd/main.go
b/packages/sonataflow-operator/cmd/main.go
index 0ee30c457ce..5a524863c3e 100644
--- a/packages/sonataflow-operator/cmd/main.go
+++ b/packages/sonataflow-operator/cmd/main.go
@@ -24,6 +24,8 @@ import (
"flag"
"os"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/version"
prometheus
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
@@ -92,6 +94,8 @@ func main() {
flag.StringVar(&controllerCfgPath, "controller-cfg-path", "", "The
controller config file path.")
flag.Parse()
+ manager.SetOperatorStartTime()
+
ctrl.SetLogger(klogr.New().WithName(controller.ComponentName))
// if the enable-http2 flag is false (the default), http/2 should be
disabled
@@ -152,6 +156,9 @@ func main() {
os.Exit(1)
}
+ // Initialize the worker used by the SonataFlow reconciliations to
execute auxiliary async operations.
+ manager.InitializeSFCWorker(manager.SonataFlowControllerWorkerSize)
+
if err = (&controller.SonataFlowReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
diff --git
a/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
b/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
index 42a6fa178cd..228f10bd633 100644
---
a/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
+++
b/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
@@ -241,10 +241,18 @@ spec:
- description: Endpoint is an externally accessible URL of the
workflow
displayName: endpoint
path: endpoint
+ - displayName: finalizerAttempts
+ path: finalizerAttempts
+ - displayName: finalizerSucceed
+ path: finalizerSucceed
- displayName: flowRevision
path: flowCRC
+ - displayName: lastTimeFinalizerAttempt
+ path: lastTimeFinalizerAttempt
- displayName: lastTimeRecoverAttempt
path: lastTimeRecoverAttempt
+ - displayName: lastTimeStatusNotified
+ path: lastTimeStatusNotified
- description: Platform displays which platform is being used by
this workflow
displayName: platform
path: platform
diff --git a/packages/sonataflow-operator/go.mod
b/packages/sonataflow-operator/go.mod
index 3840aaa295a..75594886016 100644
--- a/packages/sonataflow-operator/go.mod
+++ b/packages/sonataflow-operator/go.mod
@@ -14,6 +14,7 @@ require (
github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api
v0.0.0
github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder
v0.0.0
github.com/apache/incubator-kie-tools/packages/sonataflow-operator/workflowproj
v0.0.0
+ github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/go-logr/logr v1.4.2 // indirect
github.com/imdario/mergo v0.3.16
@@ -47,7 +48,6 @@ require (
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudevents/sdk-go/sql/v2 v2.13.0 // indirect
- github.com/cloudevents/sdk-go/v2 v2.15.2 // indirect
github.com/dprotaso/go-yit v0.0.0-20220510233725-9ba8df137936 //
indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
diff --git
a/packages/sonataflow-operator/images/modules/org.apache.kie.sonataflow.operatorBuilder/module.yaml
b/packages/sonataflow-operator/images/modules/org.apache.kie.sonataflow.operatorBuilder/module.yaml
index 95f3252a510..49987caa106 100644
---
a/packages/sonataflow-operator/images/modules/org.apache.kie.sonataflow.operatorBuilder/module.yaml
+++
b/packages/sonataflow-operator/images/modules/org.apache.kie.sonataflow.operatorBuilder/module.yaml
@@ -29,6 +29,10 @@ artifacts:
path: "../../../../internal/controller"
dest: /workspace/internal/controller
+ - name: manager
+ path: "../../../../internal/manager"
+ dest: /workspace/internal/manager
+
- name: version
path: "../../../../version"
dest: /workspace/version
diff --git
a/packages/sonataflow-operator/internal/controller/builder/builder.go
b/packages/sonataflow-operator/internal/controller/builder/builder.go
index 25bad566d4b..a368152c93f 100644
--- a/packages/sonataflow-operator/internal/controller/builder/builder.go
+++ b/packages/sonataflow-operator/internal/controller/builder/builder.go
@@ -47,7 +47,7 @@ type BuildManager interface {
}
func NewBuildManager(ctx context.Context, client client.Client, cliConfig
*rest.Config, targetName, targetNamespace string) (BuildManager, error) {
- p, err := platform.GetActivePlatform(ctx, client, targetNamespace)
+ p, err := platform.GetActivePlatform(ctx, client, targetNamespace, true)
if err != nil {
if errors.IsNotFound(err) {
return nil, err
diff --git
a/packages/sonataflow-operator/internal/controller/builder/kogitoserverlessbuild_manager.go
b/packages/sonataflow-operator/internal/controller/builder/kogitoserverlessbuild_manager.go
index be67b5a33fc..3387b5a4228 100644
---
a/packages/sonataflow-operator/internal/controller/builder/kogitoserverlessbuild_manager.go
+++
b/packages/sonataflow-operator/internal/controller/builder/kogitoserverlessbuild_manager.go
@@ -59,7 +59,7 @@ func (k *sonataFlowBuildManager) GetOrCreateBuild(workflow
*operatorapi.SonataFl
if err := k.client.Get(k.ctx, client.ObjectKeyFromObject(workflow),
buildInstance); err != nil {
if errors.IsNotFound(err) {
plat := &operatorapi.SonataFlowPlatform{}
- if plat, err = platform.GetActivePlatform(k.ctx,
k.client, workflow.Namespace); err != nil {
+ if plat, err = platform.GetActivePlatform(k.ctx,
k.client, workflow.Namespace, true); err != nil {
return nil, err
}
workflowBuildTemplate :=
plat.Spec.Build.Template.DeepCopy()
diff --git
a/packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go
b/packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go
new file mode 100644
index 00000000000..73a1651b541
--- /dev/null
+++
b/packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package eventing
+
+import (
+ "context"
+ "fmt"
+
+ duckv1 "knative.dev/pkg/apis/duck/v1"
+
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ operatorapi
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/knative"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
+ "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+)
+
+// GetWorkflowDefinitionEventsTargetURL returns the target url that must be
used to send the workflow definition events.
+func GetWorkflowDefinitionEventsTargetURL(cli client.Client, workflow
*operatorapi.SonataFlow) (string, error) {
+ var err error
+ var sfp *operatorapi.SonataFlowPlatform
+ var sink *duckv1.Destination
+ var uri string
+
+ if sfp, err = platform.GetActivePlatform(context.Background(), cli,
workflow.Namespace, false); err != nil {
+ return "", fmt.Errorf("failed to get active platform for
workflow: %s, namespace: %s : %v", workflow.Name, workflow.Namespace, err)
+ }
+ if sfp == nil {
+ klog.V(log.D).Infof("No active platform was found to calculate
the workflow definition events target url for workflow: %s, namespace: %s.",
workflow.Name, workflow.Namespace)
+ return "", err
+ }
+ diHandler := services.NewDataIndexHandler(sfp)
+ if !diHandler.IsServiceEnabled() {
+ klog.V(log.D).Infof("DataIndex is not enabled for current
workflow: %s, namespace: %s, neither in current platform: %s, or by a cluster
platform reference.", workflow.Name, workflow.Namespace, sfp.Name)
+ return "", nil
+ }
+
+ // First check if the workflow is connected with the knative eventing
system.
+ if sink, err = knative.GetWorkflowSink(workflow, sfp); err != nil {
+ return "", fmt.Errorf("failed to look for a potential sink
configuration for workflow: %s, namespace: %s : %v", workflow.Name,
workflow.Namespace, err)
+ }
+ if sink != nil {
+ // Workflow is connected via with knative eventing by using an
operator managed SinkBinding.
+ if sinkURI, err :=
knative.GetSinkBindingSinkURI(workflow.Name+"-sb", workflow.Namespace); err !=
nil {
+ return "", err
+ } else {
+ uri = sinkURI.String()
+ }
+ } else {
+ // Workflow is connected via direct http invocation with the DI.
+ uri = diHandler.GetServiceBaseUrl() +
constants.KogitoProcessDefinitionsEventsPath
+ }
+ return uri, nil
+}
diff --git
a/packages/sonataflow-operator/internal/controller/knative/knative.go
b/packages/sonataflow-operator/internal/controller/knative/knative.go
index 49f2540b2d4..3886363b696 100644
--- a/packages/sonataflow-operator/internal/controller/knative/knative.go
+++ b/packages/sonataflow-operator/internal/controller/knative/knative.go
@@ -116,8 +116,8 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
}
}
-// getRemotePlatform returns the remote platfrom referred by a
SonataFlowClusterPlatform
-func getRemotePlatform(pl *operatorapi.SonataFlowPlatform)
(*operatorapi.SonataFlowPlatform, error) {
+// GetRemotePlatform returns the remote platform referred by a
SonataFlowClusterPlatform if any.
+func GetRemotePlatform(pl *operatorapi.SonataFlowPlatform)
(*operatorapi.SonataFlowPlatform, error) {
if pl.Status.ClusterPlatformRef != nil {
// Find the platform referred by the cluster platform
platform := &operatorapi.SonataFlowPlatform{}
@@ -172,12 +172,15 @@ func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl
*operatorapi.SonataFlo
if workflow.Spec.Sink != nil {
return getDestinationWithNamespace(workflow.Spec.Sink,
workflow.Namespace), nil
}
- if pl != nil && pl.Spec.Eventing != nil && pl.Spec.Eventing.Broker !=
nil {
+ if pl == nil {
+ return nil, nil
+ }
+ if pl.Spec.Eventing != nil && pl.Spec.Eventing.Broker != nil {
// no sink defined in the workflow, use the platform broker
return getDestinationWithNamespace(pl.Spec.Eventing.Broker,
pl.Namespace), nil
}
// Find the remote platform referred by the cluster platform
- platform, err := getRemotePlatform(pl)
+ platform, err := GetRemotePlatform(pl)
if err != nil {
return nil, err
}
@@ -290,3 +293,16 @@ func CheckKSinkInjected(name, namespace string) (bool,
error) {
}
return false, nil // K_SINK has not been injected yet
}
+
+// GetSinkBindingSinkURI returns the address of the sink referred by a
SinkBinding.
+func GetSinkBindingSinkURI(name, namespace string) (*apis.URL, error) {
+ sb := &sourcesv1.SinkBinding{}
+ if err := utils.GetClient().Get(context.TODO(),
types.NamespacedName{Name: name, Namespace: namespace}, sb); err != nil {
+ return nil, err
+ }
+ cond := sb.Status.GetCondition(apis.ConditionType(apis.ConditionReady))
+ if cond == nil || cond.Status != corev1.ConditionTrue {
+ return nil, fmt.Errorf("SinkBinding name: %s, namespace: %s is
not ready", name, namespace)
+ }
+ return sb.Status.SinkURI, nil
+}
diff --git
a/packages/sonataflow-operator/internal/controller/platform/platform.go
b/packages/sonataflow-operator/internal/controller/platform/platform.go
index c98e6ff9b5a..fe36aaea20e 100644
--- a/packages/sonataflow-operator/internal/controller/platform/platform.go
+++ b/packages/sonataflow-operator/internal/controller/platform/platform.go
@@ -119,23 +119,41 @@ func GetOperatorLockName(operatorID string) string {
}
// GetActivePlatform returns the currently installed active platform in the
local namespace.
-func GetActivePlatform(ctx context.Context, c ctrl.Client, namespace string)
(*operatorapi.SonataFlowPlatform, error) {
- return getLocalPlatform(ctx, c, namespace, true)
+// The parameter createIfNotExists determines if such platform must be created
when not exists. Never nil when
+// createsIfNotExists is true, unless an error.
+func GetActivePlatform(ctx context.Context, c ctrl.Client, namespace string,
createIfNotExists bool) (*operatorapi.SonataFlowPlatform, error) {
+ platform, err := getLocalPlatform(ctx, c, namespace, true)
+ if err != nil {
+ return nil, err
+ }
+ if platform != nil {
+ return platform, nil
+ }
+ klog.V(log.I).InfoS("No active SonataFlowPlatform was found in
namespace", "Namespace", namespace)
+ if createIfNotExists {
+ klog.V(log.I).InfoS("Creating a default SonataFlowPlatform",
"Namespace", namespace)
+ sfp := newEmptySonataFlowPlatform(namespace)
+ if err = CreateOrUpdateWithDefaults(ctx, sfp, false); err !=
nil {
+ return nil, err
+ }
+ return sfp, nil
+ }
+ return nil, nil
}
-// getLocalPlatform returns the currently installed platform or any platform
existing in local namespace.
+// getLocalPlatform returns the currently installed active platform, or any
platform, existing in local namespace when no
+// active platform exists. When the active parameter is true, only active
platforms are considered.
+// In other cases, a non-active platform might be returned as a second option.
func getLocalPlatform(ctx context.Context, c ctrl.Client, namespace string,
active bool) (*operatorapi.SonataFlowPlatform, error) {
- klog.V(log.D).InfoS("Finding available platforms")
-
+ klog.V(log.D).InfoS("Finding available platforms in namespace",
"namespace", namespace)
lst, err := listPrimaryPlatforms(ctx, c, namespace)
if err != nil {
return nil, err
}
-
for _, p := range lst.Items {
platform := p // pin
if IsActive(&platform) {
- klog.V(log.D).InfoS("Found active local build
platform", "platform", platform.Name)
+ klog.V(log.D).InfoS("Found active local platform",
"platform", platform.Name)
return &platform, nil
}
}
@@ -143,16 +161,10 @@ func getLocalPlatform(ctx context.Context, c ctrl.Client,
namespace string, acti
if !active && len(lst.Items) > 0 {
// does not require the platform to be active, just return one
if present
res := lst.Items[0]
- klog.V(log.D).InfoS("Found local build platform", "platform",
res.Name)
+ klog.V(log.D).InfoS("Found non-active local platform",
"platform", res.Name)
return &res, nil
}
- klog.V(log.I).InfoS("Not found a local build platform", "Namespace",
namespace)
- klog.V(log.I).InfoS("Creating a default SonataFlowPlatform",
"Namespace", namespace)
- sfp := newEmptySonataFlowPlatform(namespace)
- if err = CreateOrUpdateWithDefaults(ctx, sfp, false); err != nil {
- return nil, err
- }
- return sfp, nil
+ return nil, nil
}
func newEmptySonataFlowPlatform(namespace string)
*operatorapi.SonataFlowPlatform {
diff --git
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
index 6ef5ac4204a..6ddb220a4c1 100644
---
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
+++
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
@@ -112,6 +112,9 @@ type PlatformServiceHandler interface {
// SetServiceUrlInWorkflowStatus sets the service url in a workflow's
status.
SetServiceUrlInWorkflowStatus(workflow *operatorapi.SonataFlow)
+ // GetServiceSource returns the source Broker configured for the given
service by applying the following precedence rule.
+ // The source declared in the given service definition is returned
first, if any, otherwise a source declared in the
+ // service platform is returned, if any.
GetServiceSource() *duckv1.Destination
// Check if K_SINK has injected for Job Service. No Op for Data Index
@@ -690,7 +693,7 @@ func (d *DataIndexHandler)
GenerateKnativeResources(platform *operatorapi.Sonata
d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-node", "ProcessInstanceNodeDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-state", "ProcessInstanceStateDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-variable", "ProcessInstanceVariableDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
- d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-definition", "ProcessDefinitionEvent",
constants.KogitoProcessDefinitionsEventsPath, platform),
+ d.newTrigger(lbl, managedAnnotations, brokerName, namespace,
serviceName, "process-definition", "ProcessDefinitionEvent",
constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent",
constants.KogitoProcessInstancesMultiEventsPath, platform),
d.newTrigger(lbl, managedAnnotations, brokerName, namespace,
serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
}
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/common/constants/reconcile.go
b/packages/sonataflow-operator/internal/controller/profiles/common/constants/reconcile.go
index a2a4a7ac1df..fb86e55b601 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/common/constants/reconcile.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/common/constants/reconcile.go
@@ -30,6 +30,13 @@ const (
// RequeueRecoverDeploymentErrorInterval interval between recovering
from failures
RequeueRecoverDeploymentErrorInterval = RecoverDeploymentErrorInterval
* time.Minute
RecoverDeploymentErrorInterval = 10
-
- DefaultHTTPWorkflowPortInt = 8080
+ DefaultHTTPWorkflowPortInt = 8080
+ // MaxWorkflowFinalizerAttempts how many times the operator will try to
execute a SonataFlow CRD finalizer.
+ MaxWorkflowFinalizerAttempts = 3
+ // WorkflowFinalizerRetryInterval interval between SonataFlow CRD
finalizer execution attempts.
+ WorkflowFinalizerRetryInterval = 5 * time.Second
+ // WorkflowFinalizerSchedulingRetryInterval interval for the operator
to retry to schedule a failing finalizer scheduling.
+ WorkflowFinalizerSchedulingRetryInterval = 5 * time.Second
+ // EventDeliveryTimeout delivery timeout for the cloud events produced
by the operator.
+ EventDeliveryTimeout = 30 * time.Second
)
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/common/constants/workflows.go
b/packages/sonataflow-operator/internal/controller/profiles/common/constants/workflows.go
index f0fca0a4b63..11be4c5270b 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/common/constants/workflows.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/common/constants/workflows.go
@@ -18,18 +18,20 @@
package constants
const (
- MicroprofileServiceCatalogPropertyPrefix =
"org.kie.kogito.addons.discovery."
- KogitoOutgoingEventsURL =
"mp.messaging.outgoing.kogito_outgoing_stream.url"
- KogitoOutgoingEventsConnector =
"mp.messaging.outgoing.kogito_outgoing_stream.connector"
- KogitoIncomingEventsConnector =
"mp.messaging.incoming.kogito_incoming_stream.connector"
- KogitoIncomingEventsPath =
"mp.messaging.incoming.kogito_incoming_stream.path"
- KnativeHealthEnabled =
"org.kie.kogito.addons.knative.eventing.health-enabled"
- KnativeInjectedEnvVar = "${K_SINK}"
- TriggerFinalizer = "trigger-deletion"
- QuarkusDevUICorsEnabled = "quarkus.dev-ui.cors.enabled"
- QuarkusHttpCors = "quarkus.http.cors"
- QuarkusHttpCorsOrigins = "quarkus.http.cors.origins"
- KogitoEventsGrouping = "kogito.events.grouping"
- KogitoEventsGroupingBinary =
"kogito.events.grouping.binary"
- KogitoEventsGroupingCompress =
"kogito.events.grouping.compress"
+ MicroprofileServiceCatalogPropertyPrefix =
"org.kie.kogito.addons.discovery."
+ KogitoOutgoingEventsURL =
"mp.messaging.outgoing.kogito_outgoing_stream.url"
+ KogitoOutgoingEventsConnector =
"mp.messaging.outgoing.kogito_outgoing_stream.connector"
+ KogitoIncomingEventsConnector =
"mp.messaging.incoming.kogito_incoming_stream.connector"
+ KogitoIncomingEventsPath =
"mp.messaging.incoming.kogito_incoming_stream.path"
+ KnativeHealthEnabled =
"org.kie.kogito.addons.knative.eventing.health-enabled"
+ KnativeInjectedEnvVar = "${K_SINK}"
+ TriggerFinalizer = "trigger-deletion"
+ WorkflowFinalizer = "workflow-deletion"
+ QuarkusDevUICorsEnabled =
"quarkus.dev-ui.cors.enabled"
+ QuarkusHttpCors = "quarkus.http.cors"
+ QuarkusHttpCorsOrigins =
"quarkus.http.cors.origins"
+ KogitoEventsGrouping = "kogito.events.grouping"
+ KogitoEventsGroupingBinary =
"kogito.events.grouping.binary"
+ KogitoEventsGroupingCompress =
"kogito.events.grouping.compress"
+ SendWorkflowDefinitionsStatusUpdateEventError = "An error was produced
while sending a workflow definition status change event."
)
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/common/properties/managed.go
b/packages/sonataflow-operator/internal/controller/profiles/common/properties/managed.go
index 1587f5440b5..545c2c47019 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/common/properties/managed.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/common/properties/managed.go
@@ -116,13 +116,27 @@ func (a *managedPropertyHandler) Build() string {
// withKogitoServiceUrl adds the property kogitoServiceUrlProperty to the
application properties.
// See Service Discovery
https://kubernetes.io/docs/concepts/services-networking/service/#dns
func (a *managedPropertyHandler) withKogitoServiceUrl() ManagedPropertyHandler
{
- var kogitoServiceUrl string
- if len(a.workflow.Namespace) > 0 {
- kogitoServiceUrl = fmt.Sprintf("%s://%s.%s",
constants.DefaultHTTPProtocol, a.workflow.Name, a.workflow.Namespace)
+ return a.addDefaultManagedProperty(constants.KogitoServiceURLProperty,
GetKogitoServiceUrl(a.workflow))
+}
+
+func GetKogitoServiceUrl(workflow *operatorapi.SonataFlow) string {
+ return GetKogitoServiceUrlWithNameAndNamespace(workflow.Name,
workflow.Namespace)
+}
+
+func GetKogitoServiceUrlWithNameAndNamespace(name, namespace string) string {
+ if len(namespace) > 0 {
+ return fmt.Sprintf("%s://%s.%s", constants.DefaultHTTPProtocol,
name, namespace)
} else {
- kogitoServiceUrl = fmt.Sprintf("%s://%s",
constants.DefaultHTTPProtocol, a.workflow.Name)
+ return fmt.Sprintf("%s://%s", constants.DefaultHTTPProtocol,
name)
}
- return a.addDefaultManagedProperty(constants.KogitoServiceURLProperty,
kogitoServiceUrl)
+}
+
+func GetWorkflowEndpointUrl(workflow *operatorapi.SonataFlow) string {
+ return GetWorkflowEndpointUrlWithNameAndNamespace(workflow.Name,
workflow.Namespace)
+}
+
+func GetWorkflowEndpointUrlWithNameAndNamespace(name, namespace string) string
{
+ return GetKogitoServiceUrlWithNameAndNamespace(name, namespace) + "/" +
name
}
// withKafkaHealthCheckDisabled adds the property kafkaSmallRyeHealthProperty
to the application properties.
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/common/reconciler.go
b/packages/sonataflow-operator/internal/controller/profiles/common/reconciler.go
index e3967d8cfc4..93b582a28fd 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/common/reconciler.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/common/reconciler.go
@@ -53,7 +53,7 @@ type StateSupport struct {
// PerformStatusUpdate updates the SonataFlow Status conditions
func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow
*operatorapi.SonataFlow) (bool, error) {
var err error
- pl, err := platform.GetActivePlatform(ctx, s.C, workflow.Namespace)
+ pl, err := platform.GetActivePlatform(ctx, s.C, workflow.Namespace,
true)
if err != nil {
return false, err
}
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/dev/states_dev.go
b/packages/sonataflow-operator/internal/controller/profiles/dev/states_dev.go
index 4ae5443ee69..457dc34cb2e 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/dev/states_dev.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/dev/states_dev.go
@@ -71,7 +71,7 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context,
workflow *operatora
devBaseContainerImage := workflowdef.GetDefaultWorkflowDevModeImageTag()
// check if the Platform available
- pl, err := platform.GetActivePlatform(context.TODO(), e.C,
workflow.Namespace)
+ pl, err := platform.GetActivePlatform(context.TODO(), e.C,
workflow.Namespace, true)
if err != nil {
return ctrl.Result{Requeue: false}, objs, err
}
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
index 61a39162f74..f1285c8baae 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
@@ -19,6 +19,26 @@ package preview
import (
"context"
+ "fmt"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/eventing"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
+
+ "k8s.io/client-go/util/retry"
+
+ "k8s.io/apimachinery/pkg/types"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/properties"
+
+ "k8s.io/klog/v2"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/workflowdef"
+ "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -32,7 +52,6 @@ import (
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/monitoring"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common"
-
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils"
)
@@ -58,6 +77,7 @@ func (d *DeploymentReconciler) reconcileWithImage(ctx
context.Context, workflow
return reconcile.Result{Requeue: false}, nil, err
}
+ previousStatus := workflow.Status
// Ensure objects
result, objs, err := d.ensureObjects(ctx, workflow, image)
if err != nil || result.Requeue {
@@ -70,9 +90,11 @@ func (d *DeploymentReconciler) reconcileWithImage(ctx
context.Context, workflow
return reconcile.Result{Requeue: false}, nil, err
}
+ d.updateLastTimeStatusNotified(workflow, previousStatus)
if _, err := d.PerformStatusUpdate(ctx, workflow); err != nil {
return reconcile.Result{Requeue: false}, nil, err
}
+ d.scheduleWorkflowStatusChangeNotification(ctx, workflow)
return result, objs, nil
}
@@ -95,7 +117,7 @@ func (d *DeploymentReconciler)
ensureKnativeServingRequired(workflow *operatorap
}
func (d *DeploymentReconciler) ensureObjects(ctx context.Context, workflow
*operatorapi.SonataFlow, image string) (reconcile.Result, []client.Object,
error) {
- pl, _ := platform.GetActivePlatform(ctx, d.C, workflow.Namespace)
+ pl, _ := platform.GetActivePlatform(ctx, d.C, workflow.Namespace, true)
userPropsCM, _, err := d.ensurers.userPropsConfigMap.Ensure(ctx,
workflow)
if err != nil {
workflow.Status.Manager().MarkFalse(api.RunningConditionType,
api.ExternalResourcesNotFoundReason, "Unable to retrieve the user properties
config map")
@@ -189,3 +211,62 @@ func (d *DeploymentReconciler)
deploymentModelMutateVisitors(
common.RestoreDeploymentVolumeAndVolumeMountMutateVisitor(),
common.RolloutDeploymentIfCMChangedMutateVisitor(workflow,
userPropsCM, managedPropsCM)}
}
+
+func (d *DeploymentReconciler) updateLastTimeStatusNotified(workflow
*operatorapi.SonataFlow, previousStatus operatorapi.SonataFlowStatus) {
+ previousRunningCondition :=
previousStatus.GetCondition(api.RunningConditionType)
+ currentRunningCondition :=
workflow.Status.GetCondition(api.RunningConditionType)
+
+ if previousRunningCondition == nil {
+ previousRunningCondition = currentRunningCondition
+ }
+ if previousRunningCondition.Status != currentRunningCondition.Status ||
workflow.Status.LastTimeStatusNotified != nil &&
workflow.Status.LastTimeStatusNotified.Time.Before(manager.GetOperatorStartTime())
{
+ workflow.Status.LastTimeStatusNotified = nil
+ }
+}
+
+func (d *DeploymentReconciler) scheduleWorkflowStatusChangeNotification(ctx
context.Context, workflow *operatorapi.SonataFlow) {
+ if workflow.Status.LastTimeStatusNotified == nil {
+ manager.GetSFCWorker().RunAsync(func() {
+ if err := notifyWorkflowStatusChange(d.C,
workflow.Name, workflow.Namespace); err != nil {
+ klog.V(log.E).ErrorS(err, "Failed to notify
workflow status change, controller will schedule a new retry.", "workflow",
"namespace", workflow.Name, workflow.Namespace, err)
+ }
+ })
+ }
+}
+
+func notifyWorkflowStatusChange(cli client.Client, wfName, wfNamespace string)
error {
+ var err error
+ var uri string
+ retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+ workflow := &operatorapi.SonataFlow{}
+ if err = cli.Get(context.Background(),
types.NamespacedName{Name: wfName, Namespace: wfNamespace}, workflow); err !=
nil {
+ return err
+ }
+ workflow = workflow.DeepCopy()
+ available :=
workflow.Status.GetCondition(api.RunningConditionType).IsTrue()
+ if uri, err =
eventing.GetWorkflowDefinitionEventsTargetURL(cli, workflow); err != nil {
+ return fmt.Errorf("failed to get workflow definition
events target url to send the workflow definition status update event: %v", err)
+ }
+ if len(uri) == 0 {
+ klog.V(log.D).Infof("No enabled DataIndex, nor Broker,
nor Sink configuration was found to send the workflow definition status update
event for workflow: %s, namespace: %s", workflow.Name, workflow.Namespace)
+ return nil
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(),
constants.EventDeliveryTimeout)
+ defer cancel()
+ evt :=
workflowdef.NewWorkflowDefinitionAvailabilityEvent(workflow,
workflowdef.SonataFlowOperatorSource,
properties.GetWorkflowEndpointUrl(workflow), available)
+ if err = utils.SendCloudEventWithContext(evt, ctx, uri); err !=
nil {
+ return fmt.Errorf("failed to send workflow definition
status update event: %v", err)
+ // Controller handle to program a new notification
based on the LastTimeStatusNotified.
+ } else {
+ now := metav1.Now()
+ // Register the LastTimeStatusNotified, the controller
knows how to react based on that value.
+ workflow.Status.LastTimeStatusNotified = &now
+ if err = cli.Status().Update(context.Background(),
workflow); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ return retryErr
+}
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler_test.go
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler_test.go
index dbc5e405b2d..2cdac64d0ca 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler_test.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler_test.go
@@ -21,6 +21,8 @@ import (
"context"
"testing"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
+
"github.com/magiconair/properties"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/apps/v1"
@@ -69,6 +71,8 @@ func Test_CheckDeploymentModelIsKnative(t *testing.T) {
}
func Test_CheckPodTemplateChangesReflectDeployment(t *testing.T) {
+ manager.InitializeSFCWorker(manager.SonataFlowControllerWorkerSize)
+ manager.SetOperatorStartTime()
workflow := test.GetBaseSonataFlowWithPreviewProfile(t.Name())
client := test.NewSonataFlowClientBuilder().
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
b/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
index 492e5aab733..55a50e5778b 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
@@ -63,7 +63,7 @@ func (h *newBuilderState) CanReconcile(workflow
*operatorapi.SonataFlow) bool {
}
func (h *newBuilderState) Do(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
- pl, err := platform.GetActivePlatform(ctx, h.C, workflow.Namespace)
+ pl, err := platform.GetActivePlatform(ctx, h.C, workflow.Namespace,
true)
if err != nil {
if errors.IsNotFound(err) {
workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.WaitingForPlatformReason,
@@ -198,7 +198,7 @@ func (h *deployWithBuildWorkflowState) Do(ctx
context.Context, workflow *operato
// Guard to avoid errors while getting a new builder manager.
// Maybe we can do typed errors in the buildManager and
// have something like sonataerr.IsPlatformNotFound(err) instead.
- _, err := platform.GetActivePlatform(ctx, h.C, workflow.Namespace)
+ _, err := platform.GetActivePlatform(ctx, h.C, workflow.Namespace, true)
if err != nil {
workflow.Status.Manager().MarkFalse(api.RunningConditionType,
api.WaitingForPlatformReason,
"No active Platform for namespace %s so the
resWorkflowDef cannot be deployed. Waiting for an active platform",
workflow.Namespace)
diff --git
a/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
b/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
index 8167f42dfa1..55d29dae56d 100644
--- a/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
+++ b/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
@@ -23,17 +23,32 @@ import (
"context"
"fmt"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/eventing"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils"
+
+ "k8s.io/client-go/util/retry"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/properties"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/workflowdef"
+
+ "sigs.k8s.io/controller-runtime/pkg/predicate"
+
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+ "k8s.io/klog/v2"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/knative"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/monitoring"
- "k8s.io/klog/v2"
-
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/metadata"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
- profiles
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/factory"
+ profilesfactory
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/factory"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -104,14 +119,19 @@ func (r *SonataFlowReconciler) Reconcile(ctx
context.Context, req ctrl.Request)
}
r.setDefaults(workflow)
- // If the workflow is being deleted, clean up the triggers on a
different namespace
- if workflow.DeletionTimestamp != nil &&
controllerutil.ContainsFinalizer(workflow, constants.TriggerFinalizer) {
- err := r.cleanupTriggers(ctx, workflow)
- if err != nil {
- klog.V(log.E).ErrorS(err, "Failed to clean up triggers
for workflow %s", workflow.Name)
- return ctrl.Result{}, err
+ // If the workflow is being deleted, execute the associated finalizers
+ if workflow.DeletionTimestamp != nil {
+ return r.applyFinalizers(ctx, workflow)
+ }
+
+ // If first recon cycle, add the WorkflowFinalizer
+ if !profiles.IsDevProfile(workflow) {
+ if controllerutil.AddFinalizer(workflow,
constants.WorkflowFinalizer) {
+ if err := r.Client.Update(ctx, workflow); err != nil {
+ klog.V(log.E).ErrorS(err, "Failed to add
workflow finalizer.", "workflow", "namespace", "finalizer", workflow.Name,
workflow.Namespace, constants.WorkflowFinalizer)
+ return ctrl.Result{}, err
+ }
}
- return ctrl.Result{}, nil
}
// Only process resources assigned to the operator
@@ -119,7 +139,7 @@ func (r *SonataFlowReconciler) Reconcile(ctx
context.Context, req ctrl.Request)
klog.V(log.I).InfoS("Ignoring request because resource is not
assigned to current operator")
return reconcile.Result{}, nil
}
- return profiles.NewReconciler(r.Client, r.Config, r.Recorder,
workflow).Reconcile(ctx, workflow)
+ return profilesfactory.NewReconciler(r.Client, r.Config, r.Recorder,
workflow).Reconcile(ctx, workflow)
}
// TODO: move to webhook see
https://github.com/apache/incubator-kie-tools/packages/sonataflow-operator/pull/239
@@ -134,6 +154,91 @@ func (r *SonataFlowReconciler) setDefaults(workflow
*operatorapi.SonataFlow) {
}
}
+// applyFinalizers Manages the execution of the workflow finalizers.
+func (r *SonataFlowReconciler) applyFinalizers(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, error) {
+ if controllerutil.ContainsFinalizer(workflow,
constants.TriggerFinalizer) {
+ if err := r.cleanupTriggers(ctx, workflow); err != nil {
+ return ctrl.Result{}, err
+ }
+ }
+ if controllerutil.ContainsFinalizer(workflow,
constants.WorkflowFinalizer) {
+ var wasScheduled = false
+ var err error
+ if !workflow.Status.FinalizerSucceed &&
workflow.Status.FinalizerAttempts < constants.MaxWorkflowFinalizerAttempts {
+ now := metav1.Now()
+ workflow.Status.FinalizerAttempts =
workflow.Status.FinalizerAttempts + 1
+ workflow.Status.LastTimeFinalizerAttempt = &now
+ if err = r.Client.Status().Update(ctx, workflow); err
!= nil {
+ return ctrl.Result{}, err
+ }
+ if wasScheduled, err =
scheduleWorkflowDeletionNotification(r.Client, workflow); err != nil {
+ remaining :=
constants.MaxWorkflowFinalizerAttempts - workflow.Status.FinalizerAttempts
+ if remaining > 0 {
+ klog.V(log.E).ErrorS(err,
fmt.Sprintf("Failed to schedule workflow deletion notification, %d remaining
attempts are left", remaining))
+ } else {
+ klog.V(log.E).ErrorS(err, "Failed to
schedule workflow deletion notification, no attempts are left.", "workflow",
"namespace", workflow.Name, workflow.Namespace)
+ }
+ return ctrl.Result{RequeueAfter:
constants.WorkflowFinalizerSchedulingRetryInterval}, nil
+ }
+ }
+ if wasScheduled {
+ return ctrl.Result{RequeueAfter:
constants.WorkflowFinalizerRetryInterval}, nil
+ } else {
+ controllerutil.RemoveFinalizer(workflow,
constants.WorkflowFinalizer)
+ if err := r.Client.Update(ctx, workflow); err != nil {
+ return ctrl.Result{}, err
+ }
+ }
+ }
+ return ctrl.Result{}, nil
+}
+
+func scheduleWorkflowDeletionNotification(cli client.Client, workflow
*operatorapi.SonataFlow) (bool, error) {
+ if eventTargetUrl, err :=
eventing.GetWorkflowDefinitionEventsTargetURL(cli, workflow); err != nil {
+ return false, fmt.Errorf("failed to get workflow definition
events target url to send the workflow definition status update event: %v", err)
+ } else {
+ if len(eventTargetUrl) > 0 {
+ manager.GetSFCWorker().RunAsync(func() {
+ wf := *workflow.DeepCopy()
+ if err := notifyWorkflowDeletion(cli, &wf,
eventTargetUrl); err != nil {
+ klog.V(log.E).ErrorS(err, "Failed to
notify workflow deletion, controller will schedule a new retry if remaining
attempts > 0.", "workflow", "namespace", workflow.Name, workflow.Namespace)
+ }
+ })
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+func notifyWorkflowDeletion(cli client.Client, workflow
*operatorapi.SonataFlow, eventTargetUrl string) error {
+ retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+ var err error
+ evt :=
workflowdef.NewWorkflowDefinitionAvailabilityEvent(workflow,
workflowdef.SonataFlowOperatorSource,
properties.GetWorkflowEndpointUrlWithNameAndNamespace(workflow.Name,
workflow.Namespace), false)
+ ctx, cancel := context.WithTimeout(context.Background(),
constants.EventDeliveryTimeout)
+ defer cancel()
+
+ if err = utils.SendCloudEventWithContext(evt, ctx,
eventTargetUrl); err != nil {
+ // controller handles to program a new notification
based on the remainder FinalizerAttempts if needed.
+ return fmt.Errorf("failed to send workflow definition
status update event: %v", err)
+ }
+
+ wfName := workflow.Name
+ wfNamespace := workflow.Namespace
+ workflow = &operatorapi.SonataFlow{}
+ if err = cli.Get(context.Background(),
types.NamespacedName{Name: wfName, Namespace: wfNamespace}, workflow); err !=
nil {
+ return err
+ }
+
+ workflow = workflow.DeepCopy()
+ workflow.Status.FinalizerSucceed = true
+ if err = cli.Status().Update(context.Background(), workflow);
err != nil {
+ return err
+ }
+ return nil
+ })
+ return retryErr
+}
+
func (r *SonataFlowReconciler) cleanupTriggers(ctx context.Context, workflow
*operatorapi.SonataFlow) error {
for _, triggerRef := range workflow.Status.Triggers {
trigger := &eventingv1.Trigger{
@@ -225,6 +330,15 @@ func buildEnqueueRequestsFromMapFunc(c client.Client, b
*operatorapi.SonataFlowB
func (r *SonataFlowReconciler) SetupWithManager(mgr ctrl.Manager) error {
builder := ctrl.NewControllerManagedBy(mgr).
For(&operatorapi.SonataFlow{}).
+ WithEventFilter(predicate.Funcs{
+ UpdateFunc: func(e event.UpdateEvent) bool {
+ oldGeneration := e.ObjectOld.GetGeneration()
+ newGeneration := e.ObjectNew.GetGeneration()
+ // Generation is only updated on spec changes
(also on deletion), not upon metadata or status changes.
+ // Filter out events where the generation
hasn't changed to avoid being triggered by status updates.
+ return oldGeneration != newGeneration
+ },
+ }).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Owns(&corev1.ConfigMap{}).
diff --git
a/packages/sonataflow-operator/internal/controller/workflowdef/events.go
b/packages/sonataflow-operator/internal/controller/workflowdef/events.go
new file mode 100644
index 00000000000..e9f6d831856
--- /dev/null
+++ b/packages/sonataflow-operator/internal/controller/workflowdef/events.go
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package workflowdef
+
+import (
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/metadata"
+
+ operatorapi
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+)
+
+const SonataFlowOperatorSource = "sonataflow.org/operator"
+
+func NewWorkflowDefinitionAvailabilityEvent(workflow *operatorapi.SonataFlow,
eventSource string, serviceUrl string, available bool) *cloudevents.Event {
+ var status = "unavailable"
+ if available {
+ status = "available"
+ }
+ event := cloudevents.NewEvent(cloudevents.VersionV1)
+ event.SetType("ProcessDefinitionEvent")
+ event.SetSource(eventSource)
+ event.SetExtension("kogitoprocid", workflow.Name)
+ event.SetExtension("partitionkey", workflow.Name)
+ data := make(map[string]interface{})
+ data["id"] = workflow.Name
+ data["name"] = workflow.Name
+ version := workflow.ObjectMeta.Annotations[metadata.Version]
+ data["version"] = version
+ data["type"] = "SW"
+ data["endpoint"] = serviceUrl
+ data["metadata"] = map[string]interface{}{
+ "status": status,
+ }
+ data["nodes"] = [0]string{}
+ _ = event.SetData(cloudevents.ApplicationJSON, data)
+ return &event
+}
diff --git a/packages/sonataflow-operator/internal/manager/worker.go
b/packages/sonataflow-operator/internal/manager/worker.go
new file mode 100644
index 00000000000..3d4514a0fe1
--- /dev/null
+++ b/packages/sonataflow-operator/internal/manager/worker.go
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package manager
+
+import (
+ "time"
+)
+
+const (
+ SonataFlowControllerWorkerSize = 100
+)
+
+var (
+ sonataFlowControllerWorker *Worker
+ operatorStarTime time.Time
+)
+
+type Runnable func()
+
+type Worker struct {
+ ch chan Runnable
+}
+
+func SetOperatorStartTime() {
+ operatorStarTime = time.Now()
+}
+
+func GetOperatorStartTime() time.Time {
+ return operatorStarTime
+}
+
+func GetSFCWorker() *Worker {
+ return sonataFlowControllerWorker
+}
+
+func InitializeSFCWorker(size int) *Worker {
+ worker := NewWorker(size)
+ worker.Start()
+ sonataFlowControllerWorker = &worker
+ return sonataFlowControllerWorker
+}
+
+func NewWorker(size int) Worker {
+ return Worker{ch: make(chan Runnable, size)}
+}
+
+func (w Worker) Start() {
+ go func(ch chan Runnable) {
+ for {
+ r, ok := <-ch
+ if !ok {
+ break
+ } else {
+ r()
+ }
+ }
+ }(w.ch)
+}
+
+func (w Worker) RunAsync(r Runnable) {
+ w.ch <- r
+}
diff --git a/packages/sonataflow-operator/operator.yaml
b/packages/sonataflow-operator/operator.yaml
index e54b4aad101..8bd04af5ada 100644
--- a/packages/sonataflow-operator/operator.yaml
+++ b/packages/sonataflow-operator/operator.yaml
@@ -27509,12 +27509,22 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the
workflow
type: string
+ finalizerAttempts:
+ type: integer
+ finalizerSucceed:
+ type: boolean
flowCRC:
format: int32
type: integer
+ lastTimeFinalizerAttempt:
+ format: date-time
+ type: string
lastTimeRecoverAttempt:
format: date-time
type: string
+ lastTimeStatusNotified:
+ format: date-time
+ type: string
observedGeneration:
description: The generation observed by the deployment
controller.
format: int64
diff --git a/packages/sonataflow-operator/utils/events.go
b/packages/sonataflow-operator/utils/events.go
new file mode 100644
index 00000000000..674264872af
--- /dev/null
+++ b/packages/sonataflow-operator/utils/events.go
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
+)
+
+// SendCloudEvent Sends a cloud event to the given url using the http protocol
binding. By default, events are sent in
+// binary mode.
+func SendCloudEvent(event *cloudevents.Event, url string) error {
+ return SendCloudEventWithContext(event, context.TODO(), url)
+}
+
+// SendCloudEventWithContext Sends a cloud event to the given url using the
http protocol binding. By default, events
+// are sent in binary mode.
+func SendCloudEventWithContext(event *cloudevents.Event, ctx context.Context,
url string) error {
+ targetCtx := cloudevents.ContextWithTarget(ctx, url)
+ p, err := cloudevents.NewHTTP()
+ if err != nil {
+ return err
+ }
+ c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(),
cloudevents.WithUUIDs())
+ if err != nil {
+ return err
+ }
+ res := c.Send(targetCtx, *event)
+ if cloudevents.IsUndelivered(res) {
+ return fmt.Errorf("failed to send cloud event to url: %s, err:
%s", url, res.Error())
+ } else {
+ var httpResult *cehttp.Result
+ if cloudevents.ResultAs(res, &httpResult) {
+ if !resultOK(httpResult) {
+ return fmt.Errorf("failed to send cloud event
to url: %s, err: %s", url, httpResult.Error())
+ }
+ } else {
+ return fmt.Errorf("failed to send cloud event to url:
%s, Send did not return an HTTP response: %s", url, res)
+ }
+ }
+ return nil
+}
+
+func resultOK(httpResult *cehttp.Result) bool {
+ return httpResult.StatusCode == http.StatusOK || httpResult.StatusCode
== http.StatusAccepted
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]