This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 38122e26eb0af82edb8ab37805a531d1af797217
Author: Antonin Stefanutti <anto...@stefanutti.fr>
AuthorDate: Thu Oct 7 15:32:39 2021 +0200

    feat: Comprehensive integration error status
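
    Replace the ReplicaSet-based mirroring of the Ready condition with a
    direct assessment of the controller resource that runs the Integration
    (Deployment, Knative Service, or CronJob), and surface Pod and
    container error states (Unschedulable, ImagePullBackOff,
    CrashLoopBackOff, containers terminated in error) into the Integration
    phase and Ready condition. The operator role drops the unused
    replicasets, statefulsets and daemonsets rules and gains read access
    to batch Jobs, the informer cache is restricted to resources carrying
    the camel.apache.org/integration label, and the controller now watches
    the owned Deployments, Knative Services and CronJobs.

    As an illustrative sketch (names, messages and the exact set of
    condition fields depend on the actual resources and failure), a
    Deployment that exceeds its progress deadline now surfaces as:

        status:
          phase: Error
          conditions:
          - type: Ready
            status: "False"
            reason: Error
            message: ReplicaSet "example-6bb54f77f4" has timed out progressing.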
---
 config/rbac/operator-role.yaml                     |   6 +-
 pkg/apis/camel/v1/integration_types.go             |  22 +-
 pkg/apis/camel/v1/integration_types_support.go     |  30 +--
 pkg/cmd/operator/operator.go                       |  18 +-
 .../integration/integration_controller.go          |  10 +
 pkg/controller/integration/monitor.go              | 247 ++++++++++++++++-----
 pkg/resources/resources.go                         |   4 +-
 pkg/util/kubernetes/conditions.go                  | 125 +++--------
 8 files changed, 264 insertions(+), 198 deletions(-)

diff --git a/config/rbac/operator-role.yaml b/config/rbac/operator-role.yaml
index fda70ff..e619bbf 100644
--- a/config/rbac/operator-role.yaml
+++ b/config/rbac/operator-role.yaml
@@ -83,8 +83,6 @@ rules:
   - apps
   resources:
   - deployments
-  - replicasets
-  - statefulsets
   verbs:
   - create
   - delete
@@ -108,9 +106,9 @@ rules:
   - update
   - watch
 - apiGroups:
-  - apps
+  - batch
   resources:
-  - daemonsets
+  - jobs
   verbs:
   - get
   - list
diff --git a/pkg/apis/camel/v1/integration_types.go b/pkg/apis/camel/v1/integration_types.go
index 3a6ea29..b30b66b 100644
--- a/pkg/apis/camel/v1/integration_types.go
+++ b/pkg/apis/camel/v1/integration_types.go
@@ -182,14 +182,24 @@ const (
        IntegrationConditionJolokiaAvailableReason string = "JolokiaAvailable"
        // IntegrationConditionProbesAvailableReason --
        IntegrationConditionProbesAvailableReason string = "ProbesAvailable"
-       // IntegrationConditionErrorReason --
-       IntegrationConditionErrorReason string = "Error"
+
+       // IntegrationConditionKnativeServiceReadyReason --
+       IntegrationConditionKnativeServiceReadyReason string = "KnativeServiceReady"
+       // IntegrationConditionDeploymentReadyReason --
+       IntegrationConditionDeploymentReadyReason string = "DeploymentReady"
+       // IntegrationConditionDeploymentProgressingReason --
+       IntegrationConditionDeploymentProgressingReason string = "DeploymentProgressing"
        // IntegrationConditionCronJobCreatedReason --
        IntegrationConditionCronJobCreatedReason string = "CronJobCreated"
-       // IntegrationConditionReplicaSetReadyReason --
-       IntegrationConditionReplicaSetReadyReason string = "ReplicaSetReady"
-       // IntegrationConditionReplicaSetNotReadyReason --
-       IntegrationConditionReplicaSetNotReadyReason string = "ReplicaSetNotReady"
+       // IntegrationConditionCronJobActiveReason --
+       IntegrationConditionCronJobActiveReason string = "CronJobActive"
+       // IntegrationConditionLastJobSucceededReason --
+       IntegrationConditionLastJobSucceededReason string = "LastJobSucceeded"
+       // IntegrationConditionLastJobFailedReason --
+       IntegrationConditionLastJobFailedReason string = "LastJobFailed"
+       // IntegrationConditionErrorReason --
+       IntegrationConditionErrorReason string = "Error"
+
        // IntegrationConditionUnsupportedLanguageReason --
        IntegrationConditionUnsupportedLanguageReason string = "UnsupportedLanguage"
 
diff --git a/pkg/apis/camel/v1/integration_types_support.go b/pkg/apis/camel/v1/integration_types_support.go
index 345a1c9..0aff087 100644
--- a/pkg/apis/camel/v1/integration_types_support.go
+++ b/pkg/apis/camel/v1/integration_types_support.go
@@ -28,7 +28,6 @@ import (
 
 const IntegrationLabel = "camel.apache.org/integration"
 
-// NewIntegration --
 func NewIntegration(namespace string, name string) Integration {
        return Integration{
                TypeMeta: metav1.TypeMeta{
@@ -42,7 +41,6 @@ func NewIntegration(namespace string, name string) Integration {
        }
 }
 
-// NewIntegrationList --
 func NewIntegrationList() IntegrationList {
        return IntegrationList{
                TypeMeta: metav1.TypeMeta{
@@ -81,27 +79,22 @@ func (in *Integration) Resources() []ResourceSpec {
        return resources
 }
 
-// AddSource --
 func (in *IntegrationSpec) AddSource(name string, content string, language Language) {
        in.Sources = append(in.Sources, NewSourceSpec(name, content, language))
 }
 
-// AddSources --
 func (in *IntegrationSpec) AddSources(sources ...SourceSpec) {
        in.Sources = append(in.Sources, sources...)
 }
 
-// AddResources --
 func (in *IntegrationSpec) AddResources(resources ...ResourceSpec) {
        in.Resources = append(in.Resources, resources...)
 }
 
-// AddFlows --
 func (in *IntegrationSpec) AddFlows(flows ...Flow) {
        in.Flows = append(in.Flows, flows...)
 }
 
-// AddConfiguration --
 func (in *IntegrationSpec) AddConfiguration(confType string, confValue string) {
        in.Configuration = append(in.Configuration, ConfigurationSpec{
                Type:  confType,
@@ -121,7 +114,6 @@ func (in *IntegrationSpec) AddConfigurationAsResource(
        })
 }
 
-// AddDependency --
 func (in *IntegrationSpec) AddDependency(dependency string) {
        if in.Dependencies == nil {
                in.Dependencies = make([]string, 0)
@@ -162,7 +154,6 @@ func trimFirstLeadingSpace(val string) string {
        return val
 }
 
-// AddOrReplaceGeneratedResources --
 func (in *IntegrationStatus) AddOrReplaceGeneratedResources(resources ...ResourceSpec) {
        newResources := make([]ResourceSpec, 0)
        for _, resource := range resources {
@@ -182,7 +173,6 @@ func (in *IntegrationStatus) AddOrReplaceGeneratedResources(resources ...Resourc
        in.GeneratedResources = append(in.GeneratedResources, newResources...)
 }
 
-// AddOrReplaceGeneratedSources --
 func (in *IntegrationStatus) AddOrReplaceGeneratedSources(sources ...SourceSpec) {
        newSources := make([]SourceSpec, 0)
        for _, source := range sources {
@@ -202,7 +192,6 @@ func (in *IntegrationStatus) AddOrReplaceGeneratedSources(sources ...SourceSpec)
        in.GeneratedSources = append(in.GeneratedSources, newSources...)
 }
 
-// AddConfigurationsIfMissing --
 func (in *IntegrationStatus) AddConfigurationsIfMissing(configurations ...ConfigurationSpec) {
        for _, config := range configurations {
                alreadyPresent := false
@@ -218,7 +207,6 @@ func (in *IntegrationStatus) AddConfigurationsIfMissing(configurations ...Config
        }
 }
 
-// Configurations --
 func (in *IntegrationSpec) Configurations() []ConfigurationSpec {
        if in == nil {
                return []ConfigurationSpec{}
@@ -227,7 +215,6 @@ func (in *IntegrationSpec) Configurations() []ConfigurationSpec {
        return in.Configuration
 }
 
-// Configurations --
 func (in *IntegrationStatus) Configurations() []ConfigurationSpec {
        if in == nil {
                return []ConfigurationSpec{}
@@ -236,7 +223,6 @@ func (in *IntegrationStatus) Configurations() []ConfigurationSpec {
        return in.Configuration
 }
 
-// Configurations --
 func (in *Integration) Configurations() []ConfigurationSpec {
        if in == nil {
                return []ConfigurationSpec{}
@@ -249,7 +235,6 @@ func (in *Integration) Configurations() []ConfigurationSpec {
        return answer
 }
 
-// NewSourceSpec --
 func NewSourceSpec(name string, content string, language Language) SourceSpec {
        return SourceSpec{
                DataSpec: DataSpec{
@@ -260,7 +245,6 @@ func NewSourceSpec(name string, content string, language Language) SourceSpec {
        }
 }
 
-// NewResourceSpec --
 func NewResourceSpec(name string, content string, destination string, resourceType ResourceType) ResourceSpec {
        return ResourceSpec{
                DataSpec: DataSpec{
@@ -284,7 +268,6 @@ func (in *SourceSpec) InferLanguage() Language {
        return ""
 }
 
-// SetIntegrationPlatform --
 func (in *Integration) SetIntegrationPlatform(platform *IntegrationPlatform) {
        cs := corev1.ConditionTrue
 
@@ -296,7 +279,6 @@ func (in *Integration) SetIntegrationPlatform(platform *IntegrationPlatform) {
        in.Status.Platform = platform.Name
 }
 
-// SetIntegrationKit --
 func (in *Integration) SetIntegrationKit(kit *IntegrationKit) {
        cs := corev1.ConditionTrue
        message := kit.Name
@@ -321,7 +303,6 @@ func (in *Integration) SetIntegrationKit(kit *IntegrationKit) {
        in.Status.Image = image
 }
 
-// GetIntegrationKitNamespace --
 func (in *Integration) GetIntegrationKitNamespace(p *IntegrationPlatform) string {
        if in.Status.IntegrationKit != nil && in.Status.IntegrationKit.Namespace != "" {
                return in.Status.IntegrationKit.Namespace
@@ -346,7 +327,6 @@ func (in *IntegrationStatus) GetCondition(condType IntegrationConditionType) *In
        return nil
 }
 
-// SetCondition --
 func (in *IntegrationStatus) SetCondition(condType IntegrationConditionType, status corev1.ConditionStatus, reason string, message string) {
        in.SetConditions(IntegrationCondition{
                Type:    condType,
@@ -356,7 +336,6 @@ func (in *IntegrationStatus) SetCondition(condType IntegrationConditionType, sta
        })
 }
 
-// SetErrorCondition --
 func (in *IntegrationStatus) SetErrorCondition(condType IntegrationConditionType, reason string, err error) {
        in.SetConditions(IntegrationCondition{
                Type:    condType,
@@ -376,7 +355,7 @@ func (in *IntegrationStatus) SetConditions(conditions ...IntegrationCondition) {
        for _, condition := range conditions {
                currentCond := in.GetCondition(condition.Type)
 
-               if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
+               if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason && currentCond.Message == condition.Message {
                        return
                }
 
@@ -423,7 +402,6 @@ func (in *IntegrationStatus) RemoveCondition(condType IntegrationConditionType)
 
 var _ ResourceCondition = IntegrationCondition{}
 
-// GetConditions --
 func (in *IntegrationStatus) GetConditions() []ResourceCondition {
        res := make([]ResourceCondition, 0, len(in.Conditions))
        for _, c := range in.Conditions {
@@ -432,32 +410,26 @@ func (in *IntegrationStatus) GetConditions() []ResourceCondition {
        return res
 }
 
-// GetType --
 func (c IntegrationCondition) GetType() string {
        return string(c.Type)
 }
 
-// GetStatus --
 func (c IntegrationCondition) GetStatus() corev1.ConditionStatus {
        return c.Status
 }
 
-// GetLastUpdateTime --
 func (c IntegrationCondition) GetLastUpdateTime() metav1.Time {
        return c.LastUpdateTime
 }
 
-// GetLastTransitionTime --
 func (c IntegrationCondition) GetLastTransitionTime() metav1.Time {
        return c.LastTransitionTime
 }
 
-// GetReason --
 func (c IntegrationCondition) GetReason() string {
        return c.Reason
 }
 
-// GetMessage --
 func (c IntegrationCondition) GetMessage() string {
        return c.Message
 }
diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go
index 2013142..b9c98c6 100644
--- a/pkg/cmd/operator/operator.go
+++ b/pkg/cmd/operator/operator.go
@@ -27,6 +27,9 @@ import (
        "strconv"
        "time"
 
+       appsv1 "k8s.io/api/apps/v1"
+       batchv1 "k8s.io/api/batch/v1"
+       batchv1beta1 "k8s.io/api/batch/v1beta1"
        coordination "k8s.io/api/coordination/v1"
        corev1 "k8s.io/api/core/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -44,6 +47,8 @@ import (
        "sigs.k8s.io/controller-runtime/pkg/manager"
        "sigs.k8s.io/controller-runtime/pkg/manager/signals"
 
+       servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+
        "github.com/apache/camel-k/pkg/apis"
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/client"
@@ -133,8 +138,9 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) {
                log.Info("Leader election is disabled!")
        }
 
-       podLabelSelector, err := labels.NewRequirement(v1.IntegrationLabel, selection.Exists, []string{})
-       exitOnError(err, "cannot create Pod labels selector")
+       hasIntegrationLabel, err := labels.NewRequirement(v1.IntegrationLabel, selection.Exists, []string{})
+       exitOnError(err, "cannot create Integration label selector")
+       selector := labels.NewSelector().Add(*hasIntegrationLabel)
 
        mgr, err := manager.New(c.GetConfig(), manager.Options{
                Namespace:                     watchNamespace,
@@ -149,9 +155,11 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) {
                NewCache: cache.BuilderWithOptions(
                        cache.Options{
                                SelectorsByObject: cache.SelectorsByObject{
-                                       &corev1.Pod{}: {
-                                               Label: labels.NewSelector().Add(*podLabelSelector),
-                                       },
+                                       &corev1.Pod{}:           {Label: selector},
+                                       &appsv1.Deployment{}:    {Label: selector},
+                                       &batchv1beta1.CronJob{}: {Label: selector},
+                                       &batchv1.Job{}:          {Label: selector},
+                                       &servingv1.Service{}:    {Label: selector},
                                },
                        },
                ),
diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go
index 962a62c..3872828 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -20,6 +20,8 @@ package integration
 import (
        "context"
 
+       appsv1 "k8s.io/api/apps/v1"
+       batchv1beta1 "k8s.io/api/batch/v1beta1"
        corev1 "k8s.io/api/core/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/runtime"
@@ -36,6 +38,8 @@ import (
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
        "sigs.k8s.io/controller-runtime/pkg/source"
 
+       servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/client"
        camelevent "github.com/apache/camel-k/pkg/event"
@@ -168,6 +172,12 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
 
                                return requests
                        })).
+               // Watch for the owned Deployments
+               Owns(&appsv1.Deployment{}).
+               // Watch for the owned Knative Services
+               Owns(&servingv1.Service{}).
+               // Watch for the owned CronJobs
+               Owns(&batchv1beta1.CronJob{}).
                // Watch for the Integration Pods
                Watches(&source.Kind{Type: &corev1.Pod{}},
                        handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request {
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 33911ae..320ef69 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -19,17 +19,21 @@ package integration
 
 import (
        "context"
+       "fmt"
        "strconv"
 
-       "github.com/pkg/errors"
-
        appsv1 "k8s.io/api/apps/v1"
+       batchv1 "k8s.io/api/batch/v1"
+       batchv1beta1 "k8s.io/api/batch/v1beta1"
        corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/selection"
 
        ctrl "sigs.k8s.io/controller-runtime/pkg/client"
 
+       servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/trait"
        "github.com/apache/camel-k/pkg/util/digest"
@@ -57,7 +61,7 @@ func (action *monitorAction) CanHandle(integration *v1.Integration) bool {
 func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) {
        // At that stage the Integration must have a Kit
        if integration.Status.IntegrationKit == nil {
-               return nil, errors.Errorf("no kit set on integration %s", integration.Name)
+               return nil, fmt.Errorf("no kit set on integration %s", integration.Name)
        }
 
        // Check if the Integration requires a rebuild
@@ -77,7 +81,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
 
        kit, err := kubernetes.GetIntegrationKit(ctx, action.client, integration.Status.IntegrationKit.Name, integration.Status.IntegrationKit.Namespace)
        if err != nil {
-               return nil, errors.Wrapf(err, "unable to find integration kit %s/%s, %s", integration.Status.IntegrationKit.Namespace, integration.Status.IntegrationKit.Name, err)
+               return nil, fmt.Errorf("unable to find integration kit %s/%s, %s", integration.Status.IntegrationKit.Namespace, integration.Status.IntegrationKit.Name, err)
        }
 
        // Check if an IntegrationKit with higher priority is ready
@@ -146,10 +150,12 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
                integration.Status.Phase = v1.IntegrationPhaseRunning
        }
 
-       // Mirror ready condition from the owned resource (e.g., Deployment, CronJob, KnativeService ...)
-       // into the owning integration
        previous := integration.Status.GetCondition(v1.IntegrationConditionReady)
-       kubernetes.MirrorReadyCondition(ctx, action.client, integration)
+
+       err = action.updateIntegrationPhaseAndReadyCondition(ctx, integration, pendingPods.Items, runningPods.Items)
+       if err != nil {
+               return nil, err
+       }
 
        if next := integration.Status.GetCondition(v1.IntegrationConditionReady); (previous == nil || previous.FirstTruthyTime == nil || previous.FirstTruthyTime.IsZero()) &&
                next != nil && next.Status == corev1.ConditionTrue && !(next.FirstTruthyTime == nil || next.FirstTruthyTime.IsZero()) {
@@ -159,69 +165,178 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
                timeToFirstReadiness.Observe(duration.Seconds())
        }
 
-       // the integration pod may be in running phase, but the corresponding container running the integration code
-       // may be in error state, in this case we should check the deployment status and set the integration status accordingly.
-       if kubernetes.IsConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable) {
-               deployment, err := kubernetes.GetDeployment(ctx, action.client, integration.Name, integration.Namespace)
-               if err != nil {
-                       return nil, err
-               }
+       return integration, nil
+}
 
-               switch integration.Status.Phase {
-               case v1.IntegrationPhaseRunning:
-                       deployUnavailable := false
-                       progressingFailing := false
-                       for _, c := range deployment.Status.Conditions {
-                               // first, check if the container status is not available
-                               if c.Type == appsv1.DeploymentAvailable {
-                                       deployUnavailable = c.Status == corev1.ConditionFalse
-                               }
-                               // second, check when it is progressing and reason is the replicas are available but the number of replicas are zero
-                               // in this case, the container integration is failing
-                               if c.Type == appsv1.DeploymentProgressing {
-                                       progressingFailing = c.Status == corev1.ConditionTrue && c.Reason == "NewReplicaSetAvailable" && deployment.Status.AvailableReplicas < 1
-                               }
+func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
+       var controller ctrl.Object
+       var lastCompletedJob *batchv1.Job
+
+       if isConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable) {
+               controller = &appsv1.Deployment{}
+       } else if isConditionTrue(integration, v1.IntegrationConditionKnativeServiceAvailable) {
+               controller = &servingv1.Service{}
+       } else if isConditionTrue(integration, v1.IntegrationConditionCronJobAvailable) {
+               controller = &batchv1beta1.CronJob{}
+       } else {
+               return fmt.Errorf("unsupported controller for integration %s", integration.Name)
+       }
+
+       switch c := controller.(type) {
+       case *appsv1.Deployment:
+               // Check the Deployment exists
+               if err := action.client.Get(ctx, ctrl.ObjectKeyFromObject(integration), c); err != nil {
+                       if errors.IsNotFound(err) {
+                               integration.Status.Phase = v1.IntegrationPhaseError
+                               setReadyConditionError(integration, err.Error())
+                               return nil
+                       } else {
+                               return err
                        }
-                       if deployUnavailable && progressingFailing {
-                               notAvailableCondition := v1.IntegrationCondition{
-                                       Type:    v1.IntegrationConditionReady,
-                                       Status:  corev1.ConditionFalse,
-                                       Reason:  v1.IntegrationConditionErrorReason,
-                                       Message: "The corresponding pod(s) may be in error state, look at the pod status or log for errors",
-                               }
-                               integration.Status.SetConditions(notAvailableCondition)
+               }
+               // Check the Deployment progression
+               if progressing := kubernetes.GetDeploymentCondition(*c, appsv1.DeploymentProgressing); progressing != nil && progressing.Status == corev1.ConditionFalse && progressing.Reason == "ProgressDeadlineExceeded" {
+                       integration.Status.Phase = v1.IntegrationPhaseError
+                       setReadyConditionError(integration, progressing.Message)
+                       return nil
+               }
+
+       case *servingv1.Service:
+               // Check the KnativeService exists
+               if err := action.client.Get(ctx, ctrl.ObjectKeyFromObject(integration), c); err != nil {
+                       if errors.IsNotFound(err) {
                                integration.Status.Phase = v1.IntegrationPhaseError
-                               return integration, nil
+                               setReadyConditionError(integration, err.Error())
+                               return nil
+                       } else {
+                               return err
                        }
+               }
+               // Check the KnativeService conditions
+               if ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady); ready.IsFalse() && ready.GetReason() == "RevisionFailed" {
+                       integration.Status.Phase = v1.IntegrationPhaseError
+                       setReadyConditionError(integration, ready.Message)
+                       return nil
+               }
 
-               case v1.IntegrationPhaseError:
-                       // if the integration is in error phase, check if the corresponding pod is running ok, the user may have updated the integration.
-                       deployAvailable := false
-                       progressingOk := false
-                       for _, c := range deployment.Status.Conditions {
-                               // first, check if the container is in available state
-                               if c.Type == appsv1.DeploymentAvailable {
-                                       deployAvailable = c.Status == corev1.ConditionTrue
-                               }
-                               // second, check the progressing and the reasons
-                               if c.Type == appsv1.DeploymentProgressing {
-                                       progressingOk = c.Status == corev1.ConditionTrue && (c.Reason == "NewReplicaSetAvailable" || c.Reason == "ReplicaSetUpdated")
+       case *batchv1beta1.CronJob:
+               // Check the CronJob exists
+               if err := action.client.Get(ctx, ctrl.ObjectKeyFromObject(integration), c); err != nil {
+                       if errors.IsNotFound(err) {
+                               integration.Status.Phase = v1.IntegrationPhaseError
+                               setReadyConditionError(integration, err.Error())
+                               return nil
+                       } else {
+                               return err
+                       }
+               }
+               // Check latest job result
+               if lastScheduleTime := c.Status.LastScheduleTime; lastScheduleTime != nil && len(c.Status.Active) == 0 {
+                       jobs := batchv1.JobList{}
+                       if err := action.client.List(ctx, &jobs,
+                               ctrl.InNamespace(integration.Namespace),
+                               ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
+                       ); err != nil {
+                               return err
+                       }
+                       t := lastScheduleTime.Time
+                       for i, job := range jobs.Items {
+                               if job.Status.Active == 0 && job.CreationTimestamp.Time.Before(t) {
+                                       continue
                                }
+                               lastCompletedJob = &jobs.Items[i]
+                               t = lastCompletedJob.CreationTimestamp.Time
                        }
-                       if deployAvailable && progressingOk {
-                               availableCondition := v1.IntegrationCondition{
-                                       Type:   v1.IntegrationConditionReady,
-                                       Status: corev1.ConditionTrue,
-                                       Reason: v1.IntegrationConditionReplicaSetReadyReason,
+                       if lastCompletedJob != nil {
+                               if failed := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobFailed); failed != nil && failed.Status == corev1.ConditionTrue {
+                                       setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionLastJobFailedReason, fmt.Sprintf("last job %s failed: %s", lastCompletedJob.Name, failed.Message))
+                                       integration.Status.Phase = v1.IntegrationPhaseError
+                                       return nil
                                }
-                               integration.Status.SetConditions(availableCondition)
-                               integration.Status.Phase = v1.IntegrationPhaseRunning
-                               return integration, nil
                        }
                }
        }
 
-       return integration, nil
+       // Check Pod statuses
+       for _, pod := range pendingPods {
+               // Check the scheduled condition
+               if scheduled := kubernetes.GetPodCondition(pod, corev1.PodScheduled); scheduled != nil && scheduled.Status == corev1.ConditionFalse && scheduled.Reason == "Unschedulable" {
+                       integration.Status.Phase = v1.IntegrationPhaseError
+                       setReadyConditionError(integration, scheduled.Message)
+                       return nil
+               }
+       }
+       // Check pending container statuses
+       for _, pod := range pendingPods {
+               containers := append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...)
+               for _, container := range containers {
+                       // Check the images are pulled
+                       if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "ImagePullBackOff" {
+                               integration.Status.Phase = v1.IntegrationPhaseError
+                               setReadyConditionError(integration, waiting.Message)
+                               return nil
+                       }
+               }
+       }
+       // Check running container statuses
+       for _, pod := range runningPods {
+               containers := append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...)
+               for _, container := range containers {
+                       // Check the container state
+                       if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "CrashLoopBackOff" {
+                               integration.Status.Phase = v1.IntegrationPhaseError
+                               setReadyConditionError(integration, waiting.Message)
+                               return nil
+                       }
+                       if terminated := container.State.Terminated; terminated != nil && terminated.Reason == "Error" {
+                               integration.Status.Phase = v1.IntegrationPhaseError
+                               setReadyConditionError(integration, terminated.Message)
+                               return nil
+                       }
+               }
+       }
+
+       integration.Status.Phase = v1.IntegrationPhaseRunning
+
+       switch c := controller.(type) {
+       case *appsv1.Deployment:
+               replicas := int32(1)
+               if r := integration.Spec.Replicas; r != nil {
+                       replicas = *r
+               }
+               if c.Status.UpdatedReplicas == replicas && c.Status.ReadyReplicas == replicas {
+                       setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", c.Status.ReadyReplicas, replicas))
+               } else if c.Status.UpdatedReplicas < replicas {
+                       setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.Status.UpdatedReplicas, replicas))
+               } else {
+                       setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", c.Status.ReadyReplicas, replicas))
+               }
+
+       case *servingv1.Service:
+               ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady)
+               if ready.IsTrue() {
+                       setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionKnativeServiceReadyReason, "")
+               } else {
+                       setReadyCondition(integration, corev1.ConditionFalse, ready.GetReason(), ready.GetMessage())
+               }
+
+       case *batchv1beta1.CronJob:
+               if c.Status.LastScheduleTime == nil {
+                       setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
+               } else if len(c.Status.Active) > 0 {
+                       setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
+               } else if c.Spec.SuccessfulJobsHistoryLimit != nil && *c.Spec.SuccessfulJobsHistoryLimit == 0 && c.Spec.FailedJobsHistoryLimit != nil && *c.Spec.FailedJobsHistoryLimit == 0 {
+                       setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
+               } else if lastCompletedJob != nil {
+                       if complete := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
+                               setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", lastCompletedJob.Name))
+                       }
+               } else {
+                       integration.Status.SetCondition(v1.IntegrationConditionReady, corev1.ConditionUnknown, "", "")
+               }
+       }
+
+       return nil
 }
 
 func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit, error) {
@@ -245,3 +360,19 @@ func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit,
        }
        return kit, nil
 }
+
+func isConditionTrue(integration *v1.Integration, conditionType v1.IntegrationConditionType) bool {
+       cond := integration.Status.GetCondition(conditionType)
+       if cond == nil {
+               return false
+       }
+       return cond.Status == corev1.ConditionTrue
+}
+
+func setReadyConditionError(integration *v1.Integration, err string) {
+       setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionErrorReason, err)
+}
+
+func setReadyCondition(integration *v1.Integration, status corev1.ConditionStatus, reason string, message string) {
+       integration.Status.SetCondition(v1.IntegrationConditionReady, status, 
reason, message)
+}
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index 9c1d7a6..d851ff8 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -371,9 +371,9 @@ var assets = func() http.FileSystem {
                "/rbac/operator-role.yaml": &vfsgen۰CompressedFileInfo{
                        name:             "operator-role.yaml",
                        modTime:          time.Time{},
-                       uncompressedSize: 2349,
+                       uncompressedSize: 2311,
 
-                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x55\xc1\x6e\xe3\x36\x10\xbd\xeb\x2b\x1e\xac\xcb\x6e\x11\xdb\x6d\x4f\x85\x7b\x72\x77\x93\xd6\xe8\xc2\x06\x22\x6f\x17\x7b\xa4\xa8\xb1\x3c\x0d\xc5\x61\x49\x2a\x8a\xfb\xf5\x05\x29\x7b\xd7\x59\x27\x40\x0e\x41\x5b\x5d\x3c\xa4\x46\x6f\xde\x9b\x79\x26\x4b\x4c\x5f\xef\x29\x4a\x7c\x60\x4d\x36\x50\x83\x28\x88\x7b\xc2\xd2\x29\xbd\x27\x54\xb2\x8b\x83\xf2\x84\x1b\xe9\x6d\xa3\x22\x8b\xc5\x9b\x65\x75\xf3\x16\xbd\x6d\xc8\x43\x2c\x
 [...]
+                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x95\x41\x8f\xdb\x36\x10\x85\xef\xfa\x15\x0f\xd6\x25\x29\xd6\x76\xdb\x53\xe1\x9e\xdc\x64\xb7\x35\x1a\xd8\xc0\xca\x69\x90\x23\x45\x8e\xe5\xe9\x52\x1c\x96\xa4\xec\x75\x7f\x7d\x41\xda\x6e\xbc\xf1\x2e\x90\x43\xd0\x54\x17\x0f\xa9\xd1\x9b\xef\x71\xc6\x52\x8d\xf1\xd7\xbb\xaa\x1a\xef\x58\x93\x8b\x64\x90\x04\x69\x4b\x98\x7b\xa5\xb7\x84\x46\x36\x69\xaf\x02\xe1\x4e\x06\x67\x54\x62\x71\x78\x35\x6f\xee\x5e\x63\x70\x86\x02\xc4\x
 [...]
                },
                "/rbac/patch-role-to-clusterrole.yaml": &vfsgen۰FileInfo{
                        name:    "patch-role-to-clusterrole.yaml",
diff --git a/pkg/util/kubernetes/conditions.go b/pkg/util/kubernetes/conditions.go
index 569bf43..f27ef61 100644
--- a/pkg/util/kubernetes/conditions.go
+++ b/pkg/util/kubernetes/conditions.go
@@ -18,113 +18,50 @@ limitations under the License.
 package kubernetes
 
 import (
-       "context"
-       "errors"
-       "fmt"
-       "strconv"
-
-       v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
-       "github.com/apache/camel-k/pkg/client"
        appsv1 "k8s.io/api/apps/v1"
-       "k8s.io/api/batch/v1beta1"
+       batchv1 "k8s.io/api/batch/v1"
        corev1 "k8s.io/api/core/v1"
-       runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
+
+       knative "knative.dev/pkg/apis"
+       servingv1 "knative.dev/serving/pkg/apis/serving/v1"
 )
 
-// nolint: gocritic
-func MirrorReadyCondition(ctx context.Context, c client.Client, it *v1.Integration) {
-       if IsConditionTrue(it, v1.IntegrationConditionDeploymentAvailable) || IsConditionTrue(it, v1.IntegrationConditionKnativeServiceAvailable) {
-               mirrorReadyConditionFromReplicaSet(ctx, c, it)
-       } else if IsConditionTrue(it, v1.IntegrationConditionCronJobAvailable) {
-               mirrorReadyConditionFromCronJob(ctx, c, it)
-       } else {
-               it.Status.SetCondition(
-                       v1.IntegrationConditionReady,
-                       corev1.ConditionUnknown,
-                       "",
-                       "",
-               )
+func GetPodCondition(pod corev1.Pod, conditionType corev1.PodConditionType) *corev1.PodCondition {
+       for i := range pod.Status.Conditions {
+               condition := pod.Status.Conditions[i]
+               if condition.Type == conditionType {
+                       return &condition
+               }
        }
+       return nil
 }
 
-func mirrorReadyConditionFromReplicaSet(ctx context.Context, c client.Client, it *v1.Integration) {
-       list := appsv1.ReplicaSetList{}
-       opts := runtimeclient.MatchingLabels{
-               v1.IntegrationLabel: it.Name,
-       }
-       if err := c.List(ctx, &list, opts, runtimeclient.InNamespace(it.Namespace)); err != nil {
-               setReadyConditionError(it, err)
-               return
-       }
-
-       if len(list.Items) == 0 {
-               setReadyConditionError(it, errors.New("replicaset not found"))
-               return
-       }
-
-       var rs *appsv1.ReplicaSet
-       for _, r := range list.Items {
-               r := r
-               if r.Labels["camel.apache.org/generation"] == strconv.FormatInt(it.Generation, 10) {
-                       rs = &r
+func GetDeploymentCondition(deployment appsv1.Deployment, conditionType appsv1.DeploymentConditionType) *appsv1.DeploymentCondition {
+       for i := range deployment.Status.Conditions {
+               condition := deployment.Status.Conditions[i]
+               if condition.Type == conditionType {
+                       return &condition
                }
        }
-       if rs == nil {
-               rs = &list.Items[0]
-       }
-       var replicas int32 = 1
-       if rs.Spec.Replicas != nil {
-               replicas = *rs.Spec.Replicas
-       }
-       // The Integration is considered ready when the number of replicas
-       // reported to be ready is larger or equal to the specified number
-       // of replicas. This avoid reporting a falsy readiness condition
-       // when the Integration is being down-scaled.
-       if replicas <= rs.Status.ReadyReplicas {
-               it.Status.SetCondition(
-                       v1.IntegrationConditionReady,
-                       corev1.ConditionTrue,
-                       v1.IntegrationConditionReplicaSetReadyReason,
-                       "",
-               )
-       } else {
-               it.Status.SetCondition(
-                       v1.IntegrationConditionReady,
-                       corev1.ConditionFalse,
-                       v1.IntegrationConditionReplicaSetNotReadyReason,
-                       "",
-               )
-       }
+       return nil
 }
 
-func mirrorReadyConditionFromCronJob(ctx context.Context, c client.Client, it *v1.Integration) {
-       cronJob := v1beta1.CronJob{}
-       if err := c.Get(ctx, runtimeclient.ObjectKey{Namespace: it.Namespace, Name: it.Name}, &cronJob); err != nil {
-               setReadyConditionError(it, err)
-       } else {
-               // CronJob status is not tracked by Kubernetes
-               it.Status.SetCondition(
-                       v1.IntegrationConditionReady,
-                       corev1.ConditionTrue,
-                       v1.IntegrationConditionCronJobCreatedReason,
-                       "",
-               )
+func GetKnativeServiceCondition(service servingv1.Service, conditionType knative.ConditionType) *knative.Condition {
+       for i := range service.Status.Conditions {
+               condition := service.Status.Conditions[i]
+               if condition.Type == conditionType {
+                       return &condition
+               }
        }
+       return nil
 }
 
-func IsConditionTrue(it *v1.Integration, conditionType v1.IntegrationConditionType) bool {
-       cond := it.Status.GetCondition(conditionType)
-       if cond == nil {
-               return false
+func GetJobCondition(job batchv1.Job, conditionType batchv1.JobConditionType) *batchv1.JobCondition {
+       for i := range job.Status.Conditions {
+               condition := job.Status.Conditions[i]
+               if condition.Type == conditionType {
+                       return &condition
+               }
        }
-       return cond.Status == corev1.ConditionTrue
-}
-
-func setReadyConditionError(it *v1.Integration, err error) {
-       it.Status.SetCondition(
-               v1.IntegrationConditionReady,
-               corev1.ConditionUnknown,
-               v1.IntegrationConditionErrorReason,
-               fmt.Sprintf("%v", err),
-       )
+       return nil
 }
