This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7fa904edfd [core] Postpone mode should not mark done when still has
compaction (#6561)
7fa904edfd is described below
commit 7fa904edfde04392e219ba0ea9d20923cbd96aad
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 17 10:59:20 2025 +0800
[core] Postpone mode should not mark done when still has compaction (#6561)
---
.../sink/listener/PartitionMarkDoneListener.java | 19 ++++++----
.../flink/sink/listener/PartitionMarkDoneTest.java | 40 ++++++++++++++--------
2 files changed, 38 insertions(+), 21 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
index 0283b197d2..92966014a1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
@@ -25,6 +25,7 @@ import
org.apache.paimon.flink.FlinkConnectorOptions.PartitionMarkDoneActionMode
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -88,12 +89,18 @@ public class PartitionMarkDoneListener implements
CommitListener {
List<PartitionMarkDoneAction> actions =
PartitionMarkDoneAction.createActions(cl, table, coreOptions);
- // if batch read skip level 0 files, we should wait compaction to mark
done
- // otherwise, some data may not be readable, and there might be data
delays
- boolean waitCompaction =
- !table.primaryKeys().isEmpty()
- && (coreOptions.deletionVectorsEnabled()
- || coreOptions.mergeEngine() ==
MergeEngine.FIRST_ROW);
+ boolean waitCompaction = false;
+ if (!table.primaryKeys().isEmpty()) {
+ // some situation should wait compaction to mark done, otherwise,
some data may not be
+ // readable, and there might be data delays
+ if (coreOptions.deletionVectorsEnabled()) {
+ waitCompaction = true;
+ } else if (coreOptions.mergeEngine() == MergeEngine.FIRST_ROW) {
+ waitCompaction = true;
+ } else if (table.bucketMode() == BucketMode.POSTPONE_MODE) {
+ waitCompaction = true;
+ }
+ }
return Optional.of(
new PartitionMarkDoneListener(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
index 43597da654..aa0e00067d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
@@ -28,6 +29,9 @@ import org.apache.paimon.types.DataTypes;
import
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.junit.jupiter.api.Test;
+import java.util.function.Consumer;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
@@ -38,22 +42,32 @@ class PartitionMarkDoneTest extends TableTestBase {
@Test
public void testTriggerByCompaction() throws Exception {
- innerTest(true, true);
+ innerTest(options -> options.set(DELETION_VECTORS_ENABLED, true),
true, false);
+ }
+
+ @Test
+ public void testTriggerByCompaction2() throws Exception {
+ innerTest(options -> options.set(BUCKET, -2), true, false);
}
@Test
public void testNotTriggerByCompaction() throws Exception {
- innerTest(false, true);
+ innerTest(options -> {}, true, true);
}
@Test
public void testNotTriggerWhenRecoveryFromState() throws Exception {
- innerTest(false, false);
+ innerTest(options -> {}, false, true);
}
- private void innerTest(boolean deletionVectors, boolean
partitionMarkDoneRecoverFromState)
+ private void innerTest(
+ Consumer<Options> config, boolean recoverFromState, boolean
shouldMarkDone)
throws Exception {
Identifier identifier = identifier("T");
+ Options options = new Options();
+ options.set(PARTITION_MARK_DONE_WHEN_END_INPUT, true);
+ options.set(PARTITION_MARK_DONE_ACTION, "success-file");
+ config.accept(options);
Schema schema =
Schema.newBuilder()
.column("a", DataTypes.INT())
@@ -61,11 +75,7 @@ class PartitionMarkDoneTest extends TableTestBase {
.column("c", DataTypes.INT())
.partitionKeys("a")
.primaryKey("a", "b")
- .option(PARTITION_MARK_DONE_WHEN_END_INPUT.key(),
"true")
- .option(PARTITION_MARK_DONE_ACTION.key(),
"success-file")
- .option(
- DELETION_VECTORS_ENABLED.key(),
- Boolean.valueOf(deletionVectors).toString())
+ .options(options.toMap())
.build();
catalog.createTable(identifier, schema, true);
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
@@ -80,17 +90,17 @@ class PartitionMarkDoneTest extends TableTestBase {
table)
.get();
- if (!partitionMarkDoneRecoverFromState) {
- notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
+ if (!recoverFromState) {
+ notifyCommits(markDone, false, false);
assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
return;
}
- notifyCommits(markDone, true, partitionMarkDoneRecoverFromState);
-
assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
+ notifyCommits(markDone, true, true);
+
assertThat(table.fileIO().exists(successFile)).isEqualTo(!shouldMarkDone);
- if (!deletionVectors) {
- notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
+ if (shouldMarkDone) {
+ notifyCommits(markDone, false, true);
assertThat(table.fileIO().exists(successFile)).isEqualTo(true);
}
}