This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 74f53ebf45 [Flink] Do not mark done partition if job failover from checkpoint. (#5581)
74f53ebf45 is described below

commit 74f53ebf453eee491067ee129e8e3b28e1486732
Author: wangwj <[email protected]>
AuthorDate: Wed May 14 12:05:34 2025 +0800

    [Flink] Do not mark done partition if job failover from checkpoint. (#5581)
---
 .../org/apache/paimon/flink/sink/Committer.java    | 10 +++++++---
 .../paimon/flink/sink/CommitterOperator.java       |  2 +-
 .../RestoreAndFailCommittableStateManager.java     |  2 +-
 .../apache/paimon/flink/sink/StoreCommitter.java   |  7 +++++--
 .../paimon/flink/sink/StoreMultiCommitter.java     |  7 ++++---
 .../flink/sink/partition/PartitionListener.java    |  2 +-
 .../flink/sink/partition/PartitionListeners.java   |  9 ++++++++-
 .../flink/sink/partition/PartitionMarkDone.java    |  6 +++++-
 .../sink/partition/ReportPartStatsListener.java    |  3 ++-
 .../sink/partition/PartitionMarkDoneTest.java      | 23 ++++++++++++++++++----
 10 files changed, 53 insertions(+), 18 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 23c6c7faeb..4679318b7e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -53,11 +53,15 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
      * Filter out all {@link GlobalCommitT} which have committed, and commit the remaining {@link
      * GlobalCommitT}.
      */
-    int filterAndCommit(List<GlobalCommitT> globalCommittables, boolean checkAppendFiles)
+    int filterAndCommit(
+            List<GlobalCommitT> globalCommittables,
+            boolean checkAppendFiles,
+            boolean recoveryFromState)
             throws IOException;
 
-    default int filterAndCommit(List<GlobalCommitT> globalCommittables) throws IOException {
-        return filterAndCommit(globalCommittables, true);
+    default int filterAndCommitFromState(List<GlobalCommitT> globalCommittables)
+            throws IOException {
+        return filterAndCommit(globalCommittables, true, true);
     }
 
     Map<Long, List<CommitT>> groupByCheckpoint(Collection<CommitT> committables);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 6354c0ecd0..b5c73f17f9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -210,7 +210,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
             // So when `endInput` is called, we must check if the corresponding snapshot exists.
             // However, if the snapshot does not exist, then append files must be new files. So
             // there is no need to check for duplicated append files.
-            committer.filterAndCommit(committables, false);
+            committer.filterAndCommit(committables, false, false);
         } else {
             committer.commit(committables);
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
index be795fbab6..dda1abf90d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
@@ -79,7 +79,7 @@ public class RestoreAndFailCommittableStateManager<GlobalCommitT>
 
     private void recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
             throws Exception {
-        int numCommitted = committer.filterAndCommit(committables);
+        int numCommitted = committer.filterAndCommitFromState(committables);
         if (numCommitted > 0) {
             throw new RuntimeException(
                     "This exception is intentionally thrown "
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 4908b99317..d3d6f206cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -115,9 +115,12 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
 
     @Override
     public int filterAndCommit(
-            List<ManifestCommittable> globalCommittables, boolean checkAppendFiles) {
+            List<ManifestCommittable> globalCommittables,
+            boolean checkAppendFiles,
+            boolean recoveryFromState) {
         int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles);
-        partitionListeners.notifyCommittable(globalCommittables);
+        partitionListeners.notifyCommittable(globalCommittables, recoveryFromState);
+
         return committed;
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 8ad3e4fb08..886378ef7d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -178,14 +178,15 @@ public class StoreMultiCommitter
 
     @Override
     public int filterAndCommit(
-            List<WrappedManifestCommittable> globalCommittables, boolean checkAppendFiles)
-            throws IOException {
+            List<WrappedManifestCommittable> globalCommittables,
+            boolean checkAppendFiles,
+            boolean recoveryFromState) {
         int result = 0;
         for (Map.Entry<Identifier, List<ManifestCommittable>> entry :
                 groupByTable(globalCommittables).entrySet()) {
             result +=
                     getStoreCommitter(entry.getKey())
-                            .filterAndCommit(entry.getValue(), checkAppendFiles);
+                            .filterAndCommit(entry.getValue(), checkAppendFiles, recoveryFromState);
         }
         return result;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
index 65d25fbc02..f01df4b870 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
@@ -26,7 +26,7 @@ import java.util.List;
 /** The partition listener. */
 public interface PartitionListener extends Closeable {
 
-    void notifyCommittable(List<ManifestCommittable> committables);
+    void notifyCommittable(List<ManifestCommittable> committables, boolean recoverFromState);
 
     void snapshotState() throws Exception;
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
index cbf14da456..4e14e772a7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -39,7 +39,14 @@ public class PartitionListeners implements Closeable {
 
     public void notifyCommittable(List<ManifestCommittable> committables) {
         for (PartitionListener trigger : listeners) {
-            trigger.notifyCommittable(committables);
+            trigger.notifyCommittable(committables, false);
+        }
+    }
+
+    public void notifyCommittable(
+            List<ManifestCommittable> committables, boolean recoveryFromState) {
+        for (PartitionListener trigger : listeners) {
+            trigger.notifyCommittable(committables, recoveryFromState);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index 7e7aa57495..fad23c6085 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -133,7 +133,11 @@ public class PartitionMarkDone implements PartitionListener {
     }
 
     @Override
-    public void notifyCommittable(List<ManifestCommittable> committables) {
+    public void notifyCommittable(
+            List<ManifestCommittable> committables, boolean recoverFromState) {
+        if (recoverFromState) {
+            return;
+        }
         if (partitionMarkDoneActionMode == PartitionMarkDoneActionMode.WATERMARK) {
             markDoneByWatermark(committables);
         } else {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index 45e461405c..effb63c87e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -85,7 +85,8 @@ public class ReportPartStatsListener implements PartitionListener {
         this.idleTime = idleTime;
     }
 
-    public void notifyCommittable(List<ManifestCommittable> committables) {
+    public void notifyCommittable(
+            List<ManifestCommittable> committables, boolean recoverFromCheckpoint) {
         Set<String> partition = new HashSet<>();
         boolean endInput = false;
         for (ManifestCommittable committable : committables) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index 9d71f94a36..349d56c71c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -58,15 +58,20 @@ class PartitionMarkDoneTest extends TableTestBase {
 
     @Test
     public void testTriggerByCompaction() throws Exception {
-        innerTest(true);
+        innerTest(true, false);
     }
 
     @Test
     public void testNotTriggerByCompaction() throws Exception {
-        innerTest(false);
+        innerTest(false, false);
     }
 
-    private void innerTest(boolean deletionVectors) throws Exception {
+    @Test
+    public void testTriggerWhenRecoveryFromState() throws Exception {
+        innerTest(false, true);
+    }
+
+    private void innerTest(boolean deletionVectors, boolean recoveryFromState) throws Exception {
         Identifier identifier = identifier("T");
         Schema schema =
                 Schema.newBuilder()
@@ -94,6 +99,11 @@ class PartitionMarkDoneTest extends TableTestBase {
                                 table)
                         .get();
 
+        if (recoveryFromState) {
+            notifyCommits(markDone, false, true);
+            assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
+        }
+
         notifyCommits(markDone, true);
         assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
 
@@ -104,6 +114,11 @@ class PartitionMarkDoneTest extends TableTestBase {
     }
 
     public static void notifyCommits(PartitionMarkDone markDone, boolean isCompact) {
+        notifyCommits(markDone, isCompact, false);
+    }
+
+    private static void notifyCommits(
+            PartitionMarkDone markDone, boolean isCompact, boolean recoveryFromState) {
         ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE);
         DataFileMeta file = DataFileTestUtils.newFile();
         CommitMessageImpl compactMessage;
@@ -127,7 +142,7 @@ class PartitionMarkDoneTest extends TableTestBase {
                             new IndexIncrement(emptyList()));
         }
         committable.addFileCommittable(compactMessage);
-        markDone.notifyCommittable(singletonList(committable));
+        markDone.notifyCommittable(singletonList(committable), recoveryFromState);
     }
 
     public static class MockOperatorStateStore implements OperatorStateStore {

Reply via email to