This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dfb827a38bc [FLINK-35060][Test/Connector] Provide compatibility of old CheckpointMode for connector testing framework
dfb827a38bc is described below

commit dfb827a38bc81fe4610cd0c88c66b8d5da1c0147
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Thu Apr 11 15:30:07 2024 +0800

    [FLINK-35060][Test/Connector] Provide compatibility of old CheckpointMode for connector testing framework
---
 .../flink/streaming/api/CheckpointingMode.java     | 12 +++++++++++
 .../external/sink/TestingSinkSettings.java         | 25 +++++++++++++++++++---
 .../external/source/TestingSourceSettings.java     | 25 +++++++++++++++++++---
 .../testframe/junit/annotations/TestSemantics.java |  3 ++-
 .../extensions/ConnectorTestingExtension.java      | 24 ++++++++++++++++++++-
 .../utils/UnorderedCollectIteratorAssert.java      | 13 +++++++++++
 6 files changed, 94 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
index 17e20dbe037..8c311b6bc71 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
@@ -90,4 +90,16 @@ public enum CheckpointingMode {
                 throw new IllegalArgumentException("Unsupported semantic: " + 
semantic);
         }
     }
+
+    public static org.apache.flink.streaming.api.CheckpointingMode 
convertFromCheckpointingMode(
+            org.apache.flink.core.execution.CheckpointingMode semantic) {
+        switch (semantic) {
+            case EXACTLY_ONCE:
+                return 
org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+            case AT_LEAST_ONCE:
+                return 
org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+            default:
+                throw new IllegalArgumentException("Unsupported semantic: " + 
semantic);
+        }
+    }
 }
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java
index 2494b814aa2..d392576e252 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java
@@ -20,6 +20,8 @@ package org.apache.flink.connector.testframe.external.sink;
 
 import org.apache.flink.core.execution.CheckpointingMode;
 
+import static 
org.apache.flink.streaming.api.CheckpointingMode.convertFromCheckpointingMode;
+import static 
org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Settings for configuring the sink under testing. */
@@ -34,9 +36,14 @@ public class TestingSinkSettings {
         this.checkpointingMode = checkpointingMode;
     }
 
