ricardozanini commented on code in PR #350:
URL: 
https://github.com/apache/incubator-kie-kogito-serverless-operator/pull/350#discussion_r1452627874


##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -22,6 +22,9 @@ package common
 import (
        "context"
 
+       v12 "knative.dev/eventing/pkg/apis/eventing/v1"

Review Comment:
   ```suggestion
        eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
   ```



##########
controllers/profiles/common/object_creators.go:
##########
@@ -20,11 +20,22 @@
 package common
 
 import (
+       "fmt"
+       "strings"
+
+       "github.com/pkg/errors"
+
+       cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
+
        "github.com/imdario/mergo"
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/util/intstr"
+       v12 "knative.dev/eventing/pkg/apis/eventing/v1"
+       v1 "knative.dev/eventing/pkg/apis/sources/v1"

Review Comment:
   Same here for alias.



##########
controllers/profiles/dev/states_dev.go:
##########
@@ -110,28 +118,78 @@ func (e *ensureRunningWorkflowState) Do(ctx 
context.Context, workflow *operatora
        }
        objs = append(objs, route)
 
+       if e.Catalog.(*discovery.SonataFlowServiceCatalog).KnativeCatalog == 
nil {
+               klog.V(log.E).ErrorS(err, "Unable to determine if knative is 
installed in the cluster")
+               
workflow.Status.Manager().MarkFalse(api.KnativeResourcesConditionType,
+                       api.KnativeFailureReason,
+                       "Knative Client is not installed")
+               updateStatus = true
+       } else if workflow.Spec.Flow.Events == nil {
+               // skip if no event is found
+               
workflow.Status.Manager().MarkTrueWithReason(api.KnativeResourcesConditionType,
+                       api.KnativeSkippedReason,
+                       "no need to create Knative eventing resources since no 
event definition is found")
+               updateStatus = true
+       } else if workflow.Spec.Sink == nil {
+               // mark false if spec.sink is not found
+               
workflow.Status.Manager().MarkFalse(api.KnativeResourcesConditionType,
+                       api.KnativeFailureReason,
+                       "Spec.Sink is not provided")
+               updateStatus = true

Review Comment:
   You can move this to the new state `followWorkflowKnativeState` instead. 
This state should focus only on ensuring the resources. The ensurer can skip 
creating the object when the `sink == nil` condition is met. I think you're 
already doing that there.



##########
controllers/profiles/common/ensurer.go:
##########
@@ -97,3 +86,61 @@ func (d *noopObjectEnsurer) Ensure(ctx context.Context, 
workflow *operatorapi.So
        result := controllerutil.OperationResultNone
        return nil, result, nil
 }
+
+// ObjectsEnsurer is an ensurer to apply multiple objects
+type ObjectsEnsurer interface {
+       Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors 
...MutateVisitor) []ObjectEnsurerResult
+}
+
+type ObjectEnsurerResult struct {
+       client.Object
+       Result controllerutil.OperationResult
+       Error  error
+}
+
+func NewObjectsEnsurer(client client.Client, creator ObjectsCreator) 
ObjectsEnsurer {
+       return &defaultObjectsEnsurer{
+               c:       client,
+               creator: creator,
+       }
+}
+
+type defaultObjectsEnsurer struct {
+       ObjectsEnsurer
+       c       client.Client
+       creator ObjectsCreator
+}
+
+func (d *defaultObjectsEnsurer) Ensure(ctx context.Context, workflow 
*operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult {
+       result := controllerutil.OperationResultNone
+
+       objects, err := d.creator(workflow)
+       if err != nil {
+               return []ObjectEnsurerResult{{nil, result, err}}
+       }
+       var ensureResult []ObjectEnsurerResult
+       for _, object := range objects {
+               ensureObject, c, err := EnsureObject(ctx, workflow, visitors, 
result, err, d.c, object)
+               ensureResult = append(ensureResult, 
ObjectEnsurerResult{ensureObject, c, err})
+               if err != nil {
+                       return ensureResult
+               }
+       }
+       return ensureResult
+}
+
+func EnsureObject(ctx context.Context, workflow *operatorapi.SonataFlow, 
visitors []MutateVisitor, result controllerutil.OperationResult, err error, c 
client.Client, object client.Object) (client.Object, 
controllerutil.OperationResult, error) {

Review Comment:
   Can you make this function unexported since we won't use it outside this 
module?



##########
controllers/profiles/prod/states_prod.go:
##########
@@ -205,3 +209,58 @@ func (h *deployWithBuildWorkflowState) 
isWorkflowChanged(workflow *operatorapi.S
        }
        return false
 }
+
+type followWorkflowKnativeState struct {

Review Comment:
   You can even use the same `knative.go` file I mentioned in another comment.



##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -22,6 +22,9 @@ package common
 import (
        "context"
 
+       v12 "knative.dev/eventing/pkg/apis/eventing/v1"
+       v1 "knative.dev/eventing/pkg/apis/sources/v1"

Review Comment:
   Can you please alias these imports?
   
   ```suggestion
        sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
   ```



##########
controllers/profiles/common/object_creators.go:
##########
@@ -197,6 +211,93 @@ func ServiceCreator(workflow *operatorapi.SonataFlow) 
(client.Object, error) {
        return service, nil
 }
 
+// SinkBindingCreator is an ObjectsCreator for SinkBinding.
+// It will create v1.SinkBinding based on events defined in workflow.
+func SinkBindingCreator(workflow *operatorapi.SonataFlow) (client.Object, 
error) {
+       lbl := workflowproj.GetDefaultLabels(workflow)
+
+       // skip if no produced event is found
+       if !properties.ContainsEventKind(workflow, cncfmodel.EventKindProduced) 
{
+               return nil, nil
+       }
+
+       sink := workflow.Spec.Sink
+       if sink == nil {
+               return nil, errors.New("Sink is not provided to create 
SinkBinding")
+       }

Review Comment:
   You can skip this case instead of returning an error. The `sink` attribute 
can be cleared once we have a webhook check.



##########
controllers/profiles/common/properties/services.go:
##########
@@ -0,0 +1,29 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package properties
+
+import (
+       operatorapi 
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+       cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
+)
+
+func ContainsEventKind(workflow *operatorapi.SonataFlow, eventKind 
cncfmodel.EventKind) bool {

Review Comment:
   Can you please move this function to `controllers/workflowdef` module?



##########
controllers/profiles/common/reconciler.go:
##########
@@ -40,12 +41,23 @@ type StateSupport struct {
        C        client.Client
        Catalog  discovery.ServiceCatalog
        Recorder record.EventRecorder
+       mu       sync.Mutex
 }
 
 // PerformStatusUpdate updates the SonataFlow Status conditions
-func (s StateSupport) PerformStatusUpdate(ctx context.Context, workflow 
*operatorapi.SonataFlow) (bool, error) {
+func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow 
*operatorapi.SonataFlow) (bool, error) {
        var err error
        workflow.Status.ObservedGeneration = workflow.Generation
+       // resolve conflict when updating obj multiple times in a single 
reconcile
+       // leader election will be used for multi instances of opr

Review Comment:
   Thank you!



##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -103,6 +106,41 @@ func ServiceMutateVisitor(workflow 
*operatorapi.SonataFlow) MutateVisitor {
        }
 }
 
+func SinkBindingMutateVisitor(workflow *operatorapi.SonataFlow) MutateVisitor {
+       return func(object client.Object) controllerutil.MutateFn {
+               return func() error {
+                       if kubeutil.IsObjectNew(object) {
+                               return nil
+                       }
+                       original, err := SinkBindingCreator(workflow)
+                       if err != nil {
+                               return err
+                       }
+                       return mergo.Merge(&object.(*v1.SinkBinding).Spec, 
original.(*v1.SinkBinding).Spec, mergo.WithOverride)
+               }
+       }
+}
+
+func TriggerMutateVisitor(workflow *operatorapi.SonataFlow) MutateVisitor {
+       return func(object client.Object) controllerutil.MutateFn {
+               return func() error {
+                       if kubeutil.IsObjectNew(object) {
+                               return nil
+                       }
+                       originals, err := TriggersCreator(workflow)
+                       if err != nil {
+                               return err
+                       }
+                       for _, original := range originals {
+                               if original.GetName() == object.GetName() {
+                                       return 
mergo.Merge(&object.(*v12.Trigger).Spec, original.(*v12.Trigger).Spec, 
mergo.WithOverride)

Review Comment:
   We should ignore the `broker` here, since users should be able to edit it 
freely until we implement the follow-up work where we can configure triggers 
via the CRs.



##########
controllers/profiles/common/object_creators.go:
##########
@@ -197,6 +211,93 @@ func ServiceCreator(workflow *operatorapi.SonataFlow) 
(client.Object, error) {
        return service, nil
 }
 
+// SinkBindingCreator is an ObjectsCreator for SinkBinding.
+// It will create v1.SinkBinding based on events defined in workflow.
+func SinkBindingCreator(workflow *operatorapi.SonataFlow) (client.Object, 
error) {
+       lbl := workflowproj.GetDefaultLabels(workflow)
+
+       // skip if no produced event is found
+       if !properties.ContainsEventKind(workflow, cncfmodel.EventKindProduced) 
{
+               return nil, nil
+       }

Review Comment:
   You can add an `OR` clause here if `.spec.sink` is nil.



##########
controllers/profiles/common/object_creators.go:
##########
@@ -197,6 +211,93 @@ func ServiceCreator(workflow *operatorapi.SonataFlow) 
(client.Object, error) {
        return service, nil
 }
 
+// SinkBindingCreator is an ObjectsCreator for SinkBinding.
+// It will create v1.SinkBinding based on events defined in workflow.
+func SinkBindingCreator(workflow *operatorapi.SonataFlow) (client.Object, 
error) {
+       lbl := workflowproj.GetDefaultLabels(workflow)
+
+       // skip if no produced event is found
+       if !properties.ContainsEventKind(workflow, cncfmodel.EventKindProduced) 
{
+               return nil, nil
+       }
+
+       sink := workflow.Spec.Sink
+       if sink == nil {
+               return nil, errors.New("Sink is not provided to create 
SinkBinding")
+       }
+
+       // subject must be deployment to inject K_SINK, service won't work
+       sinkBinding := &v1.SinkBinding{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      strings.ToLower(fmt.Sprintf("sb-%s", 
workflow.Name)),
+                       Namespace: workflow.Namespace,
+                       Labels:    lbl,
+               },
+               Spec: v1.SinkBindingSpec{
+                       SourceSpec: duckv1.SourceSpec{
+                               Sink: *sink,
+                       },
+                       BindingSpec: duckv1.BindingSpec{
+                               Subject: tracker.Reference{
+                                       Name:       workflow.Name,
+                                       Namespace:  workflow.Namespace,
+                                       APIVersion: "apps/v1",
+                                       Kind:       "Deployment",
+                               },
+                       },
+               },
+       }
+       return sinkBinding, nil
+}
+
+// TriggersCreator is an ObjectsCreator for Triggers.
+// It will create a list of v12.Trigger based on events defined in workflow.
+func TriggersCreator(workflow *operatorapi.SonataFlow) ([]client.Object, 
error) {
+       var resultObjects []client.Object
+       lbl := workflowproj.GetDefaultLabels(workflow)
+
+       //consumed
+       events := workflow.Spec.Flow.Events
+       for _, event := range events {
+               // filter out produce events
+               if event.Kind == cncfmodel.EventKindProduced {
+                       continue
+               }
+               sink := workflow.Spec.Sink
+               if sink == nil {
+                       return nil, errors.New("Sink is not provided to create 
SinkBinding!")
+               } else if sink.Ref.Kind != "Broker" || sink.Ref.APIVersion != 
"eventing.knative.dev/v1" {
+                       return nil, errors.New("Trigger source must be a 
broker!")
+               }

Review Comment:
   Hmm, we are using `sink` to define the `broker` to listen to. I think it was 
a mistake, as pointed out by Daniele. It's confusing and error-prone. We can 
add the default trigger to listen to the default broker instead. Then, in a 
follow-up issue, we can create a way for users to set this information in the 
`SonataFlow` or `SonataFlowPlatform` CRs.
   
   I don't remember discussing this in the meeting; if we did, it was my 
mistake not to catch this promptly.



##########
controllers/profiles/prod/deployment_handler.go:
##########
@@ -77,6 +79,50 @@ func (d *deploymentReconciler) reconcileWithBuiltImage(ctx 
context.Context, work
 
        objs := []client.Object{deployment, service, propsCM}
 
+       if d.Catalog.(*discovery.SonataFlowServiceCatalog).KnativeCatalog == 
nil {
+               
workflow.Status.Manager().MarkFalse(api.KnativeResourcesConditionType,
+                       api.KnativeFailureReason,
+                       "Knative Client is not installed")
+       } else if workflow.Spec.Flow.Events == nil {
+               // skip if no event is found
+               
workflow.Status.Manager().MarkTrueWithReason(api.KnativeResourcesConditionType,
+                       api.KnativeSkippedReason,
+                       "no need to create Knative eventing resources since no 
event definition is found")
+       } else if workflow.Spec.Sink == nil {
+               // mark false if spec.sink is not found
+               
workflow.Status.Manager().MarkFalse(api.KnativeResourcesConditionType,
+                       api.KnativeFailureReason,
+                       "Spec.Sink is not provided")
+       } else {
+               // create sinkBinding and trigger
+               sinkBinding, sinkBindingResult, err := 
d.ensurers.sinkBinding.Ensure(ctx, workflow, 
common.SinkBindingMutateVisitor(workflow))
+               if err != nil {
+                       return ctrl.Result{RequeueAfter: 
constants.RequeueAfterFailure}, objs, err
+               } else if sinkBinding != nil {
+                       objs = append(objs, sinkBinding)
+               }
+
+               triggers := d.ensurers.trigger.Ensure(ctx, workflow, 
common.TriggerMutateVisitor(workflow))
+               for _, trigger := range triggers {
+                       if trigger.Error != nil {
+                               return ctrl.Result{RequeueAfter: 
constants.RequeueAfterFailure}, objs, trigger.Error
+                       }
+                       objs = append(objs, trigger.Object)
+               }
+
+               if sinkBindingResult != controllerutil.OperationResultNone {
+                       // create, update should change the status
+                       
workflow.Status.Manager().MarkFalse(api.KnativeResourcesConditionType, 
api.WaitingForKnativeReason, "")
+               } else {
+                       for _, trigger := range triggers {
+                               if trigger.Result != 
controllerutil.OperationResultNone {
+                                       
workflow.Status.Manager().MarkFalse(api.KnativeResourcesConditionType, 
api.WaitingForKnativeReason, "")
+                                       break
+                               }

Review Comment:
   It seems that this logic is pretty close to what we have in both profiles. 
You can create a `knative.go` file within `controllers/profiles/common` and 
have it shared in both places. For instance, 
`common.KnativeEventingHandler().Ensure()`.



##########
controllers/profiles/prod/states_prod.go:
##########
@@ -205,3 +209,58 @@ func (h *deployWithBuildWorkflowState) 
isWorkflowChanged(workflow *operatorapi.S
        }
        return false
 }
+
+type followWorkflowKnativeState struct {

Review Comment:
   You can move this state to the `controllers/profiles/common` module instead 
since it's doing essentially the same thing for both profiles.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to