This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c44a8816 [hotfix] Reuse FlinkConfigManager#isSnapshotCrdInstalled in
controllers
c44a8816 is described below
commit c44a8816110e87cb6b3ca1e1035e6dd4f755d363
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Mon May 25 17:15:37 2026 +0300
[hotfix] Reuse FlinkConfigManager#isSnapshotCrdInstalled in controllers
---
.../org/apache/flink/kubernetes/operator/FlinkOperator.java | 3 ++-
.../flink/kubernetes/operator/config/FlinkConfigManager.java | 4 ++++
.../operator/controller/FlinkDeploymentController.java | 10 ++++------
.../operator/controller/FlinkSessionJobController.java | 10 ++++++----
.../operator/controller/TestingFlinkSessionJobController.java | 3 ++-
5 files changed, 18 insertions(+), 12 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 6bdcb8f0..f2bb9286 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -225,7 +225,8 @@ public class FlinkOperator {
observer,
statusRecorder,
eventRecorder,
- canaryResourceManager);
+ canaryResourceManager,
+ configManager);
registeredControllers.add(operator.register(controller,
this::overrideControllerConfigs));
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index 62fe3239..9b9126bc 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -145,6 +145,10 @@ public class FlinkConfigManager {
}
}
+ public boolean isSnapshotCrdInstalled() {
+ return snapshotCrdInstalled;
+ }
+
/**
* Update the base configuration for the operator. Newly generated configs
(observe, deploy,
* etc.) will use this as the base.
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 69909c5c..ac832a9e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -19,7 +19,6 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
@@ -35,7 +34,6 @@ import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
-import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -92,7 +90,7 @@ public class FlinkDeploymentController
}
@Override
- public DeleteControl cleanup(FlinkDeployment flinkApp, Context
josdkContext) {
+ public DeleteControl cleanup(FlinkDeployment flinkApp,
Context<FlinkDeployment> josdkContext) {
if (canaryResourceManager.handleCanaryResourceDeletion(flinkApp)) {
return DeleteControl.defaultDelete();
}
@@ -124,8 +122,8 @@ public class FlinkDeploymentController
}
@Override
- public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp,
Context josdkContext)
- throws Exception {
+ public UpdateControl<FlinkDeployment> reconcile(
+ FlinkDeployment flinkApp, Context<FlinkDeployment> josdkContext)
throws Exception {
if (canaryResourceManager.handleCanaryResourceReconciliation(
flinkApp, josdkContext.getClient())) {
@@ -192,7 +190,7 @@ public class FlinkDeploymentController
if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) {
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
}
- if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
+ if (flinkConfigManager.isSnapshotCrdInstalled()) {
eventSources.add(
EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));
} else {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index a7f2106b..1ffa20e8 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -19,9 +19,9 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
-import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
import org.apache.flink.kubernetes.operator.observer.Observer;
@@ -31,7 +31,6 @@ import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
-import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -67,6 +66,7 @@ public class FlinkSessionJobController
private final StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>
statusRecorder;
private final EventRecorder eventRecorder;
private final CanaryResourceManager<FlinkSessionJob> canaryResourceManager;
+ private final FlinkConfigManager flinkConfigManager;
public FlinkSessionJobController(
Set<FlinkResourceValidator> validators,
@@ -75,7 +75,8 @@ public class FlinkSessionJobController
Observer<FlinkSessionJob> observer,
StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>
statusRecorder,
EventRecorder eventRecorder,
- CanaryResourceManager<FlinkSessionJob> canaryResourceManager) {
+ CanaryResourceManager<FlinkSessionJob> canaryResourceManager,
+ FlinkConfigManager flinkConfigManager) {
this.validators = validators;
this.ctxFactory = ctxFactory;
this.reconciler = reconciler;
@@ -83,6 +84,7 @@ public class FlinkSessionJobController
this.statusRecorder = statusRecorder;
this.eventRecorder = eventRecorder;
this.canaryResourceManager = canaryResourceManager;
+ this.flinkConfigManager = flinkConfigManager;
}
@Override
@@ -179,7 +181,7 @@ public class FlinkSessionJobController
List<EventSource<?, FlinkSessionJob>> eventSources = new ArrayList<>();
eventSources.add(EventSourceUtils.getFlinkDeploymentInformerEventSource(context));
- if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
+ if (flinkConfigManager.isSnapshotCrdInstalled()) {
eventSources.add(
EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));
} else {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
index 7ba5cd48..b135fba0 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
@@ -97,7 +97,8 @@ public class TestingFlinkSessionJobController
new FlinkSessionJobObserver(eventRecorder),
statusRecorder,
eventRecorder,
- canaryResourceManager);
+ canaryResourceManager,
+ configManager);
}
@Override