This is an automated email from the ASF dual-hosted git repository. MartijnVisser pushed a commit to branch v4.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit d909123f51ecd7e9ed6b85e0c09bc886fc677c4b Author: Aleksandr Savonin <[email protected]> AuthorDate: Mon May 18 19:22:25 2026 +0200 [FLINK-39699][tests] Wait for completed checkpoint stats in KafkaSinkITCase (cherry picked from commit 2d4419874a88ac9121885182e0d972bfddf06921) Generated-by: Claude Code (Opus 4.8) --- .../connector/kafka/sink/KafkaSinkITCase.java | 45 ++++++++++++++-------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java index 4c3ab2d6..6309b02d 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java @@ -55,7 +55,6 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobResult; -import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -120,6 +119,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -140,6 +140,8 @@ public class KafkaSinkITCase extends TestLogger { private static final Network NETWORK = Network.newNetwork(); private static final int ZK_TIMEOUT_MILLIS = 30000; private static final short TOPIC_REPLICATION_FACTOR = 1; + private static final long CHECKPOINT_PATH_LOOKUP_TIMEOUT_SECONDS = 10; + private static final long CHECKPOINT_PATH_LOOKUP_POLL_INTERVAL_MILLIS = 200; private static AdminClient admin; private String topic; @@ -329,11 +331,7 @@ public class KafkaSinkITCase extends TestLogger { } catch (Exception e) { assertThat(e).hasStackTraceContaining("Exceeded checkpoint tolerable failure"); } - final Optional<String> completedCheckpoint = - CommonTestUtils.getLatestCompletedCheckpointPath(firstJobId, miniCluster); - - assertThat(completedCheckpoint).isPresent(); - config.set(SAVEPOINT_PATH, completedCheckpoint.get()); + config.set(SAVEPOINT_PATH, waitForCompletedCheckpointPath(miniCluster, firstJobId)); // Run a second job which aborts all lingering transactions and new consumer should // immediately see the newly written records @@ -426,7 +424,7 @@ public class KafkaSinkITCase extends TestLogger { "firstPrefix", clusterClient); - config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, firstJobId)); + config.set(SAVEPOINT_PATH, waitForCompletedCheckpointPath(miniCluster, firstJobId)); config.set(CoreOptions.DEFAULT_PARALLELISM, newParallelsm); // Run a second job which aborts all lingering transactions and new consumer should @@ -441,7 +439,7 @@ public class KafkaSinkITCase extends TestLogger { "secondPrefix", clusterClient); - config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, secondJobId)); + config.set(SAVEPOINT_PATH, waitForCompletedCheckpointPath(miniCluster, secondJobId)); config.set(CoreOptions.DEFAULT_PARALLELISM, oldParallelism); SharedReference<AtomicBoolean> failed = sharedObjects.add(new AtomicBoolean(true)); @@ -459,12 +457,27 @@ public class KafkaSinkITCase extends TestLogger { assertThat(committedRecords).containsExactlyInAnyOrderElementsOf(checkpointedRecords.get()); } - private String getCheckpointPath(MiniCluster miniCluster, JobID secondJobId) - throws InterruptedException, ExecutionException, FlinkJobNotFoundException { - final Optional<String> completedCheckpoint = - CommonTestUtils.getLatestCompletedCheckpointPath(secondJobId, miniCluster); - - assertThat(completedCheckpoint).isPresent(); + private String waitForCompletedCheckpointPath(MiniCluster miniCluster, JobID jobId) + throws Exception { + // The CompletedCheckpointStats can briefly lag behind requestJobResult() returning, + // so poll with a bounded timeout instead of asserting on the first miss. + final long deadline = + System.nanoTime() + + TimeUnit.SECONDS.toNanos(CHECKPOINT_PATH_LOOKUP_TIMEOUT_SECONDS); + Optional<String> completedCheckpoint = Optional.empty(); + while (System.nanoTime() < deadline) { + completedCheckpoint = + CommonTestUtils.getLatestCompletedCheckpointPath(jobId, miniCluster); + if (completedCheckpoint.isPresent()) { + return completedCheckpoint.get(); + } + Thread.sleep(CHECKPOINT_PATH_LOOKUP_POLL_INTERVAL_MILLIS); + } + assertThat(completedCheckpoint) + .as( + "Job %s did not expose a completed checkpoint within %ds", + jobId, CHECKPOINT_PATH_LOOKUP_TIMEOUT_SECONDS) + .isPresent(); return completedCheckpoint.get(); } @@ -499,7 +512,7 @@ public class KafkaSinkITCase extends TestLogger { clusterClient); // Run a second job which switching to POOLING - config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, firstJobId)); + config.set(SAVEPOINT_PATH, waitForCompletedCheckpointPath(miniCluster, firstJobId)); config.set(CoreOptions.DEFAULT_PARALLELISM, 5); JobID secondJobId2 = executeWithMapper( @@ -512,7 +525,7 @@ public class KafkaSinkITCase extends TestLogger { clusterClient); // Run a third job with downscaling - config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, secondJobId2)); + config.set(SAVEPOINT_PATH, waitForCompletedCheckpointPath(miniCluster, secondJobId2)); config.set(CoreOptions.DEFAULT_PARALLELISM, 3); JobID thirdJobId = executeWithMapper(
