This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9250ba4f42f67968adcb72063dd33590d87f7416 Author: Yufan Sheng <[email protected]> AuthorDate: Tue Sep 6 00:02:04 2022 +0800 [FLINK-28934][Connector/pulsar] Support connector testing tools for Pulsar unordered source. --- .../f4d91193-72ba-4ce4-ad83-98f780dce581 | 6 + .../fetcher/PulsarUnorderedFetcherManager.java | 8 +- .../reader/source/PulsarUnorderedSourceReader.java | 5 +- .../split/PulsarUnorderedPartitionSplitReader.java | 2 +- .../pulsar/source/PulsarSourceITCase.java | 8 +- .../pulsar/source/PulsarUnorderedSourceITCase.java | 103 +++++++++++++++ .../cases/SharedSubscriptionConsumingContext.java | 58 +++++++++ .../testutils/runtime/PulsarRuntimeOperator.java | 14 +-- .../runtime/container/PulsarContainerRuntime.java | 1 - .../runtime/embedded/PulsarEmbeddedRuntime.java | 1 - .../testutils/runtime/mock/PulsarMockRuntime.java | 5 +- .../testframe/testsuites/SourceTestSuiteBase.java | 43 +++---- .../testframe/utils/CollectIteratorAssertions.java | 11 +- .../utils/UnorderedCollectIteratorAssert.java | 140 +++++++++++++++++++++ 14 files changed, 355 insertions(+), 50 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 index 452f99f423a..03d0ae583af 100644 --- a/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 +++ b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 @@ -2,5 +2,11 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: on * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with 
@RegisterExtension\ * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ + or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule +org.apache.flink.connector.pulsar.source.PulsarUnorderedSourceITCase does not satisfy: only one of the following predicates match:\ +* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ +* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule \ No newline at end of file diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java index d2662f06b0a..1523b9a0fca 100644 --- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java @@ -53,19 +53,19 @@ public class PulsarUnorderedFetcherManager<T> extends PulsarFetcherManagerBase<T super(elementsQueue, splitReaderSupplier); } - public List<PulsarPartitionSplit> snapshotState(long checkpointId) { + public List<PulsarPartitionSplit> snapshotState() { return fetchers.values().stream() .map(SplitFetcher::getSplitReader) - .map(splitReader -> snapshotReader(checkpointId, splitReader)) + .map(this::snapshotReader) .filter(Optional::isPresent) .map(Optional::get) .collect(toCollection(() -> new ArrayList<>(fetchers.size()))); } private Optional<PulsarPartitionSplit> snapshotReader( - long checkpointId, SplitReader<PulsarMessage<T>, PulsarPartitionSplit> splitReader) { + SplitReader<PulsarMessage<T>, PulsarPartitionSplit> splitReader) { return ((PulsarUnorderedPartitionSplitReader<T>) splitReader) - .snapshotState(checkpointId) + .snapshotState() .map(PulsarPartitionSplitState::toPulsarPartitionSplit); } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java index b20b117652d..0d52b5e4bb4 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java @@ -138,11 +138,10 @@ public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT public 
List<PulsarPartitionSplit> snapshotState(long checkpointId) { LOG.debug("Trigger the new transaction for downstream readers."); List<PulsarPartitionSplit> splits = - ((PulsarUnorderedFetcherManager<OUT>) splitFetcherManager) - .snapshotState(checkpointId); + ((PulsarUnorderedFetcherManager<OUT>) splitFetcherManager).snapshotState(); if (coordinatorClient != null) { - // Snapshot the transaction status and commit it after checkpoint finished. + // Snapshot the transaction status and commit it after checkpoint finishing. List<TxnID> txnIDs = transactionsToCommit.computeIfAbsent(checkpointId, id -> new ArrayList<>()); for (PulsarPartitionSplit split : splits) { diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java index 73223acc9af..b5809ef9b7b 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java @@ -136,7 +136,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl } } - public Optional<PulsarPartitionSplitState> snapshotState(long checkpointId) { + public Optional<PulsarPartitionSplitState> snapshotState() { if (registeredSplit == null) { return Optional.empty(); } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java index fa46f9a4cdc..54fc4d71dd5 100644 --- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java @@ -32,9 +32,13 @@ import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.testutils.junit.FailsOnJava11; +import org.apache.pulsar.client.api.SubscriptionType; import org.junit.experimental.categories.Category; -/** Unite test class for {@link PulsarSource}. */ +/** + * Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Exclusive} + * subscription. + */ @Category(value = {FailsOnJava11.class}) class PulsarSourceITCase extends SourceTestSuiteBase<String> { // Defines test environment on Flink MiniCluster @@ -48,7 +52,7 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> { CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; // Defines an external context Factories, - // so test cases will be invoked using this external contexts. + // so test cases will be invoked using these external contexts. @TestContext PulsarTestContextFactory<String, SingleTopicConsumingContext> singleTopic = new PulsarTestContextFactory<>(pulsar, SingleTopicConsumingContext::new); diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java new file mode 100644 index 00000000000..6bad7220886 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; +import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; +import org.apache.flink.connector.pulsar.testutils.cases.SharedSubscriptionConsumingContext; +import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.testutils.junit.FailsOnJava11; +import org.apache.flink.util.CloseableIterator; + +import org.apache.pulsar.client.api.SubscriptionType; +import org.junit.experimental.categories.Category; 
+import org.junit.jupiter.api.Disabled; + +import java.util.List; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertUnordered; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** + * Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Shared} subscription. + */ +@Category(value = {FailsOnJava11.class}) +public class PulsarUnorderedSourceITCase extends SourceTestSuiteBase<String> { + // Defines test environment on Flink MiniCluster + @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); + + // Defines pulsar running environment + @TestExternalSystem + PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock()); + + @TestSemantics + CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + + @TestContext + PulsarTestContextFactory<String, SharedSubscriptionConsumingContext> singleTopic = + new PulsarTestContextFactory<>(pulsar, SharedSubscriptionConsumingContext::new); + + @Override + protected void checkResultWithSemantic( + CloseableIterator<String> resultIterator, + List<List<String>> testData, + CheckpointingMode semantic, + Integer limit) { + Runnable runnable = + () -> + assertUnordered(resultIterator) + .withNumRecordsLimit(getExpectedSize(testData, limit)) + .matchesRecordsFromSource(testData, semantic); + + assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); + } + + /** + * A shared subscription has multiple readers on the same partition, which makes it hard to + * stop automatically like a bounded source. So an explicit record limit is always returned.
+ */ + private static int getExpectedSize(List<List<String>> testData, Integer limit) { + if (limit == null) { + return testData.stream().mapToInt(List::size).sum(); + } else { + return limit; + } + } + + @Override + @Disabled("We don't have any idle readers in Pulsar's shared subscription.") + public void testIdleReader( + TestEnvironment testEnv, + DataStreamSourceExternalContext<String> externalContext, + CheckpointingMode semantic) + throws Exception { + super.testIdleReader(testEnv, externalContext, semantic); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SharedSubscriptionConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SharedSubscriptionConsumingContext.java new file mode 100644 index 00000000000..8001b6e7300 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SharedSubscriptionConsumingContext.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.testutils.cases; + +import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; + +import org.apache.pulsar.client.api.SubscriptionType; + +import java.net.URL; +import java.util.Collections; +import java.util.List; + +/** + * A consuming context with {@link SubscriptionType#Shared}, it's almost the same as {@link + * MultipleTopicConsumingContext}. + */ +public class SharedSubscriptionConsumingContext extends MultipleTopicTemplateContext { + + public SharedSubscriptionConsumingContext(PulsarTestEnvironment environment) { + this(environment, Collections.emptyList()); + } + + public SharedSubscriptionConsumingContext( + PulsarTestEnvironment environment, List<URL> connectorJarPaths) { + super(environment, connectorJarPaths); + } + + @Override + protected String displayName() { + return "consuming message with shared subscription"; + } + + @Override + protected String subscriptionName() { + return "flink-shared-subscription-test"; + } + + @Override + protected SubscriptionType subscriptionType() { + return SubscriptionType.Shared; + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java index 1fe98128e3d..9787a8ceeb9 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java @@ -299,8 +299,7 @@ public class PulsarRuntimeOperator implements Closeable { */ public <T> List<MessageId> sendMessages( String topic, Schema<T> schema, String key, Collection<T> messages) { - Producer<T> producer = createProducer(topic, schema); - try { + try (Producer<T> producer = 
createProducer(topic, schema)) { List<MessageId> messageIds = new ArrayList<>(messages.size()); for (T message : messages) { @@ -311,20 +310,11 @@ public class PulsarRuntimeOperator implements Closeable { MessageId messageId = builder.send(); messageIds.add(messageId); } - + producer.flush(); return messageIds; } catch (PulsarClientException e) { sneakyThrow(e); return emptyList(); - } finally { - try { - // Waiting for all the pending messages be sent to the Pulsar. - producer.flush(); - // Directly close without the flush will drop all the pending messages. - producer.close(); - } catch (PulsarClientException e) { - // Just ignore the exception here. - } } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java index 3d66728fded..10bd5120a46 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java @@ -124,7 +124,6 @@ public class PulsarContainerRuntime implements PulsarRuntime { try { if (operator != null) { operator.close(); - this.operator = null; } container.stop(); started.compareAndSet(true, false); diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java index 07981cc3d2a..2ca9a51f3c5 100644 --- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java @@ -86,7 +86,6 @@ public class PulsarEmbeddedRuntime implements PulsarRuntime { try { if (operator != null) { operator.close(); - this.operator = null; } if (pulsarService != null) { pulsarService.close(); diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java index a86ff5283f5..326c3ff6a89 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java @@ -65,8 +65,9 @@ public class PulsarMockRuntime implements PulsarRuntime { public void tearDown() { try { pulsarService.close(); - operator.close(); - this.operator = null; + if (operator != null) { + operator.close(); + } } catch (Exception e) { throw new IllegalStateException(e); } diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java index b345b4114e5..9fe1900be90 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java +++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java @@ -64,17 +64,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import static java.util.Collections.singletonList; +import static java.util.concurrent.CompletableFuture.runAsync; import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; import static org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails; import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob; @@ -153,11 +152,11 @@ public abstract class SourceTestSuiteBase<T> { try (CollectResultIterator<T> resultIterator = iteratorBuilder.build(jobClient)) { // Check test result LOG.info("Checking test results"); - checkResultWithSemantic(resultIterator, Arrays.asList(testRecords), semantic, null); + checkResultWithSemantic(resultIterator, singletonList(testRecords), semantic, null); } // Step 5: Clean up - waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED)); + waitForJobStatus(jobClient, singletonList(JobStatus.FINISHED)); } /** @@ -341,7 +340,7 @@ public abstract class SourceTestSuiteBase<T> { .stopWithSavepoint( true, testEnv.getCheckpointUri(), SavepointFormatType.CANONICAL) .get(30, TimeUnit.SECONDS); - waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED)); + waitForJobStatus(jobClient, singletonList(JobStatus.FINISHED)); // Step 5: Generate new test data final List<List<T>> newTestRecordCollections = new ArrayList<>(); @@ -372,7 +371,7 @@ public abstract 
class SourceTestSuiteBase<T> { final JobClient restartJobClient = restartEnv.executeAsync("Restart Test"); - waitForJobStatus(restartJobClient, Collections.singletonList(JobStatus.RUNNING)); + waitForJobStatus(restartJobClient, singletonList(JobStatus.RUNNING)); try { iterator.setJobClient(restartJobClient); @@ -526,7 +525,7 @@ public abstract class SourceTestSuiteBase<T> { } // Step 5: Clean up - waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED)); + waitForJobStatus(jobClient, singletonList(JobStatus.FINISHED)); } /** @@ -589,7 +588,7 @@ public abstract class SourceTestSuiteBase<T> { LOG.info("Checking records before killing TaskManagers"); checkResultWithSemantic( iterator, - Arrays.asList(testRecordsBeforeFailure), + singletonList(testRecordsBeforeFailure), semantic, testRecordsBeforeFailure.size()); @@ -598,7 +597,7 @@ public abstract class SourceTestSuiteBase<T> { controller.triggerTaskManagerFailover(jobClient, () -> {}); LOG.info("Waiting for job recovering from failure"); - waitForJobStatus(jobClient, Collections.singletonList(JobStatus.RUNNING)); + waitForJobStatus(jobClient, singletonList(JobStatus.RUNNING)); // Step 6: Write test data again to external system List<T> testRecordsAfterFailure = @@ -614,13 +613,13 @@ public abstract class SourceTestSuiteBase<T> { LOG.info("Checking records after job failover"); checkResultWithSemantic( iterator, - Arrays.asList(testRecordsAfterFailure), + singletonList(testRecordsAfterFailure), semantic, testRecordsAfterFailure.size()); // Step 8: Clean up terminateJob(jobClient); - waitForJobStatus(jobClient, Collections.singletonList(JobStatus.CANCELED)); + waitForJobStatus(jobClient, singletonList(JobStatus.CANCELED)); iterator.close(); } @@ -727,21 +726,19 @@ public abstract class SourceTestSuiteBase<T> { * @param semantic the supported semantic, see {@link CheckpointingMode} * @param limit expected number of the data to read from the job */ - private void checkResultWithSemantic( + protected 
void checkResultWithSemantic( CloseableIterator<T> resultIterator, List<List<T>> testData, CheckpointingMode semantic, Integer limit) { if (limit != null) { - assertThat( - CompletableFuture.supplyAsync( - () -> { - CollectIteratorAssertions.assertThat(resultIterator) - .withNumRecordsLimit(limit) - .matchesRecordsFromSource(testData, semantic); - return true; - })) - .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); + Runnable runnable = + () -> + CollectIteratorAssertions.assertThat(resultIterator) + .withNumRecordsLimit(limit) + .matchesRecordsFromSource(testData, semantic); + + assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); } else { CollectIteratorAssertions.assertThat(resultIterator) .matchesRecordsFromSource(testData, semantic); @@ -768,7 +765,7 @@ public abstract class SourceTestSuiteBase<T> { private void killJob(JobClient jobClient) throws Exception { terminateJob(jobClient); - waitForJobStatus(jobClient, Collections.singletonList(JobStatus.CANCELED)); + waitForJobStatus(jobClient, singletonList(JobStatus.CANCELED)); } /** Builder class for constructing {@link CollectResultIterator} of collect sink. */ diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java index 8bbb52972ef..8e10c60488e 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java @@ -24,8 +24,17 @@ import java.util.Iterator; * Entry point for assertion methods for {@link CollectIteratorAssert}. Each method in this class is * a static factory. 
*/ -public class CollectIteratorAssertions { +public final class CollectIteratorAssertions { + + private CollectIteratorAssertions() { + // no constructor. + } + public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual) { return new CollectIteratorAssert<>(actual); } + + public static <T> UnorderedCollectIteratorAssert<T> assertUnordered(Iterator<T> actual) { + return new UnorderedCollectIteratorAssert<>(actual); + } } diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java new file mode 100644 index 00000000000..8b03ecd798f --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.testframe.utils; + +import org.apache.flink.streaming.api.CheckpointingMode; + +import org.assertj.core.api.AbstractAssert; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import static java.util.stream.Collectors.toSet; +import static org.apache.flink.shaded.guava30.com.google.common.base.Predicates.not; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * This assertion is used to compare records in the collect iterator to the target test data with + * different semantics (AT_LEAST_ONCE, EXACTLY_ONCE) for unordered messages. + * + * @param <T> The type of records in the test data and collect iterator + */ +public class UnorderedCollectIteratorAssert<T> + extends AbstractAssert<UnorderedCollectIteratorAssert<T>, Iterator<T>> { + + private final Iterator<T> collectorIterator; + private final Set<T> allRecords; + private final Set<T> matchedRecords; + + private Integer limit = null; + + protected UnorderedCollectIteratorAssert(Iterator<T> collectorIterator) { + super(collectorIterator, UnorderedCollectIteratorAssert.class); + this.collectorIterator = collectorIterator; + this.allRecords = new HashSet<>(); + this.matchedRecords = new HashSet<>(); + } + + public UnorderedCollectIteratorAssert<T> withNumRecordsLimit(int limit) { + this.limit = limit; + return this; + } + + public void matchesRecordsFromSource( + List<List<T>> recordsBySplitsFromSource, CheckpointingMode semantic) { + for (List<T> list : recordsBySplitsFromSource) { + for (T t : list) { + assertTrue(allRecords.add(t), "All the records should be unique."); + } + } + + if (limit != null && limit > allRecords.size()) { + throw new IllegalArgumentException( + "Limit validation size should be less than or equal to total number of records from source"); + } + + switch (semantic) { + case AT_LEAST_ONCE: + compareWithAtLeastOnceSemantic(); + break; + case EXACTLY_ONCE: +
compareWithExactlyOnceSemantic(); + break; + default: + throw new IllegalArgumentException("Unrecognized semantic \"" + semantic + "\""); + } + } + + private void compareWithAtLeastOnceSemantic() { + int recordCounter = 0; + while (collectorIterator.hasNext()) { + final T record = collectorIterator.next(); + if (allRecords.contains(record)) { + if (matchedRecords.add(record)) { + recordCounter++; + } + } else { + throw new IllegalArgumentException("Record " + record + " is not expected."); + } + + if (limit != null && recordCounter >= limit) { + break; + } + } + + verifyMatchedRecords(); + } + + private void compareWithExactlyOnceSemantic() { + int recordCounter = 0; + while (collectorIterator.hasNext()) { + final T record = collectorIterator.next(); + if (allRecords.contains(record)) { + assertTrue( + matchedRecords.add(record), + "Record " + record + " is duplicated in exactly-once."); + recordCounter++; + } else { + throw new IllegalArgumentException("Record " + record + " is not expected."); + } + + if (limit != null && recordCounter >= limit) { + break; + } + } + + verifyMatchedRecords(); + } + + private void verifyMatchedRecords() { + if (limit == null && allRecords.size() > matchedRecords.size()) { + Set<T> missingResults = + allRecords.stream().filter(not(matchedRecords::contains)).collect(toSet()); + if (!missingResults.isEmpty()) { + throw new IllegalArgumentException( + "Expected to have " + + allRecords.size() + + " elements. But we missing: " + + missingResults); + } + } + } +}
