This is an automated email from the ASF dual-hosted git repository.

MartijnVisser pushed a commit to branch v4.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit d909123f51ecd7e9ed6b85e0c09bc886fc677c4b
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Mon May 18 19:22:25 2026 +0200

    [FLINK-39699][tests] Wait for completed checkpoint stats in KafkaSinkITCase
    
    (cherry picked from commit 2d4419874a88ac9121885182e0d972bfddf06921)
    Generated-by: Claude Code (Opus 4.8)
---
 .../connector/kafka/sink/KafkaSinkITCase.java      | 45 ++++++++++++++--------
 1 file changed, 29 insertions(+), 16 deletions(-)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 4c3ab2d6..6309b02d 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -55,7 +55,6 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -120,6 +119,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -140,6 +140,8 @@ public class KafkaSinkITCase extends TestLogger {
     private static final Network NETWORK = Network.newNetwork();
     private static final int ZK_TIMEOUT_MILLIS = 30000;
     private static final short TOPIC_REPLICATION_FACTOR = 1;
+    private static final long CHECKPOINT_PATH_LOOKUP_TIMEOUT_SECONDS = 10;
+    private static final long CHECKPOINT_PATH_LOOKUP_POLL_INTERVAL_MILLIS = 
200;
     private static AdminClient admin;
 
     private String topic;
@@ -329,11 +331,7 @@ public class KafkaSinkITCase extends TestLogger {
         } catch (Exception e) {
             assertThat(e).hasStackTraceContaining("Exceeded checkpoint 
tolerable failure");
         }
-        final Optional<String> completedCheckpoint =
-                CommonTestUtils.getLatestCompletedCheckpointPath(firstJobId, 
miniCluster);
-
-        assertThat(completedCheckpoint).isPresent();
-        config.set(SAVEPOINT_PATH, completedCheckpoint.get());
+        config.set(SAVEPOINT_PATH, waitForCompletedCheckpointPath(miniCluster, 
firstJobId));
 
         // Run a second job which aborts all lingering transactions and new 
consumer should
         // immediately see the newly written records
@@ -426,7 +424,7 @@ public class KafkaSinkITCase extends TestLogger {
                         "firstPrefix",
                         clusterClient);
 
-        config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, firstJobId));
+        config.set(SAVEPOINT_PATH, waitForCompletedCheckpointPath(miniCluster, 
firstJobId));
         config.set(CoreOptions.DEFAULT_PARALLELISM, newParallelsm);
 
         // Run a second job which aborts all lingering transactions and new 
consumer should
@@ -441,7 +439,7 @@ public class KafkaSinkITCase extends TestLogger {
                         "secondPrefix",
                         clusterClient);
 
-        config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, 
secondJobId));
+        config.set(SAVEPOINT_PATH, waitForCompletedCheckpointPath(miniCluster, 
secondJobId));
         config.set(CoreOptions.DEFAULT_PARALLELISM, oldParallelism);
 
         SharedReference<AtomicBoolean> failed = sharedObjects.add(new 
AtomicBoolean(true));
@@ -459,12 +457,27 @@ public class KafkaSinkITCase extends TestLogger {
         
assertThat(committedRecords).containsExactlyInAnyOrderElementsOf(checkpointedRecords.get());
     }
 
-    private String getCheckpointPath(MiniCluster miniCluster, JobID 
secondJobId)
-            throws InterruptedException, ExecutionException, 
FlinkJobNotFoundException {
-        final Optional<String> completedCheckpoint =
-                CommonTestUtils.getLatestCompletedCheckpointPath(secondJobId, 
miniCluster);
-
-        assertThat(completedCheckpoint).isPresent();
+    private String waitForCompletedCheckpointPath(MiniCluster miniCluster, 
JobID jobId)
+            throws Exception {
+        // The CompletedCheckpointStats can briefly lag behind 
requestJobResult() returning,
+        // so poll with a bounded timeout instead of asserting on the first 
miss.
+        final long deadline =
+                System.nanoTime()
+                        + 
TimeUnit.SECONDS.toNanos(CHECKPOINT_PATH_LOOKUP_TIMEOUT_SECONDS);
+        Optional<String> completedCheckpoint = Optional.empty();
+        while (System.nanoTime() < deadline) {
+            completedCheckpoint =
+                    CommonTestUtils.getLatestCompletedCheckpointPath(jobId, 
miniCluster);
+            if (completedCheckpoint.isPresent()) {
+                return completedCheckpoint.get();
+            }
+            Thread.sleep(CHECKPOINT_PATH_LOOKUP_POLL_INTERVAL_MILLIS);
+        }
+        assertThat(completedCheckpoint)
+                .as(
+                        "Job %s did not expose a completed checkpoint within 
%ds",
+                        jobId, CHECKPOINT_PATH_LOOKUP_TIMEOUT_SECONDS)
+                .isPresent();
         return completedCheckpoint.get();
     }
 
@@ -499,7 +512,7 @@ public class KafkaSinkITCase extends TestLogger {
                         clusterClient);
 
         // Run a second job which switching to POOLING
-        config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, firstJobId));
+        config.set(SAVEPOINT_PATH, waitForCompletedCheckpointPath(miniCluster, 
firstJobId));
         config.set(CoreOptions.DEFAULT_PARALLELISM, 5);
         JobID secondJobId2 =
                 executeWithMapper(
@@ -512,7 +525,7 @@ public class KafkaSinkITCase extends TestLogger {
                         clusterClient);
 
         // Run a third job with downscaling
-        config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, 
secondJobId2));
+        config.set(SAVEPOINT_PATH, waitForCompletedCheckpointPath(miniCluster, 
secondJobId2));
         config.set(CoreOptions.DEFAULT_PARALLELISM, 3);
         JobID thirdJobId =
                 executeWithMapper(

Reply via email to