This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-gcp-pubsub.git
The following commit(s) were added to refs/heads/v3.0 by this push: new 557acd8 [FLINK-33018][Tests] Fix flaky test when cancelling source 557acd8 is described below commit 557acd8d6d121d27eb6b06f29f552089ea0adad3 Author: Ryan Skraba <rskr...@apache.com> AuthorDate: Mon Sep 11 17:44:13 2023 +0200 [FLINK-33018][Tests] Fix flaky test when cancelling source (cherry picked from commit f5372f25cfc1954d00a4b2fc9342e8ed5a3ef3ab) --- .../connectors/gcp/pubsub/PubSubConsumingTest.java | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java b/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java index c6049d8..abcb15e 100644 --- a/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java +++ b/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java @@ -108,19 +108,19 @@ class PubSubConsumingTest { Thread thread = createSourceThread(pubSubSource, lock, results); try { thread.start(); - awaitRecordCount(results, 2); - - // we do not emit the end of stream record - assertThat(new ArrayList<>(results)).isEqualTo(Arrays.asList("A", "B")); - pubSubSource.snapshotState(0, 0); - pubSubSource.notifyCheckpointComplete(0); - // we acknowledge also the end of the stream record - assertThat(testPubSubSubscriber.getAcknowledgedIds()) - .isEqualTo(Arrays.asList("1", "2", "3")); + // The source thread will finish automatically, without waiting for records or + // explicitly cancelling the source. 
} finally { - pubSubSource.cancel(); thread.join(); } + + // we do not emit the end of stream record + assertThat(new ArrayList<>(results)).isEqualTo(Arrays.asList("A", "B")); + pubSubSource.snapshotState(0, 0); + pubSubSource.notifyCheckpointComplete(0); + // we acknowledge also the end of the stream record + assertThat(testPubSubSubscriber.getAcknowledgedIds()) + .isEqualTo(Arrays.asList("1", "2", "3")); } @Test