This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new f4c0c99c [FLINK-39808] Warn on use of deprecated Flink versions (#1124)
f4c0c99c is described below

commit f4c0c99c6ebc183e5e2bab56e87ac31a450b168a
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Thu Jun 4 12:21:36 2026 +0300

    [FLINK-39808] Warn on use of deprecated Flink versions (#1124)
---
 docs/content.zh/docs/custom-resource/reference.md  |  8 ++---
 docs/content/docs/custom-resource/reference.md     |  4 +--
 .../kubernetes/operator/api/spec/FlinkVersion.java | 25 ++++++++++++++
 .../operator/api/spec/FlinkVersionTest.java        | 38 ++++++++++++++++++++++
 .../operator/api/utils/BaseTestUtils.java          |  8 ++---
 .../kubernetes/operator/utils/ValidatorUtils.java  |  7 +++-
 .../flink/kubernetes/operator/TestUtils.java       |  6 ++--
 .../operator/validation/DefaultValidatorTest.java  |  4 ++-
 8 files changed, 85 insertions(+), 15 deletions(-)

diff --git a/docs/content.zh/docs/custom-resource/reference.md b/docs/content.zh/docs/custom-resource/reference.md
index 126de5a4..a8219c97 100644
--- a/docs/content.zh/docs/custom-resource/reference.md
+++ b/docs/content.zh/docs/custom-resource/reference.md
@@ -80,10 +80,10 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | ----- | ---- |
 | v1_13 | No longer supported since 1.7 operator release. |
 | v1_14 | No longer supported since 1.7 operator release. |
-| v1_15 |  |
-| v1_16 |  |
-| v1_17 |  |
-| v1_18 |  |
+| v1_15 | Deprecated since 1.10 operator release. |
+| v1_16 | Deprecated since 1.11 operator release. |
+| v1_17 | Deprecated since 1.13 operator release. |
+| v1_18 | Deprecated since 1.13 operator release. |
 | v1_19 |  |
 | v1_20 |  |
 
diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 76b303ce..df54244f 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -158,8 +158,8 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
 | v1_14 | No longer supported since 1.7 operator release. |
 | v1_15 | Deprecated since 1.10 operator release. |
 | v1_16 | Deprecated since 1.11 operator release. |
-| v1_17 |  |
-| v1_18 |  |
+| v1_17 | Deprecated since 1.13 operator release. |
+| v1_18 | Deprecated since 1.13 operator release. |
 | v1_19 |  |
 | v1_20 |  |
 | v2_0 |  |
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
index 8c1705fa..2d69d7d3 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
@@ -20,6 +20,9 @@ package org.apache.flink.kubernetes.operator.api.spec;
 
 import org.apache.flink.annotation.Experimental;
 
+import java.util.EnumSet;
+import java.util.Set;
+
 /** Enumeration for supported Flink versions. */
 @Experimental
 public enum FlinkVersion {
@@ -35,8 +38,10 @@ public enum FlinkVersion {
     /** Deprecated since 1.11 operator release. */
     @Deprecated
     v1_16(1, 16),
+    /** Deprecated since 1.13 operator release. */
     @Deprecated
     v1_17(1, 17),
+    /** Deprecated since 1.13 operator release. */
     @Deprecated
     v1_18(1, 18),
     v1_19(1, 19),
@@ -47,6 +52,8 @@ public enum FlinkVersion {
     v2_3(2, 3),
     v2_4(2, 4);
 
+    private static final Set<FlinkVersion> DEPRECATED = computeDeprecated();
+
     /** The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. */
     private final int majorVersion;
 
@@ -68,6 +75,24 @@ public enum FlinkVersion {
         return false;
     }
 
+    private static Set<FlinkVersion> computeDeprecated() {
+        Set<FlinkVersion> deprecated = EnumSet.noneOf(FlinkVersion.class);
+        for (FlinkVersion v : values()) {
+            try {
+                if (FlinkVersion.class.getField(v.name()).isAnnotationPresent(Deprecated.class)) {
+                    deprecated.add(v);
+                }
+            } catch (NoSuchFieldException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+        return deprecated;
+    }
+
+    public boolean isDeprecated() {
+        return DEPRECATED.contains(this);
+    }
+
     public static boolean isSupported(FlinkVersion version) {
         return version != null && version.isEqualOrNewer(FlinkVersion.v1_15);
     }
diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
index 2cbe49b0..3859a359 100644
--- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
+++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
@@ -35,7 +35,45 @@ class FlinkVersionTest {
 
     @Test
     void isSupported() {
+        assertFalse(FlinkVersion.isSupported(null));
+        assertFalse(FlinkVersion.isSupported(FlinkVersion.v1_13));
+        assertFalse(FlinkVersion.isSupported(FlinkVersion.v1_14));
+        assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_15));
+        assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_18));
+        assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_19));
         assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_20));
+        assertTrue(FlinkVersion.isSupported(FlinkVersion.v2_0));
+        assertTrue(FlinkVersion.isSupported(FlinkVersion.v2_4));
+    }
+
+    @Test
+    void isDeprecated() {
+        assertTrue(FlinkVersion.v1_13.isDeprecated());
+        assertTrue(FlinkVersion.v1_14.isDeprecated());
+        assertTrue(FlinkVersion.v1_15.isDeprecated());
+        assertTrue(FlinkVersion.v1_16.isDeprecated());
+        assertTrue(FlinkVersion.v1_17.isDeprecated());
+        assertTrue(FlinkVersion.v1_18.isDeprecated());
+        assertFalse(FlinkVersion.v1_19.isDeprecated());
+        assertFalse(FlinkVersion.v1_20.isDeprecated());
+        assertFalse(FlinkVersion.v2_0.isDeprecated());
+        assertFalse(FlinkVersion.v2_4.isDeprecated());
+    }
+
+    @Test
+    void isDeprecatedMatchesDeprecatedAnnotation() throws Exception {
+        for (FlinkVersion v : FlinkVersion.values()) {
+            boolean annotated =
+                    FlinkVersion.class.getField(v.name()).isAnnotationPresent(Deprecated.class);
+            assertEquals(
+                    annotated,
+                    v.isDeprecated(),
+                    v
+                            + ": @Deprecated says "
+                            + annotated
+                            + " but isDeprecated() says "
+                            + v.isDeprecated());
+        }
     }
 
     @Test
diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index cd30143a..7d4d6f4c 100644
--- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -70,7 +70,7 @@ public class BaseTestUtils {
     public static final String SAMPLE_SESSION_JOB_JAR = "https://example.com/sample.jar";
 
     public static FlinkDeployment buildSessionCluster() {
-        return buildSessionCluster(FlinkVersion.v1_17);
+        return buildSessionCluster(FlinkVersion.v1_20);
     }
 
     public static FlinkDeployment buildSessionCluster(FlinkVersion version) {
@@ -94,15 +94,15 @@ public class BaseTestUtils {
     }
 
     public static FlinkDeployment buildApplicationCluster(JobState state) {
-        return buildApplicationCluster(FlinkVersion.v1_17, state);
+        return buildApplicationCluster(FlinkVersion.v1_20, state);
     }
 
     public static FlinkDeployment buildApplicationCluster() {
-        return buildApplicationCluster(FlinkVersion.v1_17, JobState.RUNNING);
+        return buildApplicationCluster(FlinkVersion.v1_20, JobState.RUNNING);
     }
 
     public static FlinkDeployment buildApplicationCluster(String name, String namespace) {
-        return buildApplicationCluster(name, namespace, FlinkVersion.v1_17, JobState.RUNNING);
+        return buildApplicationCluster(name, namespace, FlinkVersion.v1_20, JobState.RUNNING);
     }
 
     public static FlinkDeployment buildApplicationCluster(FlinkVersion version) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
index 4aca7bfa..0f21c81a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
@@ -34,7 +34,7 @@ import java.util.Set;
 /** Validator utilities. */
 public final class ValidatorUtils {
 
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ValidatorUtils.class);
 
     public static Set<FlinkResourceValidator> discoverValidators(FlinkConfigManager configManager) {
         var conf = configManager.getDefaultConfig();
@@ -72,6 +72,11 @@ public final class ValidatorUtils {
                     ctx.getJosdkContext().getClient());
             return false;
         }
+        if (version.isDeprecated()) {
+            LOG.warn(
+                    "Flink version {} is deprecated and may be removed in a future operator release. Plan to upgrade to a non-deprecated version.",
+                    version);
+        }
         return true;
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 83f8f2cc..db5820b0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -215,7 +215,7 @@ public class TestUtils extends BaseTestUtils {
 
     public static <T extends HasMetadata> Context<T> createContextWithReadyFlinkDeployment(
             Map<String, String> flinkDepConfig, KubernetesClient client) {
-        return createContextWithReadyFlinkDeployment(flinkDepConfig, client, FlinkVersion.v1_18);
+        return createContextWithReadyFlinkDeployment(flinkDepConfig, client, FlinkVersion.v1_20);
     }
 
     public static <T extends HasMetadata> Context<T> createContextWithReadyFlinkDeployment(
@@ -431,7 +431,7 @@ public class TestUtils extends BaseTestUtils {
 
     public static Stream<Arguments> flinkVersionsAndUpgradeModes() {
         List<Arguments> args = new ArrayList<>();
-        for (FlinkVersion version : Set.of(FlinkVersion.v1_16, FlinkVersion.v1_20)) {
+        for (FlinkVersion version : Set.of(FlinkVersion.v1_19, FlinkVersion.v1_20)) {
             for (UpgradeMode upgradeMode : UpgradeMode.values()) {
                 args.add(arguments(version, upgradeMode));
             }
@@ -440,7 +440,7 @@ public class TestUtils extends BaseTestUtils {
     }
 
     public static Stream<Arguments> flinkVersions() {
-        return Stream.of(arguments(FlinkVersion.v1_16), arguments(FlinkVersion.v1_20));
+        return Stream.of(arguments(FlinkVersion.v1_19), arguments(FlinkVersion.v1_20));
     }
 
     public static FlinkDeployment createCanaryDeployment() {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index bba875a5..25082625 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -465,6 +465,8 @@ public class DefaultValidatorTest {
                         + " is not supported by this operator version");
 
         testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));
+        testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_18));
+        testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_19));
 
         testError(
                 dep -> dep.getSpec().setServiceAccount(null),
@@ -752,7 +754,7 @@ public class DefaultValidatorTest {
             // Stopped with LAST_STATE mode with different Flink Version
             suspendSpec.getJob().setUpgradeMode(fromUpgrade);
             suspendSpec.getJob().setState(fromState);
-            suspendSpec.setFlinkVersion(FlinkVersion.v1_18);
+            suspendSpec.setFlinkVersion(FlinkVersion.v1_19);
 
             dep.getStatus()
                     .getReconciliationStatus()

Reply via email to