This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new e75111d  [FLINK-20928] Fix flaky test by retrying notifyCheckpointComplete until either commit success or timeout
e75111d is described below

commit e75111dffc6a7bf05ade246cc5485d50cad8162f
Author: Dong Lin <lindon...@gmail.com>
AuthorDate: Fri Oct 15 09:46:06 2021 +0800

    [FLINK-20928] Fix flaky test by retrying notifyCheckpointComplete until either commit success or timeout
---
 .../kafka/source/reader/KafkaSourceReader.java      | 13 ++++++++++---
 .../kafka/source/reader/KafkaSourceReaderTest.java  | 20 +++++++++++++++-----
 .../flink/core/testutils/CommonTestUtils.java       | 21 +++++++++++++++++++--
 3 files changed, 44 insertions(+), 10 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index 08f82b0..4a42a48 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -125,9 +125,18 @@ public class KafkaSourceReader<T>
             return;
         }
 
+        Map<TopicPartition, OffsetAndMetadata> committedPartitions =
+                offsetsToCommit.get(checkpointId);
+        if (committedPartitions == null) {
+            LOG.debug(
+                    "Offsets for checkpoint {} either do not exist or have 
already been committed.",
+                    checkpointId);
+            return;
+        }
+
         ((KafkaSourceFetcherManager<T>) splitFetcherManager)
                 .commitOffsets(
-                        offsetsToCommit.get(checkpointId),
+                        committedPartitions,
                         (ignored, e) -> {
                             // The offset commit here is needed by the 
external monitoring. It won't
                             // break Flink job's correctness if we fail to 
commit the offset here.
@@ -144,8 +153,6 @@ public class KafkaSourceReader<T>
                                 
kafkaSourceReaderMetrics.recordSucceededCommit();
                                 // If the finished topic partition has been 
committed, we remove it
                                 // from the offsets of the finished splits map.
-                                Map<TopicPartition, OffsetAndMetadata> 
committedPartitions =
-                                        offsetsToCommit.get(checkpointId);
                                 committedPartitions.forEach(
                                         (tp, offset) ->
                                                 
kafkaSourceReaderMetrics.recordCommittedOffset(
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 0304315..16cab86 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -206,11 +206,21 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
 
             // The completion of the last checkpoint should subsume all the 
previous checkpoints.
             assertEquals(checkpointId, reader.getOffsetsToCommit().size());
-            reader.notifyCheckpointComplete(checkpointId);
-            pollUntil(
-                    reader,
-                    output,
-                    () -> reader.getOffsetsToCommit().isEmpty(),
+
+            long lastCheckpointId = checkpointId;
+            waitUtil(
+                    () -> {
+                        try {
+                            reader.notifyCheckpointComplete(lastCheckpointId);
+                        } catch (Exception exception) {
+                            throw new RuntimeException(
+                                    "Caught unexpected exception when polling 
from the reader",
+                                    exception);
+                        }
+                        return reader.getOffsetsToCommit().isEmpty();
+                    },
+                    Duration.ofSeconds(60),
+                    Duration.ofSeconds(1),
                     "The offset commit did not finish before timeout.");
         }
 
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 4b72f9b..76a5f28 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -190,13 +190,15 @@ public class CommonTestUtils {
      *
      * @param condition the condition to wait for.
      * @param timeout the maximum time to wait for the condition to become 
true.
+     * @param pause delay between condition checks.
      * @param errorMsg the error message to include in the 
<code>TimeoutException</code> if the
      *     condition was not met before timeout.
      * @throws TimeoutException if the condition is not met before timeout.
      * @throws InterruptedException if the thread is interrupted.
      */
     @SuppressWarnings("BusyWait")
-    public static void waitUtil(Supplier<Boolean> condition, Duration timeout, 
String errorMsg)
+    public static void waitUtil(
+            Supplier<Boolean> condition, Duration timeout, Duration pause, 
String errorMsg)
             throws TimeoutException, InterruptedException {
         long timeoutMs = timeout.toMillis();
         if (timeoutMs <= 0) {
@@ -204,10 +206,25 @@ public class CommonTestUtils {
         }
         long startingTime = System.currentTimeMillis();
         while (!condition.get() && System.currentTimeMillis() - startingTime < 
timeoutMs) {
-            Thread.sleep(1);
+            Thread.sleep(pause.toMillis());
         }
         if (!condition.get()) {
             throw new TimeoutException(errorMsg);
         }
     }
+
+    /**
+     * Wait until the given condition is met or the timeout expires.
+     *
+     * @param condition the condition to wait for.
+     * @param timeout the maximum time to wait for the condition to become 
true.
+     * @param errorMsg the error message to include in the 
<code>TimeoutException</code> if the
+     *     condition was not met before timeout.
+     * @throws TimeoutException if the condition is not met before timeout.
+     * @throws InterruptedException if the thread is interrupted.
+     */
+    public static void waitUtil(Supplier<Boolean> condition, Duration timeout, 
String errorMsg)
+            throws TimeoutException, InterruptedException {
+        waitUtil(condition, timeout, Duration.ofMillis(1), errorMsg);
+    }
 }

Reply via email to