This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 300e1ea1f2c [hotfix] Fix UnifiedSinkMigrationITCase 300e1ea1f2c is described below commit 300e1ea1f2c508ef8318b20b80e3840af8306d1d Author: Arvid Heise <ar...@apache.org> AuthorDate: Fri Sep 20 16:08:40 2024 +0200 [hotfix] Fix UnifiedSinkMigrationITCase UnifiedSinkMigrationITCase assumed that we also commit partial batches of committables. However, that was never the intent and was fixed in FLINK-25920. This commit adjusts the test. --- .../runtime/UnifiedSinkMigrationITCase.java | 23 +++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java index ac586c7df0c..b5ce8207216 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java @@ -236,6 +236,7 @@ class UnifiedSinkMigrationITCase { @Override public void write(Long element, Context context) throws IOException, InterruptedException {} + /** Creates two committables on the very first checkpoint. */ @Override public List<Integer> prepareCommit(boolean flush) throws IOException, InterruptedException { if (emitted || recovered) { @@ -265,16 +266,27 @@ class UnifiedSinkMigrationITCase { this.commitLatch = commitLatch; } + /** + * On first attempt: send GLOBAL_COMMITTER_STATE downstream, keep COMMITTER_STATE for retry. + * On second attempt: keep COMMITTER_STATE for retry on first checkpoint, then send + * downstream. Only then, global committer should be triggered. 
+ */ @Override public List<Integer> commit(List<Integer> committables) throws IOException, InterruptedException { - if (firstCommit && !recovered) { - assertThat(committables).containsExactly(COMMITTER_STATE, GLOBAL_COMMITTER_STATE); - } else { - assertThat(committables).containsExactly(COMMITTER_STATE); + if (firstCommit) { + if (!recovered) { + assertThat(committables) + .containsExactly(COMMITTER_STATE, GLOBAL_COMMITTER_STATE); + } else if (recovered) { + assertThat(committables).containsExactly(COMMITTER_STATE); + } } LOG.info("Committing {}", committables); commitLatch.get().countDown(); + if (recovered && !firstCommit) { + return Collections.emptyList(); + } firstCommit = false; // Always retry to keep the state return Collections.singletonList(COMMITTER_STATE); @@ -317,6 +329,7 @@ class UnifiedSinkMigrationITCase { return String.valueOf(committables.get(0)); } + /** Wait for all committables (after recovery on second checkpoint). */ @Override public List<String> commit(List<String> globalCommittables) throws IOException, InterruptedException { @@ -329,7 +342,7 @@ class UnifiedSinkMigrationITCase { .containsExactly(String.valueOf(GLOBAL_COMMITTER_STATE)); } firstCommitAfterRecover = false; - return globalCommittables; + return recover ? Collections.emptyList() : globalCommittables; } @Override