This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dfb827a38bc [FLINK-35060][Test/Connector] Provide compatibility of old CheckpointMode for connector testing framework
dfb827a38bc is described below

commit dfb827a38bc81fe4610cd0c88c66b8d5da1c0147
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Thu Apr 11 15:30:07 2024 +0800

    [FLINK-35060][Test/Connector] Provide compatibility of old CheckpointMode for connector testing framework
---
 .../flink/streaming/api/CheckpointingMode.java     | 12 +++++++++++
 .../external/sink/TestingSinkSettings.java         | 25 +++++++++++++++++++---
 .../external/source/TestingSourceSettings.java     | 25 +++++++++++++++++++---
 .../testframe/junit/annotations/TestSemantics.java |  3 ++-
 .../extensions/ConnectorTestingExtension.java      | 24 ++++++++++++++++++++-
 .../utils/UnorderedCollectIteratorAssert.java      | 13 +++++++++++
 6 files changed, 94 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
index 17e20dbe037..8c311b6bc71 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
@@ -90,4 +90,16 @@ public enum CheckpointingMode {
                 throw new IllegalArgumentException("Unsupported semantic: " + semantic);
         }
     }
+
+    public static org.apache.flink.streaming.api.CheckpointingMode convertFromCheckpointingMode(
+            org.apache.flink.core.execution.CheckpointingMode semantic) {
+        switch (semantic) {
+            case EXACTLY_ONCE:
+                return org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+            case AT_LEAST_ONCE:
+                return org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+            default:
+                throw new IllegalArgumentException("Unsupported semantic: " + semantic);
+        }
+    }
 }
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java
index 2494b814aa2..d392576e252 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java
@@ -20,6 +20,8 @@ package org.apache.flink.connector.testframe.external.sink;

 import org.apache.flink.core.execution.CheckpointingMode;

+import static org.apache.flink.streaming.api.CheckpointingMode.convertFromCheckpointingMode;
+import static org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
 import static org.apache.flink.util.Preconditions.checkNotNull;

 /** Settings for configuring the sink under testing. */
@@ -34,9 +36,14 @@ public class TestingSinkSettings {
         this.checkpointingMode = checkpointingMode;
     }

