This is an automated email from the ASF dual-hosted git repository. astefanutti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 38122e26eb0af82edb8ab37805a531d1af797217 Author: Antonin Stefanutti <anto...@stefanutti.fr> AuthorDate: Thu Oct 7 15:32:39 2021 +0200 feat: Comprehensive integration error status --- config/rbac/operator-role.yaml | 6 +- pkg/apis/camel/v1/integration_types.go | 22 +- pkg/apis/camel/v1/integration_types_support.go | 30 +-- pkg/cmd/operator/operator.go | 18 +- .../integration/integration_controller.go | 10 + pkg/controller/integration/monitor.go | 247 ++++++++++++++++----- pkg/resources/resources.go | 4 +- pkg/util/kubernetes/conditions.go | 125 +++-------- 8 files changed, 264 insertions(+), 198 deletions(-) diff --git a/config/rbac/operator-role.yaml b/config/rbac/operator-role.yaml index fda70ff..e619bbf 100644 --- a/config/rbac/operator-role.yaml +++ b/config/rbac/operator-role.yaml @@ -83,8 +83,6 @@ rules: - apps resources: - deployments - - replicasets - - statefulsets verbs: - create - delete @@ -108,9 +106,9 @@ rules: - update - watch - apiGroups: - - apps + - batch resources: - - daemonsets + - jobs verbs: - get - list diff --git a/pkg/apis/camel/v1/integration_types.go b/pkg/apis/camel/v1/integration_types.go index 3a6ea29..b30b66b 100644 --- a/pkg/apis/camel/v1/integration_types.go +++ b/pkg/apis/camel/v1/integration_types.go @@ -182,14 +182,24 @@ const ( IntegrationConditionJolokiaAvailableReason string = "JolokiaAvailable" // IntegrationConditionProbesAvailableReason -- IntegrationConditionProbesAvailableReason string = "ProbesAvailable" - // IntegrationConditionErrorReason -- - IntegrationConditionErrorReason string = "Error" + + // IntegrationConditionKnativeServiceReadyReason -- + IntegrationConditionKnativeServiceReadyReason string = "KnativeServiceReady" + // IntegrationConditionDeploymentReadyReason -- + IntegrationConditionDeploymentReadyReason string = "DeploymentReady" + // IntegrationConditionDeploymentProgressingReason -- + IntegrationConditionDeploymentProgressingReason string = "DeploymentProgressing" // IntegrationConditionCronJobCreatedReason -- IntegrationConditionCronJobCreatedReason string = "CronJobCreated" - // IntegrationConditionReplicaSetReadyReason -- - IntegrationConditionReplicaSetReadyReason string = "ReplicaSetReady" - // IntegrationConditionReplicaSetNotReadyReason -- - IntegrationConditionReplicaSetNotReadyReason string = "ReplicaSetNotReady" + // IntegrationConditionCronJobActiveReason -- + IntegrationConditionCronJobActiveReason string = "CronJobActive" + // IntegrationConditionLastJobSucceededReason -- + IntegrationConditionLastJobSucceededReason string = "LastJobSucceeded" + // IntegrationConditionLastJobFailedReason -- + IntegrationConditionLastJobFailedReason string = "LastJobFailed" + // IntegrationConditionErrorReason -- + IntegrationConditionErrorReason string = "Error" + // IntegrationConditionUnsupportedLanguageReason -- IntegrationConditionUnsupportedLanguageReason string = "UnsupportedLanguage" diff --git a/pkg/apis/camel/v1/integration_types_support.go b/pkg/apis/camel/v1/integration_types_support.go index 345a1c9..0aff087 100644 --- a/pkg/apis/camel/v1/integration_types_support.go +++ b/pkg/apis/camel/v1/integration_types_support.go @@ -28,7 +28,6 @@ import ( const IntegrationLabel = "camel.apache.org/integration" -// NewIntegration -- func NewIntegration(namespace string, name string) Integration { return Integration{ TypeMeta: metav1.TypeMeta{ @@ -42,7 +41,6 @@ func NewIntegration(namespace string, name string) Integration { } } -// NewIntegrationList -- func NewIntegrationList() IntegrationList { return IntegrationList{ TypeMeta: metav1.TypeMeta{ @@ -81,27 +79,22 @@ func (in *Integration) Resources() []ResourceSpec { return resources } -// AddSource -- func (in *IntegrationSpec) AddSource(name string, content string, language Language) { in.Sources = append(in.Sources, NewSourceSpec(name, content, language)) } -// AddSources -- func (in *IntegrationSpec) AddSources(sources ...SourceSpec) { in.Sources = append(in.Sources, sources...) } -// AddResources -- func (in *IntegrationSpec) AddResources(resources ...ResourceSpec) { in.Resources = append(in.Resources, resources...) } -// AddFlows -- func (in *IntegrationSpec) AddFlows(flows ...Flow) { in.Flows = append(in.Flows, flows...) } -// AddConfiguration -- func (in *IntegrationSpec) AddConfiguration(confType string, confValue string) { in.Configuration = append(in.Configuration, ConfigurationSpec{ Type: confType, @@ -121,7 +114,6 @@ func (in *IntegrationSpec) AddConfigurationAsResource( }) } -// AddDependency -- func (in *IntegrationSpec) AddDependency(dependency string) { if in.Dependencies == nil { in.Dependencies = make([]string, 0) @@ -162,7 +154,6 @@ func trimFirstLeadingSpace(val string) string { return val } -// AddOrReplaceGeneratedResources -- func (in *IntegrationStatus) AddOrReplaceGeneratedResources(resources ...ResourceSpec) { newResources := make([]ResourceSpec, 0) for _, resource := range resources { @@ -182,7 +173,6 @@ func (in *IntegrationStatus) AddOrReplaceGeneratedResources(resources ...Resourc in.GeneratedResources = append(in.GeneratedResources, newResources...) } -// AddOrReplaceGeneratedSources -- func (in *IntegrationStatus) AddOrReplaceGeneratedSources(sources ...SourceSpec) { newSources := make([]SourceSpec, 0) for _, source := range sources { @@ -202,7 +192,6 @@ func (in *IntegrationStatus) AddOrReplaceGeneratedSources(sources ...SourceSpec) in.GeneratedSources = append(in.GeneratedSources, newSources...) } -// AddConfigurationsIfMissing -- func (in *IntegrationStatus) AddConfigurationsIfMissing(configurations ...ConfigurationSpec) { for _, config := range configurations { alreadyPresent := false @@ -218,7 +207,6 @@ func (in *IntegrationStatus) AddConfigurationsIfMissing(configurations ...Config } } -// Configurations -- func (in *IntegrationSpec) Configurations() []ConfigurationSpec { if in == nil { return []ConfigurationSpec{} @@ -227,7 +215,6 @@ func (in *IntegrationSpec) Configurations() []ConfigurationSpec { return in.Configuration } -// Configurations -- func (in *IntegrationStatus) Configurations() []ConfigurationSpec { if in == nil { return []ConfigurationSpec{} @@ -236,7 +223,6 @@ func (in *IntegrationStatus) Configurations() []ConfigurationSpec { return in.Configuration } -// Configurations -- func (in *Integration) Configurations() []ConfigurationSpec { if in == nil { return []ConfigurationSpec{} @@ -249,7 +235,6 @@ func (in *Integration) Configurations() []ConfigurationSpec { return answer } -// NewSourceSpec -- func NewSourceSpec(name string, content string, language Language) SourceSpec { return SourceSpec{ DataSpec: DataSpec{ @@ -260,7 +245,6 @@ func NewSourceSpec(name string, content string, language Language) SourceSpec { } } -// NewResourceSpec -- func NewResourceSpec(name string, content string, destination string, resourceType ResourceType) ResourceSpec { return ResourceSpec{ DataSpec: DataSpec{ @@ -284,7 +268,6 @@ func (in *SourceSpec) InferLanguage() Language { return "" } -// SetIntegrationPlatform -- func (in *Integration) SetIntegrationPlatform(platform *IntegrationPlatform) { cs := corev1.ConditionTrue @@ -296,7 +279,6 @@ func (in *Integration) SetIntegrationPlatform(platform *IntegrationPlatform) { in.Status.Platform = platform.Name } -// SetIntegrationKit -- func (in *Integration) SetIntegrationKit(kit *IntegrationKit) { cs := corev1.ConditionTrue message := kit.Name @@ -321,7 +303,6 @@ func (in *Integration) SetIntegrationKit(kit *IntegrationKit) { in.Status.Image = image } -// GetIntegrationKitNamespace -- func (in *Integration) GetIntegrationKitNamespace(p *IntegrationPlatform) string { if in.Status.IntegrationKit != nil && in.Status.IntegrationKit.Namespace != "" { return in.Status.IntegrationKit.Namespace @@ -346,7 +327,6 @@ func (in *IntegrationStatus) GetCondition(condType IntegrationConditionType) *In return nil } -// SetCondition -- func (in *IntegrationStatus) SetCondition(condType IntegrationConditionType, status corev1.ConditionStatus, reason string, message string) { in.SetConditions(IntegrationCondition{ Type: condType, @@ -356,7 +336,6 @@ func (in *IntegrationStatus) SetCondition(condType IntegrationConditionType, sta }) } -// SetErrorCondition -- func (in *IntegrationStatus) SetErrorCondition(condType IntegrationConditionType, reason string, err error) { in.SetConditions(IntegrationCondition{ Type: condType, @@ -376,7 +355,7 @@ func (in *IntegrationStatus) SetConditions(conditions ...IntegrationCondition) { for _, condition := range conditions { currentCond := in.GetCondition(condition.Type) - if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason { + if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason && currentCond.Message == condition.Message { return } @@ -423,7 +402,6 @@ func (in *IntegrationStatus) RemoveCondition(condType IntegrationConditionType) var _ ResourceCondition = IntegrationCondition{} -// GetConditions -- func (in *IntegrationStatus) GetConditions() []ResourceCondition { res := make([]ResourceCondition, 0, len(in.Conditions)) for _, c := range in.Conditions { @@ -432,32 +410,26 @@ func (in *IntegrationStatus) GetConditions() []ResourceCondition { return res } -// GetType -- func (c IntegrationCondition) GetType() string { return string(c.Type) } -// GetStatus -- func (c IntegrationCondition) GetStatus() corev1.ConditionStatus { return c.Status } -// GetLastUpdateTime -- func (c IntegrationCondition) GetLastUpdateTime() metav1.Time { return c.LastUpdateTime } -// GetLastTransitionTime -- func (c IntegrationCondition) GetLastTransitionTime() metav1.Time { return c.LastTransitionTime } -// GetReason -- func (c IntegrationCondition) GetReason() string { return c.Reason } -// GetMessage -- func (c IntegrationCondition) GetMessage() string { return c.Message } diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index 2013142..b9c98c6 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -27,6 +27,9 @@ import ( "strconv" "time" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" coordination "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -44,6 +47,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/manager/signals" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + "github.com/apache/camel-k/pkg/apis" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/client" @@ -133,8 +138,9 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) { log.Info("Leader election is disabled!") } - podLabelSelector, err := labels.NewRequirement(v1.IntegrationLabel, selection.Exists, []string{}) - exitOnError(err, "cannot create Pod labels selector") + hasIntegrationLabel, err := labels.NewRequirement(v1.IntegrationLabel, selection.Exists, []string{}) + exitOnError(err, "cannot create Integration label selector") + selector := labels.NewSelector().Add(*hasIntegrationLabel) mgr, err := manager.New(c.GetConfig(), manager.Options{ Namespace: watchNamespace, @@ -149,9 +155,11 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) { NewCache: cache.BuilderWithOptions( cache.Options{ SelectorsByObject: cache.SelectorsByObject{ - &corev1.Pod{}: { - Label: labels.NewSelector().Add(*podLabelSelector), - }, + &corev1.Pod{}: {Label: selector}, + &appsv1.Deployment{}: {Label: selector}, + &batchv1beta1.CronJob{}: {Label: selector}, + &batchv1.Job{}: {Label: selector}, + &servingv1.Service{}: {Label: selector}, }, }, ), diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index 962a62c..3872828 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -20,6 +20,8 @@ package integration import ( "context" + appsv1 "k8s.io/api/apps/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -36,6 +38,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/client" camelevent "github.com/apache/camel-k/pkg/event" @@ -168,6 +172,12 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return requests })). + // Watch for the owned Deployments + Owns(&appsv1.Deployment{}). + // Watch for the owned Knative Services + Owns(&servingv1.Service{}). + // Watch for the owned CronJobs + Owns(&batchv1beta1.CronJob{}). // Watch for the Integration Pods Watches(&source.Kind{Type: &corev1.Pod{}}, handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request { diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go index 33911ae..320ef69 100644 --- a/pkg/controller/integration/monitor.go +++ b/pkg/controller/integration/monitor.go @@ -19,17 +19,21 @@ package integration import ( "context" + "fmt" "strconv" - "github.com/pkg/errors" - appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" ctrl "sigs.k8s.io/controller-runtime/pkg/client" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/trait" "github.com/apache/camel-k/pkg/util/digest" @@ -57,7 +61,7 @@ func (action *monitorAction) CanHandle(integration *v1.Integration) bool { func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) { // At that staged the Integration must have a Kit if integration.Status.IntegrationKit == nil { - return nil, errors.Errorf("no kit set on integration %s", integration.Name) + return nil, fmt.Errorf("no kit set on integration %s", integration.Name) } // Check if the Integration requires a rebuild @@ -77,7 +81,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra kit, err := kubernetes.GetIntegrationKit(ctx, action.client, integration.Status.IntegrationKit.Name, integration.Status.IntegrationKit.Namespace) if err != nil { - return nil, errors.Wrapf(err, "unable to find integration kit %s/%s, %s", integration.Status.IntegrationKit.Namespace, integration.Status.IntegrationKit.Name, err) + return nil, fmt.Errorf("unable to find integration kit %s/%s, %s", integration.Status.IntegrationKit.Namespace, integration.Status.IntegrationKit.Name, err) } // Check if an IntegrationKit with higher priority is ready @@ -146,10 +150,12 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra integration.Status.Phase = v1.IntegrationPhaseRunning } - // Mirror ready condition from the owned resource (e.g., Deployment, CronJob, KnativeService ...) - // into the owning integration previous := integration.Status.GetCondition(v1.IntegrationConditionReady) - kubernetes.MirrorReadyCondition(ctx, action.client, integration) + + err = action.updateIntegrationPhaseAndReadyCondition(ctx, integration, pendingPods.Items, runningPods.Items) + if err != nil { + return nil, err + } if next := integration.Status.GetCondition(v1.IntegrationConditionReady); (previous == nil || previous.FirstTruthyTime == nil || previous.FirstTruthyTime.IsZero()) && next != nil && next.Status == corev1.ConditionTrue && !(next.FirstTruthyTime == nil || next.FirstTruthyTime.IsZero()) { @@ -159,69 +165,178 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra timeToFirstReadiness.Observe(duration.Seconds()) } - // the integration pod may be in running phase, but the corresponding container running the integration code - // may be in error state, in this case we should check the deployment status and set the integration status accordingly. - if kubernetes.IsConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable) { - deployment, err := kubernetes.GetDeployment(ctx, action.client, integration.Name, integration.Namespace) - if err != nil { - return nil, err - } + return integration, nil +} - switch integration.Status.Phase { - case v1.IntegrationPhaseRunning: - deployUnavailable := false - progressingFailing := false - for _, c := range deployment.Status.Conditions { - // first, check if the container status is not available - if c.Type == appsv1.DeploymentAvailable { - deployUnavailable = c.Status == corev1.ConditionFalse - } - // second, check when it is progressing and reason is the replicas are available but the number of replicas are zero - // in this case, the container integration is failing - if c.Type == appsv1.DeploymentProgressing { - progressingFailing = c.Status == corev1.ConditionTrue && c.Reason == "NewReplicaSetAvailable" && deployment.Status.AvailableReplicas < 1 - } +func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error { + var controller ctrl.Object + var lastCompletedJob *batchv1.Job + + if isConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable) { + controller = &appsv1.Deployment{} + } else if isConditionTrue(integration, v1.IntegrationConditionKnativeServiceAvailable) { + controller = &servingv1.Service{} + } else if isConditionTrue(integration, v1.IntegrationConditionCronJobAvailable) { + controller = &batchv1beta1.CronJob{} + } else { + return fmt.Errorf("unsupported controller for integration %s", integration.Name) + } + + switch c := controller.(type) { + case *appsv1.Deployment: + // Check the Deployment exists + if err := action.client.Get(ctx, ctrl.ObjectKeyFromObject(integration), c); err != nil { + if errors.IsNotFound(err) { + integration.Status.Phase = v1.IntegrationPhaseError + setReadyConditionError(integration, err.Error()) + return nil + } else { + return err } - if deployUnavailable && progressingFailing { - notAvailableCondition := v1.IntegrationCondition{ - Type: v1.IntegrationConditionReady, - Status: corev1.ConditionFalse, - Reason: v1.IntegrationConditionErrorReason, - Message: "The corresponding pod(s) may be in error state, look at the pod status or log for errors", - } - integration.Status.SetConditions(notAvailableCondition) + } + // Check the Deployment progression + if progressing := kubernetes.GetDeploymentCondition(*c, appsv1.DeploymentProgressing); progressing != nil && progressing.Status == corev1.ConditionFalse && progressing.Reason == "ProgressDeadlineExceeded" { + integration.Status.Phase = v1.IntegrationPhaseError + setReadyConditionError(integration, progressing.Message) + return nil + } + + case *servingv1.Service: + // Check the KnativeService exists + if err := action.client.Get(ctx, ctrl.ObjectKeyFromObject(integration), c); err != nil { + if errors.IsNotFound(err) { integration.Status.Phase = v1.IntegrationPhaseError - return integration, nil + setReadyConditionError(integration, err.Error()) + return nil + } else { + return err } + } + // Check the KnativeService conditions + if ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady); ready.IsFalse() && ready.GetReason() == "RevisionFailed" { + integration.Status.Phase = v1.IntegrationPhaseError + setReadyConditionError(integration, ready.Message) + return nil + } - case v1.IntegrationPhaseError: - // if the integration is in error phase, check if the corresponding pod is running ok, the user may have updated the integration. - deployAvailable := false - progressingOk := false - for _, c := range deployment.Status.Conditions { - // first, check if the container is in available state - if c.Type == appsv1.DeploymentAvailable { - deployAvailable = c.Status == corev1.ConditionTrue - } - // second, check the progressing and the reasons - if c.Type == appsv1.DeploymentProgressing { - progressingOk = c.Status == corev1.ConditionTrue && (c.Reason == "NewReplicaSetAvailable" || c.Reason == "ReplicaSetUpdated") + case *batchv1beta1.CronJob: + // Check the CronJob exists + if err := action.client.Get(ctx, ctrl.ObjectKeyFromObject(integration), c); err != nil { + if errors.IsNotFound(err) { + integration.Status.Phase = v1.IntegrationPhaseError + setReadyConditionError(integration, err.Error()) + return nil + } else { + return err + } + } + // Check latest job result + if lastScheduleTime := c.Status.LastScheduleTime; lastScheduleTime != nil && len(c.Status.Active) == 0 { + jobs := batchv1.JobList{} + if err := action.client.List(ctx, &jobs, + ctrl.InNamespace(integration.Namespace), + ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name}, + ); err != nil { + return err + } + t := lastScheduleTime.Time + for i, job := range jobs.Items { + if job.Status.Active == 0 && job.CreationTimestamp.Time.Before(t) { + continue } + lastCompletedJob = &jobs.Items[i] + t = lastCompletedJob.CreationTimestamp.Time } - if deployAvailable && progressingOk { - availableCondition := v1.IntegrationCondition{ - Type: v1.IntegrationConditionReady, - Status: corev1.ConditionTrue, - Reason: v1.IntegrationConditionReplicaSetReadyReason, + if lastCompletedJob != nil { + if failed := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobFailed); failed != nil && failed.Status == corev1.ConditionTrue { + setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionLastJobFailedReason, fmt.Sprintf("last job %s failed: %s", lastCompletedJob.Name, failed.Message)) + integration.Status.Phase = v1.IntegrationPhaseError + return nil } - integration.Status.SetConditions(availableCondition) - integration.Status.Phase = v1.IntegrationPhaseRunning - return integration, nil } } } - return integration, nil + // Check Pods statuses + for _, pod := range pendingPods { + // Check the scheduled condition + if scheduled := kubernetes.GetPodCondition(pod, corev1.PodScheduled); scheduled != nil && scheduled.Status == corev1.ConditionFalse && scheduled.Reason == "Unschedulable" { + integration.Status.Phase = v1.IntegrationPhaseError + setReadyConditionError(integration, scheduled.Message) + return nil + } + } + // Check pending container statuses + for _, pod := range pendingPods { + containers := append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) + for _, container := range containers { + // Check the images are pulled + if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "ImagePullBackOff" { + integration.Status.Phase = v1.IntegrationPhaseError + setReadyConditionError(integration, waiting.Message) + return nil + } + } + } + // Check running container statuses + for _, pod := range runningPods { + containers := append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) + for _, container := range containers { + // Check the container state + if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "CrashLoopBackOff" { + integration.Status.Phase = v1.IntegrationPhaseError + setReadyConditionError(integration, waiting.Message) + return nil + } + if terminated := container.State.Terminated; terminated != nil && terminated.Reason == "Error" { + integration.Status.Phase = v1.IntegrationPhaseError + setReadyConditionError(integration, terminated.Message) + return nil + } + } + } + + integration.Status.Phase = v1.IntegrationPhaseRunning + + switch c := controller.(type) { + case *appsv1.Deployment: + replicas := int32(1) + if r := integration.Spec.Replicas; r != nil { + replicas = *r + } + if c.Status.UpdatedReplicas == replicas && c.Status.ReadyReplicas == replicas { + setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", c.Status.ReadyReplicas, replicas)) + } else if c.Status.UpdatedReplicas < replicas { + setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.Status.UpdatedReplicas, replicas)) + } else { + setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", c.Status.ReadyReplicas, replicas)) + } + + case *servingv1.Service: + ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady) + if ready.IsTrue() { + setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionKnativeServiceReadyReason, "") + } else { + setReadyCondition(integration, corev1.ConditionFalse, ready.GetReason(), ready.GetMessage()) + } + + case *batchv1beta1.CronJob: + if c.Status.LastScheduleTime == nil { + setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created") + } else if len(c.Status.Active) > 0 { + setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active") + } else if c.Spec.SuccessfulJobsHistoryLimit != nil && *c.Spec.SuccessfulJobsHistoryLimit == 0 && c.Spec.FailedJobsHistoryLimit != nil && *c.Spec.FailedJobsHistoryLimit == 0 { + setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available") + } else if lastCompletedJob != nil { + if complete := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue { + setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", lastCompletedJob.Name)) + } + } else { + integration.Status.SetCondition(v1.IntegrationConditionReady, corev1.ConditionUnknown, "", "") + } + } + + return nil } func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit, error) { @@ -245,3 +360,19 @@ func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit, } return kit, nil } + +func isConditionTrue(integration *v1.Integration, conditionType v1.IntegrationConditionType) bool { + cond := integration.Status.GetCondition(conditionType) + if cond == nil { + return false + } + return cond.Status == corev1.ConditionTrue +} + +func setReadyConditionError(integration *v1.Integration, err string) { + setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionErrorReason, err) +} + +func setReadyCondition(integration *v1.Integration, status corev1.ConditionStatus, reason string, message string) { + integration.Status.SetCondition(v1.IntegrationConditionReady, status, reason, message) +} diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go index 9c1d7a6..d851ff8 100644 --- a/pkg/resources/resources.go +++ b/pkg/resources/resources.go @@ -371,9 +371,9 @@ var assets = func() http.FileSystem { "/rbac/operator-role.yaml": &vfsgen۰CompressedFileInfo{ name: "operator-role.yaml", modTime: time.Time{}, - uncompressedSize: 2349, + uncompressedSize: 2311, - compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x55\xc1\x6e\xe3\x36\x10\xbd\xeb\x2b\x1e\xac\xcb\x6e\x11\xdb\x6d\x4f\x85\x7b\x72\x77\x93\xd6\xe8\xc2\x06\x22\x6f\x17\x7b\xa4\xa8\xb1\x3c\x0d\xc5\x61\x49\x2a\x8a\xfb\xf5\x05\x29\x7b\xd7\x59\x27\x40\x0e\x41\x5b\x5d\x3c\xa4\x46\x6f\xde\x9b\x79\x26\x4b\x4c\x5f\xef\x29\x4a\x7c\x60\x4d\x36\x50\x83\x28\x88\x7b\xc2\xd2\x29\xbd\x27\x54\xb2\x8b\x83\xf2\x84\x1b\xe9\x6d\xa3\x22\x8b\xc5\x9b\x65\x75\xf3\x16\xbd\x6d\xc8\x43\x2c\x [...] + compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x95\x41\x8f\xdb\x36\x10\x85\xef\xfa\x15\x0f\xd6\x25\x29\xd6\x76\xdb\x53\xe1\x9e\xdc\x64\xb7\x35\x1a\xd8\xc0\xca\x69\x90\x23\x45\x8e\xe5\xe9\x52\x1c\x96\xa4\xec\x75\x7f\x7d\x41\xda\x6e\xbc\xf1\x2e\x90\x43\xd0\x54\x17\x0f\xa9\xd1\x9b\xef\x71\xc6\x52\x8d\xf1\xd7\xbb\xaa\x1a\xef\x58\x93\x8b\x64\x90\x04\x69\x4b\x98\x7b\xa5\xb7\x84\x46\x36\x69\xaf\x02\xe1\x4e\x06\x67\x54\x62\x71\x78\x35\x6f\xee\x5e\x63\x70\x86\x02\xc4\x [...] }, "/rbac/patch-role-to-clusterrole.yaml": &vfsgen۰FileInfo{ name: "patch-role-to-clusterrole.yaml", diff --git a/pkg/util/kubernetes/conditions.go b/pkg/util/kubernetes/conditions.go index 569bf43..f27ef61 100644 --- a/pkg/util/kubernetes/conditions.go +++ b/pkg/util/kubernetes/conditions.go @@ -18,113 +18,50 @@ limitations under the License. package kubernetes import ( - "context" - "errors" - "fmt" - "strconv" - - v1 "github.com/apache/camel-k/pkg/apis/camel/v1" - "github.com/apache/camel-k/pkg/client" appsv1 "k8s.io/api/apps/v1" - "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" + + knative "knative.dev/pkg/apis" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" ) -// nolint: gocritic -func MirrorReadyCondition(ctx context.Context, c client.Client, it *v1.Integration) { - if IsConditionTrue(it, v1.IntegrationConditionDeploymentAvailable) || IsConditionTrue(it, v1.IntegrationConditionKnativeServiceAvailable) { - mirrorReadyConditionFromReplicaSet(ctx, c, it) - } else if IsConditionTrue(it, v1.IntegrationConditionCronJobAvailable) { - mirrorReadyConditionFromCronJob(ctx, c, it) - } else { - it.Status.SetCondition( - v1.IntegrationConditionReady, - corev1.ConditionUnknown, - "", - "", - ) +func GetPodCondition(pod corev1.Pod, conditionType corev1.PodConditionType) *corev1.PodCondition { + for i := range pod.Status.Conditions { + condition := pod.Status.Conditions[i] + if condition.Type == conditionType { + return &condition + } } + return nil } -func mirrorReadyConditionFromReplicaSet(ctx context.Context, c client.Client, it *v1.Integration) { - list := appsv1.ReplicaSetList{} - opts := runtimeclient.MatchingLabels{ - v1.IntegrationLabel: it.Name, - } - if err := c.List(ctx, &list, opts, runtimeclient.InNamespace(it.Namespace)); err != nil { - setReadyConditionError(it, err) - return - } - - if len(list.Items) == 0 { - setReadyConditionError(it, errors.New("replicaset not found")) - return - } - - var rs *appsv1.ReplicaSet - for _, r := range list.Items { - r := r - if r.Labels["camel.apache.org/generation"] == strconv.FormatInt(it.Generation, 10) { - rs = &r +func GetDeploymentCondition(deployment appsv1.Deployment, conditionType appsv1.DeploymentConditionType) *appsv1.DeploymentCondition { + for i := range deployment.Status.Conditions { + condition := deployment.Status.Conditions[i] + if condition.Type == conditionType { + return &condition } } - if rs == nil { - rs = &list.Items[0] - } - var replicas int32 = 1 - if rs.Spec.Replicas != nil { - replicas = *rs.Spec.Replicas - } - // The Integration is considered ready when the number of replicas - // reported to be ready is larger or equal to the specified number - // of replicas. This avoid reporting a falsy readiness condition - // when the Integration is being down-scaled. - if replicas <= rs.Status.ReadyReplicas { - it.Status.SetCondition( - v1.IntegrationConditionReady, - corev1.ConditionTrue, - v1.IntegrationConditionReplicaSetReadyReason, - "", - ) - } else { - it.Status.SetCondition( - v1.IntegrationConditionReady, - corev1.ConditionFalse, - v1.IntegrationConditionReplicaSetNotReadyReason, - "", - ) - } + return nil } -func mirrorReadyConditionFromCronJob(ctx context.Context, c client.Client, it *v1.Integration) { - cronJob := v1beta1.CronJob{} - if err := c.Get(ctx, runtimeclient.ObjectKey{Namespace: it.Namespace, Name: it.Name}, &cronJob); err != nil { - setReadyConditionError(it, err) - } else { - // CronJob status is not tracked by Kubernetes - it.Status.SetCondition( - v1.IntegrationConditionReady, - corev1.ConditionTrue, - v1.IntegrationConditionCronJobCreatedReason, - "", - ) +func GetKnativeServiceCondition(service servingv1.Service, conditionType knative.ConditionType) *knative.Condition { + for i := range service.Status.Conditions { + condition := service.Status.Conditions[i] + if condition.Type == conditionType { + return &condition + } } + return nil } -func IsConditionTrue(it *v1.Integration, conditionType v1.IntegrationConditionType) bool { - cond := it.Status.GetCondition(conditionType) - if cond == nil { - return false +func GetJobCondition(job batchv1.Job, conditionType batchv1.JobConditionType) *batchv1.JobCondition { + for i := range job.Status.Conditions { + condition := job.Status.Conditions[i] + if condition.Type == conditionType { + return &condition + } } - return cond.Status == corev1.ConditionTrue -} - -func setReadyConditionError(it *v1.Integration, err error) { - it.Status.SetCondition( - v1.IntegrationConditionReady, - corev1.ConditionUnknown, - v1.IntegrationConditionErrorReason, - fmt.Sprintf("%v", err), - ) + return nil }