-    /** Checkpointing mode required for the sink. */
-    public CheckpointingMode getCheckpointingMode() {
-        return checkpointingMode;
+    /**
+     * Checkpointing mode required for the sink. This method is required for 
downstream projects
+     * e.g. Flink connectors extending this test for the case when there 
should be supported Flink
+     * versions below 1.20. Could be removed together with dropping support 
for Flink 1.19.
+     */
+    @Deprecated
+    public org.apache.flink.streaming.api.CheckpointingMode 
getCheckpointingMode() {
+        return convertFromCheckpointingMode(checkpointingMode);
     }
 
     /** Builder class for {@link TestingSinkSettings}. */
@@ -48,6 +55,18 @@ public class TestingSinkSettings {
             return this;
         }
 
+        /**
+         * This method is required for downstream projects e.g. Flink 
connectors extending this test
+         * for the case when there should be supported Flink versions below 
1.20. Could be removed
+         * together with dropping support for Flink 1.19.
+         */
+        @Deprecated
+        public Builder setCheckpointingMode(
+                org.apache.flink.streaming.api.CheckpointingMode 
checkpointingMode) {
+            this.checkpointingMode = 
convertToCheckpointingMode(checkpointingMode);
+            return this;
+        }
+
         public TestingSinkSettings build() {
             sanityCheck();
             return new TestingSinkSettings(checkpointingMode);
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java
index 2d1626bc619..433ab27f8ad 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java
@@ -21,6 +21,8 @@ package org.apache.flink.connector.testframe.external.source;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.core.execution.CheckpointingMode;
 
+import static 
org.apache.flink.streaming.api.CheckpointingMode.convertFromCheckpointingMode;
+import static 
org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Settings for configuring the source under testing. */
@@ -37,9 +39,14 @@ public class TestingSourceSettings {
         return boundedness;
     }
 
-    /** Checkpointing mode required for the source. */
-    public CheckpointingMode getCheckpointingMode() {
-        return checkpointingMode;
+    /**
+     * Checkpointing mode required for the source. This method is required for 
downstream projects
+     * e.g. Flink connectors extending this test for the case when there 
should be supported Flink
+     * versions below 1.20. Could be removed together with dropping support 
for Flink 1.19.
+     */
+    @Deprecated
+    public org.apache.flink.streaming.api.CheckpointingMode 
getCheckpointingMode() {
+        return convertFromCheckpointingMode(checkpointingMode);
     }
 
     private TestingSourceSettings(Boundedness boundedness, CheckpointingMode 
checkpointingMode) {
@@ -62,6 +69,18 @@ public class TestingSourceSettings {
             return this;
         }
 
+        /**
+         * This method is required for downstream projects e.g. Flink 
connectors extending this test
+         * for the case when there should be supported Flink versions below 
1.20. Could be removed
+         * together with dropping support for Flink 1.19.
+         */
+        @Deprecated
+        public Builder setCheckpointingMode(
+                org.apache.flink.streaming.api.CheckpointingMode 
checkpointingMode) {
+            this.checkpointingMode = 
convertToCheckpointingMode(checkpointingMode);
+            return this;
+        }
+
         public TestingSourceSettings build() {
             sanityCheck();
             return new TestingSourceSettings(boundedness, checkpointingMode);
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
index 291db078403..ece506ac199 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
@@ -27,7 +27,8 @@ import java.lang.annotation.Target;
 
 /**
  * Marks the field in test class defining supported semantic: {@link
- * org.apache.flink.core.execution.CheckpointingMode}.
+ * org.apache.flink.core.execution.CheckpointingMode} and {@link
+ * org.apache.flink.streaming.api.CheckpointingMode} (deprecated).
  *
  * <p>Only one field can be annotated in test class.
  */
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
index 7e1e1a8c06c..ebe9f077d3a 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
@@ -36,6 +36,7 @@ import org.junit.platform.commons.support.AnnotationSupport;
 
 import java.lang.annotation.Annotation;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
@@ -97,11 +98,32 @@ public class ConnectorTestingExtension implements BeforeAllCallback, AfterAllCal
         }
 
         // Store supported semantic
-        final List<CheckpointingMode[]> semantics =
+        List<CheckpointingMode[]> semantics =
                 AnnotationSupport.findAnnotatedFieldValues(
                         context.getRequiredTestInstance(),
                         TestSemantics.class,
                         CheckpointingMode[].class);
+        // Fallback part start.
+        // This is for compatibility of 
org.apache.flink.streaming.api.CheckpointingMode, which can
+        // be removed if we drop the support of 1.19 and old CheckpointingMode.
+        final List<org.apache.flink.streaming.api.CheckpointingMode[]> 
fallbackSemantics =
+                AnnotationSupport.findAnnotatedFieldValues(
+                        context.getRequiredTestInstance(),
+                        TestSemantics.class,
+                        
org.apache.flink.streaming.api.CheckpointingMode[].class);
+        if (!fallbackSemantics.isEmpty()) {
+            semantics = new ArrayList<>(semantics);
+        }
+        for (org.apache.flink.streaming.api.CheckpointingMode[] oldModes : 
fallbackSemantics) {
+            semantics.add(
+                    Arrays.stream(oldModes)
+                            .sequential()
+                            .map(
+                                    
org.apache.flink.streaming.api.CheckpointingMode
+                                            ::convertToCheckpointingMode)
+                            .toArray(CheckpointingMode[]::new));
+        }
+        // Fallback part ends.
         checkExactlyOneAnnotatedField(semantics, TestSemantics.class);
         context.getStore(TEST_RESOURCE_NAMESPACE)
                 .put(SUPPORTED_SEMANTIC_STORE_KEY, semantics.get(0));
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
index 8fb4a3804a8..3d4945b3719 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
@@ -29,6 +29,7 @@ import java.util.Set;
 
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.flink.shaded.guava31.com.google.common.base.Predicates.not;
+import static 
org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -58,6 +59,18 @@ public class UnorderedCollectIteratorAssert<T>
         return this;
     }
 
+    /**
+     * This method is required for downstream projects e.g. Flink connectors 
extending this test for
+     * the case when there should be supported Flink versions below 1.20. 
Could be removed together
+     * with dropping support for Flink 1.19.
+     */
+    @Deprecated
+    public void matchesRecordsFromSource(
+            List<List<T>> recordsBySplitsFromSource,
+            org.apache.flink.streaming.api.CheckpointingMode semantic) {
+        matchesRecordsFromSource(recordsBySplitsFromSource, 
convertToCheckpointingMode(semantic));
+    }
+
     public void matchesRecordsFromSource(
             List<List<T>> recordsBySplitsFromSource, CheckpointingMode 
semantic) {
         for (List<T> list : recordsBySplitsFromSource) {

Reply via email to