This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-gcp-pubsub.git


The following commit(s) were added to refs/heads/main by this push:
     new f5372f2  [FLINK-33018][Tests] Fix flaky test when cancelling source
f5372f2 is described below

commit f5372f25cfc1954d00a4b2fc9342e8ed5a3ef3ab
Author: Ryan Skraba <rskr...@apache.com>
AuthorDate: Mon Sep 11 17:44:13 2023 +0200

    [FLINK-33018][Tests] Fix flaky test when cancelling source
---
 .../connectors/gcp/pubsub/PubSubConsumingTest.java   | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java b/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
index c6049d8..abcb15e 100644
--- a/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
+++ b/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
@@ -108,19 +108,19 @@ class PubSubConsumingTest {
         Thread thread = createSourceThread(pubSubSource, lock, results);
         try {
             thread.start();
-            awaitRecordCount(results, 2);
-
-            // we do not emit the end of stream record
-            assertThat(new ArrayList<>(results)).isEqualTo(Arrays.asList("A", "B"));
-            pubSubSource.snapshotState(0, 0);
-            pubSubSource.notifyCheckpointComplete(0);
-            // we acknowledge also the end of the stream record
-            assertThat(testPubSubSubscriber.getAcknowledgedIds())
-                    .isEqualTo(Arrays.asList("1", "2", "3"));
+            // The source thread will finish automatically, without waiting for records or
+            // explicitly cancelling the source.
         } finally {
-            pubSubSource.cancel();
             thread.join();
         }
+
+        // we do not emit the end of stream record
+        assertThat(new ArrayList<>(results)).isEqualTo(Arrays.asList("A", "B"));
+        pubSubSource.snapshotState(0, 0);
+        pubSubSource.notifyCheckpointComplete(0);
+        // we acknowledge also the end of the stream record
+        assertThat(testPubSubSubscriber.getAcknowledgedIds())
+                .isEqualTo(Arrays.asList("1", "2", "3"));
     }
 
     @Test

Reply via email to