This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fa904edfd [core] Postpone mode should not mark done when still has 
compaction (#6561)
7fa904edfd is described below

commit 7fa904edfde04392e219ba0ea9d20923cbd96aad
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 17 10:59:20 2025 +0800

    [core] Postpone mode should not mark done when still has compaction (#6561)
---
 .../sink/listener/PartitionMarkDoneListener.java   | 19 ++++++----
 .../flink/sink/listener/PartitionMarkDoneTest.java | 40 ++++++++++++++--------
 2 files changed, 38 insertions(+), 21 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
index 0283b197d2..92966014a1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
@@ -25,6 +25,7 @@ import 
org.apache.paimon.flink.FlinkConnectorOptions.PartitionMarkDoneActionMode
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -88,12 +89,18 @@ public class PartitionMarkDoneListener implements 
CommitListener {
         List<PartitionMarkDoneAction> actions =
                 PartitionMarkDoneAction.createActions(cl, table, coreOptions);
 
-        // if batch read skip level 0 files, we should wait compaction to mark 
done
-        // otherwise, some data may not be readable, and there might be data 
delays
-        boolean waitCompaction =
-                !table.primaryKeys().isEmpty()
-                        && (coreOptions.deletionVectorsEnabled()
-                                || coreOptions.mergeEngine() == 
MergeEngine.FIRST_ROW);
+        boolean waitCompaction = false;
+        if (!table.primaryKeys().isEmpty()) {
+            // some situation should wait compaction to mark done, otherwise, 
some data may not be
+            // readable, and there might be data delays
+            if (coreOptions.deletionVectorsEnabled()) {
+                waitCompaction = true;
+            } else if (coreOptions.mergeEngine() == MergeEngine.FIRST_ROW) {
+                waitCompaction = true;
+            } else if (table.bucketMode() == BucketMode.POSTPONE_MODE) {
+                waitCompaction = true;
+            }
+        }
 
         return Optional.of(
                 new PartitionMarkDoneListener(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
index 43597da654..aa0e00067d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
@@ -28,6 +29,9 @@ import org.apache.paimon.types.DataTypes;
 import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
 import org.junit.jupiter.api.Test;
 
+import java.util.function.Consumer;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
@@ -38,22 +42,32 @@ class PartitionMarkDoneTest extends TableTestBase {
 
     @Test
     public void testTriggerByCompaction() throws Exception {
-        innerTest(true, true);
+        innerTest(options -> options.set(DELETION_VECTORS_ENABLED, true), 
true, false);
+    }
+
+    @Test
+    public void testTriggerByCompaction2() throws Exception {
+        innerTest(options -> options.set(BUCKET, -2), true, false);
     }
 
     @Test
     public void testNotTriggerByCompaction() throws Exception {
-        innerTest(false, true);
+        innerTest(options -> {}, true, true);
     }
 
     @Test
     public void testNotTriggerWhenRecoveryFromState() throws Exception {
-        innerTest(false, false);
+        innerTest(options -> {}, false, true);
     }
 
-    private void innerTest(boolean deletionVectors, boolean 
partitionMarkDoneRecoverFromState)
+    private void innerTest(
+            Consumer<Options> config, boolean recoverFromState, boolean 
shouldMarkDone)
             throws Exception {
         Identifier identifier = identifier("T");
+        Options options = new Options();
+        options.set(PARTITION_MARK_DONE_WHEN_END_INPUT, true);
+        options.set(PARTITION_MARK_DONE_ACTION, "success-file");
+        config.accept(options);
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())
@@ -61,11 +75,7 @@ class PartitionMarkDoneTest extends TableTestBase {
                         .column("c", DataTypes.INT())
                         .partitionKeys("a")
                         .primaryKey("a", "b")
-                        .option(PARTITION_MARK_DONE_WHEN_END_INPUT.key(), 
"true")
-                        .option(PARTITION_MARK_DONE_ACTION.key(), 
"success-file")
-                        .option(
-                                DELETION_VECTORS_ENABLED.key(),
-                                Boolean.valueOf(deletionVectors).toString())
+                        .options(options.toMap())
                         .build();
         catalog.createTable(identifier, schema, true);
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
@@ -80,17 +90,17 @@ class PartitionMarkDoneTest extends TableTestBase {
                                 table)
                         .get();
 
-        if (!partitionMarkDoneRecoverFromState) {
-            notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
+        if (!recoverFromState) {
+            notifyCommits(markDone, false, false);
             assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
             return;
         }
 
-        notifyCommits(markDone, true, partitionMarkDoneRecoverFromState);
-        
assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
+        notifyCommits(markDone, true, true);
+        
assertThat(table.fileIO().exists(successFile)).isEqualTo(!shouldMarkDone);
 
-        if (!deletionVectors) {
-            notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
+        if (shouldMarkDone) {
+            notifyCommits(markDone, false, true);
             assertThat(table.fileIO().exists(successFile)).isEqualTo(true);
         }
     }

Reply via email to