-    /** Checkpointing mode required for the sink. */
-    public CheckpointingMode getCheckpointingMode() {
-        return checkpointingMode;
+    /**
+     * Checkpointing mode required for the sink. This method is required for downstream projects
+     * e.g. Flink connectors extending this test for the case when there should be supported Flink
+     * versions below 1.20. Could be removed together with dropping support for Flink 1.19.
+     */
+    @Deprecated
+    public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() {
+        return convertFromCheckpointingMode(checkpointingMode);
     }

     /** Builder class for {@link TestingSinkSettings}. */
@@ -48,6 +55,18 @@ public class TestingSinkSettings {
             return this;
         }

+        /**
+         * This method is required for downstream projects e.g. Flink connectors extending this test
+         * for the case when there should be supported Flink versions below 1.20. Could be removed
+         * together with dropping support for Flink 1.19.
+         */
+        @Deprecated
+        public Builder setCheckpointingMode(
+                org.apache.flink.streaming.api.CheckpointingMode checkpointingMode) {
+            this.checkpointingMode = convertToCheckpointingMode(checkpointingMode);
+            return this;
+        }
+
         public TestingSinkSettings build() {
             sanityCheck();
             return new TestingSinkSettings(checkpointingMode);
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java
index 2d1626bc619..433ab27f8ad 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java
@@ -21,6 +21,8 @@ package org.apache.flink.connector.testframe.external.source;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.core.execution.CheckpointingMode;

+import static org.apache.flink.streaming.api.CheckpointingMode.convertFromCheckpointingMode;
+import static org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
 import static org.apache.flink.util.Preconditions.checkNotNull;

 /** Settings for configuring the source under testing. */
@@ -37,9 +39,14 @@ public class TestingSourceSettings {
         return boundedness;
     }

-    /** Checkpointing mode required for the source. */
-    public CheckpointingMode getCheckpointingMode() {
-        return checkpointingMode;
+    /**
+     * Checkpointing mode required for the source. This method is required for downstream projects
+     * e.g. Flink connectors extending this test for the case when there should be supported Flink
+     * versions below 1.20. Could be removed together with dropping support for Flink 1.19.
+     */
+    @Deprecated
+    public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() {
+        return convertFromCheckpointingMode(checkpointingMode);
     }

     private TestingSourceSettings(Boundedness boundedness, CheckpointingMode checkpointingMode) {
@@ -62,6 +69,18 @@ public class TestingSourceSettings {
             return this;
         }

+        /**
+         * This method is required for downstream projects e.g. Flink connectors extending this test
+         * for the case when there should be supported Flink versions below 1.20. Could be removed
+         * together with dropping support for Flink 1.19.
+         */
+        @Deprecated
+        public Builder setCheckpointingMode(
+                org.apache.flink.streaming.api.CheckpointingMode checkpointingMode) {
+            this.checkpointingMode = convertToCheckpointingMode(checkpointingMode);
+            return this;
+        }
+
         public TestingSourceSettings build() {
             sanityCheck();
             return new TestingSourceSettings(boundedness, checkpointingMode);
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
index 291db078403..ece506ac199 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
@@ -27,7 +27,8 @@ import java.lang.annotation.Target;

 /**
  * Marks the field in test class defining supported semantic: {@link
- * org.apache.flink.core.execution.CheckpointingMode}.
+ * org.apache.flink.core.execution.CheckpointingMode} and {@link
+ * org.apache.flink.streaming.api.CheckpointingMode} (deprecated).
  *
  * <p>Only one field can be annotated in test class.
  */
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
index 7e1e1a8c06c..ebe9f077d3a 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
@@ -36,6 +36,7 @@ import org.junit.platform.commons.support.AnnotationSupport;

 import java.lang.annotation.Annotation;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;

@@ -97,11 +98,32 @@ public class ConnectorTestingExtension implements BeforeAllCallback, AfterAllCal
         }

         // Store supported semantic
-        final List<CheckpointingMode[]> semantics =
+        List<CheckpointingMode[]> semantics =
                 AnnotationSupport.findAnnotatedFieldValues(
                         context.getRequiredTestInstance(),
                         TestSemantics.class,
                         CheckpointingMode[].class);
+        // Fallback part start.
+        // This is for compatibility of org.apache.flink.streaming.api.CheckpointingMode, which can
+        // be removed if we drop the support of 1.19 and old CheckpointingMode.
+        final List<org.apache.flink.streaming.api.CheckpointingMode[]> fallbackSemantics =
+                AnnotationSupport.findAnnotatedFieldValues(
+                        context.getRequiredTestInstance(),
+                        TestSemantics.class,
+                        org.apache.flink.streaming.api.CheckpointingMode[].class);
+        if (!fallbackSemantics.isEmpty()) {
+            semantics = new ArrayList<>(semantics);
+        }
+        for (org.apache.flink.streaming.api.CheckpointingMode[] oldModes : fallbackSemantics) {
+            semantics.add(
+                    Arrays.stream(oldModes)
+                            .sequential()
+                            .map(
+                                    org.apache.flink.streaming.api.CheckpointingMode
+                                            ::convertToCheckpointingMode)
+                            .toArray(CheckpointingMode[]::new));
+        }
+        // Fallback part ends.
         checkExactlyOneAnnotatedField(semantics, TestSemantics.class);
         context.getStore(TEST_RESOURCE_NAMESPACE)
                 .put(SUPPORTED_SEMANTIC_STORE_KEY, semantics.get(0));
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
index 8fb4a3804a8..3d4945b3719 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
@@ -29,6 +29,7 @@ import java.util.Set;

 import static java.util.stream.Collectors.toSet;
 import static org.apache.flink.shaded.guava31.com.google.common.base.Predicates.not;
+import static org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
 import static org.junit.jupiter.api.Assertions.assertTrue;

 /**
@@ -58,6 +59,18 @@ public class UnorderedCollectIteratorAssert<T>
         return this;
     }

+    /**
+     * This method is required for downstream projects e.g. Flink connectors extending this test for
+     * the case when there should be supported Flink versions below 1.20. Could be removed together
+     * with dropping support for Flink 1.19.
+     */
+    @Deprecated
+    public void matchesRecordsFromSource(
+            List<List<T>> recordsBySplitsFromSource,
+            org.apache.flink.streaming.api.CheckpointingMode semantic) {
+        matchesRecordsFromSource(recordsBySplitsFromSource, convertToCheckpointingMode(semantic));
+    }
+
     public void matchesRecordsFromSource(
             List<List<T>> recordsBySplitsFromSource, CheckpointingMode semantic) {
         for (List<T> list : recordsBySplitsFromSource) {