This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c38eede042 [core][flink] Compact action support read high level files from overwrite upgrade (#6868)
c38eede042 is described below

commit c38eede0424ae089ee379a26f61f404edd8d9551
Author: yuzelin <[email protected]>
AuthorDate: Wed Dec 24 15:11:58 2025 +0800

    [core][flink] Compact action support read high level files from overwrite upgrade (#6868)
---
 .../mergetree/compact/MergeTreeCompactManager.java |   5 +-
 .../paimon/flink/action/CompactActionITCase.java   |  94 +++++++++++++-
 .../flink/sink/StoreCompactOperatorTest.java       | 139 +++++++++++++++++++++
 3 files changed, 235 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index f0909565ee..f6ea3e137d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -114,7 +115,9 @@ public class MergeTreeCompactManager extends CompactFutureManager {
 
     @Override
     public void addNewFile(DataFileMeta file) {
-        levels.addLevel0File(file);
+        // if overwrite an empty partition, the snapshot will be changed to APPEND, then its files
+        // might be upgraded to high level, thus we should use #update
+        levels.update(Collections.emptyList(), Collections.singletonList(file));
         MetricUtils.safeCall(this::reportMetrics, LOG);
     }
 
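The one-line change above is the core of the fix. Below is a minimal sketch of its intent (not part of the commit diff), assuming, as in the Paimon codebase, that DataFileMeta carries the level recorded in the file's metadata and that Levels#update places each incoming file at that level:

    // Sketch only, illustrating the changed method; names match the hunk above.
    public void addNewFile(DataFileMeta file) {
        // Previously: levels.addLevel0File(file);
        // That assumed every file handed to the compact manager sits at level 0.
        //
        // When an overwrite targets an empty partition, the commit becomes an APPEND
        // snapshot and its files may already have been upgraded to a higher level, so
        // a dedicated compact job can receive non-level-0 files here. Levels#update
        // inserts each file at the level recorded in its metadata, and plain level-0
        // files are still handled as before.
        levels.update(Collections.emptyList(), Collections.singletonList(file));
        MetricUtils.safeCall(this::reportMetrics, LOG);
    }

The two new tests below exercise this path: the IT case checks that a streaming compact job keeps working after an empty-partition overwrite is upgraded, and the operator test checks that a genuine file-deletion conflict with an overwrite still fails the compact commit.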
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index fb941a73d5..95314d5f08 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -27,11 +27,14 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.utils.CommonTestUtils;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -51,8 +54,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.paimon.utils.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for {@link CompactAction}. */
@@ -208,7 +213,7 @@ public class CompactActionITCase extends CompactActionITCaseBase {
 
         // assert dedicated compact job will expire snapshots
         SnapshotManager snapshotManager = table.snapshotManager();
-        CommonTestUtils.waitUtil(
+        waitUtil(
                 () ->
                         snapshotManager.latestSnapshotId() - 2
                                 == snapshotManager.earliestSnapshotId(),
@@ -735,6 +740,91 @@ public class CompactActionITCase extends CompactActionITCaseBase {
                 .hasMessage("sort compact do not support 'partition_idle_time'.");
     }
 
+    @Test
+    public void testStreamingCompactWithEmptyOverwriteUpgrade() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        tableOptions.put(CoreOptions.COMPACTION_FORCE_UP_LEVEL_0.key(), "true");
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+        DataType[] fieldTypes = new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()};
+        RowType rowType = RowType.of(fieldTypes, new String[] {"k", "v", "pt"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.singletonList("pt"),
+                        Arrays.asList("k", "pt"),
+                        Collections.emptyList(),
+                        tableOptions);
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().checkpointIntervalMs(500).build();
+        createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--catalog_conf",
+                        "warehouse=" + warehouse)
+                .withStreamExecutionEnvironment(env)
+                .build();
+        env.executeAsync();
+
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+
+        writeData(rowData(1, 100, 1), rowData(2, 200, 1), rowData(1, 100, 2));
+
+        waitUtil(
+                () -> {
+                    Snapshot latest = snapshotManager.latestSnapshot();
+                    return latest != null && latest.commitKind() == Snapshot.CommitKind.COMPACT;
+                },
+                Duration.ofSeconds(10),
+                Duration.ofMillis(100));
+
+        long snapshotId1 = snapshotManager.latestSnapshotId();
+
+        // overwrite empty partition and let it upgrade
+        String newCommitUser = UUID.randomUUID().toString();
+        try (TableWriteImpl<?> newWrite = table.newWrite(newCommitUser);
+                TableCommitImpl newCommit =
+                        table.newCommit(newCommitUser)
+                                .withOverwrite(Collections.singletonMap("pt", "3"))) {
+            newWrite.write(rowData(1, 100, 3));
+            newWrite.write(rowData(2, 200, 3));
+            newCommit.commit(newWrite.prepareCommit(false, 1));
+        }
+        // write level 0 file to trigger compaction
+        writeData(rowData(1, 101, 3));
+
+        waitUtil(
+                () -> {
+                    Snapshot latest = snapshotManager.latestSnapshot();
+                    return latest.id() > snapshotId1
+                            && latest.commitKind() == Snapshot.CommitKind.COMPACT;
+                },
+                Duration.ofSeconds(10),
+                Duration.ofMillis(100));
+
+        validateResult(
+                table,
+                rowType,
+                table.newStreamScan(),
+                Arrays.asList(
+                        "+I[1, 100, 1]",
+                        "+I[1, 100, 2]",
+                        "+I[1, 101, 3]",
+                        "+I[2, 200, 1]",
+                        "+I[2, 200, 3]"),
+                60_000);
+    }
+
     private void runAction(boolean isStreaming) throws Exception {
         runAction(isStreaming, false, Collections.emptyList());
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index f90674c668..08ffc85b14 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -18,29 +18,49 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.operation.WriteRestore;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.PlanImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.system.CompactBucketsTable;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import javax.annotation.Nullable;
+
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link StoreCompactOperator}. */
 public class StoreCompactOperatorTest extends TableTestBase {
@@ -86,6 +106,91 @@ public class StoreCompactOperatorTest extends TableTestBase {
         assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
     }
 
+    @Test
+    public void testStreamingCompactConflictWithOverwrite() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pt", "a")
+                        .option("bucket", "1")
+                        .build();
+        Identifier identifier = identifier();
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+        String writeJobCommitUser = UUID.randomUUID().toString();
+        String compactJobCommitUser = UUID.randomUUID().toString();
+
+        CompactBucketsTable compactBucketsTable = new CompactBucketsTable(table, true);
+        StreamDataTableScan scan = compactBucketsTable.newStreamScan();
+        InnerTableRead read = compactBucketsTable.newRead();
+
+        CheckpointConfig checkpointConfig = new CheckpointConfig();
+        checkpointConfig.setCheckpointInterval(500);
+        StoreCompactOperator.Factory operatorFactory =
+                new StoreCompactOperator.Factory(
+                        table,
+                        StoreSinkWrite.createWriteProvider(
+                                table, checkpointConfig, true, false, false),
+                        compactJobCommitUser,
+                        true);
+
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+        OneInputStreamOperatorTestHarness<RowData, Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operatorFactory);
+        harness.setup(serializer);
+        harness.initializeEmptyState();
+        harness.open();
+        StoreCompactOperator operator = (StoreCompactOperator) harness.getOperator();
+
+        FileStoreTable writeOnlyTable = table.copy(Collections.singletonMap("write-only", "true"));
+
+        // write base data
+        batchWriteAndCommit(writeOnlyTable, writeJobCommitUser, null, GenericRow.of(1, 1, 100));
+        read.createReader(scan.plan())
+                .forEachRemaining(
+                        row -> {
+                            try {
+                                harness.processElement(new StreamRecord<>(new FlinkRowData(row)));
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        List<Committable> committables1 = operator.prepareCommit(true, 1);
+        commit(table, compactJobCommitUser, committables1, 1);
+        assertThat(table.snapshotManager().latestSnapshot().commitKind())
+                .isEqualTo(Snapshot.CommitKind.COMPACT);
+
+        // overwrite and insert
+        batchWriteAndCommit(
+                writeOnlyTable,
+                writeJobCommitUser,
+                Collections.singletonMap("pt", "1"),
+                GenericRow.of(1, 2, 200));
+        batchWriteAndCommit(writeOnlyTable, writeJobCommitUser, null, GenericRow.of(1, 3, 300));
+        assertThat(table.snapshotManager().latestSnapshot().id()).isEqualTo(4);
+        TableScan.Plan plan = scan.plan();
+        assertThat(((PlanImpl) plan).snapshotId()).isEqualTo(4);
+        read.createReader(plan)
+                .forEachRemaining(
+                        row -> {
+                            try {
+                                harness.processElement(new StreamRecord<>(new FlinkRowData(row)));
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        List<Committable> committables2 = operator.prepareCommit(true, 2);
+        assertThatThrownBy(() -> commit(table, compactJobCommitUser, committables2, 2))
+                .hasMessageContaining("File deletion conflicts detected! Give up committing.");
+    }
+
     private RowData data(int bucket) {
         GenericRow genericRow =
                 GenericRow.of(
@@ -96,6 +201,40 @@ public class StoreCompactOperatorTest extends TableTestBase {
         return new FlinkRowData(genericRow);
     }
 
+    private void batchWriteAndCommit(
+            FileStoreTable table,
+            String commitUser,
+            @Nullable Map<String, String> overwritePartition,
+            InternalRow... rows)
+            throws Exception {
+        try (TableWriteImpl<?> write = table.newWrite(commitUser);
+                TableCommitImpl commit =
+                        table.newCommit(commitUser).withOverwrite(overwritePartition)) {
+            for (InternalRow row : rows) {
+                write.write(row);
+            }
+            commit.commit(write.prepareCommit());
+        }
+    }
+
+    private void commit(
+            FileStoreTable table,
+            String commitUser,
+            List<Committable> committables,
+            long checkpointId)
+            throws Exception {
+        try (TableCommitImpl commit = table.newCommit(commitUser)) {
+            StoreCommitter committer =
+                    new StoreCommitter(
+                            table,
+                            commit,
+                            Committer.createContext(commitUser, null, true, false, null, 1, 1));
+            ManifestCommittable manifestCommittable =
+                    committer.combine(checkpointId, System.currentTimeMillis(), committables);
+            committer.commit(Collections.singletonList(manifestCommittable));
+        }
+    }
+
     private static class CompactRememberStoreWrite implements StoreSinkWrite {
 
         private final boolean streamingMode;
