This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 300e1ea1f2c [hotfix] Fix UnifiedSinkMigrationITCase
300e1ea1f2c is described below

commit 300e1ea1f2c508ef8318b20b80e3840af8306d1d
Author: Arvid Heise <ar...@apache.org>
AuthorDate: Fri Sep 20 16:08:40 2024 +0200

    [hotfix] Fix UnifiedSinkMigrationITCase
    
    UnifiedSinkMigrationITCase assumed that we also commit partial batches of 
committables. However, that was never the intend and fixed in FLINK-25920. This 
commit adjusts the test.
---
 .../runtime/UnifiedSinkMigrationITCase.java        | 23 +++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java
index ac586c7df0c..b5ce8207216 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java
@@ -236,6 +236,7 @@ class UnifiedSinkMigrationITCase {
         @Override
         public void write(Long element, Context context) throws IOException, 
InterruptedException {}
 
+        /** Creates two committables on the very first checkpoint. */
         @Override
         public List<Integer> prepareCommit(boolean flush) throws IOException, 
InterruptedException {
             if (emitted || recovered) {
@@ -265,16 +266,27 @@ class UnifiedSinkMigrationITCase {
             this.commitLatch = commitLatch;
         }
 
+        /**
+         * On first attempt: send GLOBAL_COMMITTER_STATE downstream, keep 
COMMITTER_STATE for retry.
+         * On second attempt: keep COMMITTER_STATE for retry on first 
checkpoint, then send
+         * downstream. Only then, global committer should be triggered.
+         */
         @Override
         public List<Integer> commit(List<Integer> committables)
                 throws IOException, InterruptedException {
-            if (firstCommit && !recovered) {
-                assertThat(committables).containsExactly(COMMITTER_STATE, 
GLOBAL_COMMITTER_STATE);
-            } else {
-                assertThat(committables).containsExactly(COMMITTER_STATE);
+            if (firstCommit) {
+                if (!recovered) {
+                    assertThat(committables)
+                            .containsExactly(COMMITTER_STATE, 
GLOBAL_COMMITTER_STATE);
+                } else if (recovered) {
+                    assertThat(committables).containsExactly(COMMITTER_STATE);
+                }
             }
             LOG.info("Committing {}", committables);
             commitLatch.get().countDown();
+            if (recovered && !firstCommit) {
+                return Collections.emptyList();
+            }
             firstCommit = false;
             // Always retry to keep the state
             return Collections.singletonList(COMMITTER_STATE);
@@ -317,6 +329,7 @@ class UnifiedSinkMigrationITCase {
             return String.valueOf(committables.get(0));
         }
 
+        /** Wait for all committables (after recovery on second checkpoint). */
         @Override
         public List<String> commit(List<String> globalCommittables)
                 throws IOException, InterruptedException {
@@ -329,7 +342,7 @@ class UnifiedSinkMigrationITCase {
                         
.containsExactly(String.valueOf(GLOBAL_COMMITTER_STATE));
             }
             firstCommitAfterRecover = false;
-            return globalCommittables;
+            return recover ? Collections.emptyList() : globalCommittables;
         }
 
         @Override

Reply via email to