This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c38eede042 [core][flink] Compact action support read high level files from overwrite upgrade (#6868)
c38eede042 is described below
commit c38eede0424ae089ee379a26f61f404edd8d9551
Author: yuzelin <[email protected]>
AuthorDate: Wed Dec 24 15:11:58 2025 +0800
[core][flink] Compact action support read high level files from overwrite upgrade (#6868)
---
.../mergetree/compact/MergeTreeCompactManager.java | 5 +-
.../paimon/flink/action/CompactActionITCase.java | 94 +++++++++++++-
.../flink/sink/StoreCompactOperatorTest.java | 139 +++++++++++++++++++++
3 files changed, 235 insertions(+), 3 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index f0909565ee..f6ea3e137d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@@ -114,7 +115,9 @@ public class MergeTreeCompactManager extends CompactFutureManager {
@Override
public void addNewFile(DataFileMeta file) {
- levels.addLevel0File(file);
+ // if overwrite an empty partition, the snapshot will be changed to APPEND, then its files
+ // might be upgraded to high level, thus we should use #update
+ levels.update(Collections.emptyList(), Collections.singletonList(file));
MetricUtils.safeCall(this::reportMetrics, LOG);
}
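
The one-line change above is the core of the fix: addNewFile used to assume every incoming file was a level-0 file, but when an empty partition is overwritten the snapshot is committed as APPEND and its files may already have been upgraded to a higher level, so the manager now registers each file at the level recorded in its metadata via Levels#update. A minimal sketch of that bookkeeping follows; LevelsSketch and FileMeta are illustrative stand-ins, not the actual Paimon Levels and DataFileMeta classes.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for DataFileMeta: only the recorded level matters here.
    class FileMeta {
        final String name;
        final int level; // 0 for fresh writes, > 0 for upgraded/compacted files

        FileMeta(String name, int level) {
            this.name = name;
            this.level = level;
        }
    }

    // Hypothetical stand-in for Levels, showing why #update is the safer entry point.
    class LevelsSketch {
        private final List<List<FileMeta>> files = new ArrayList<>();

        LevelsSketch(int maxLevel) {
            for (int i = 0; i <= maxLevel; i++) {
                files.add(new ArrayList<>());
            }
        }

        // Old path: forces the file into level 0, misplacing a file that was
        // already upgraded by an overwrite-turned-APPEND snapshot.
        void addLevel0File(FileMeta file) {
            files.get(0).add(file);
        }

        // New path: honours the level stored in the file's metadata, mirroring
        // Levels#update(removedFiles, addedFiles) with an empty removal list.
        void update(List<FileMeta> removed, List<FileMeta> added) {
            for (FileMeta f : removed) {
                files.get(f.level).remove(f);
            }
            for (FileMeta f : added) {
                files.get(f.level).add(f);
            }
        }
    }
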
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index fb941a73d5..95314d5f08 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -27,11 +27,14 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.utils.CommonTestUtils;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TraceableFileIO;
@@ -51,8 +54,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.paimon.utils.CommonTestUtils.waitUtil;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for {@link CompactAction}. */
@@ -208,7 +213,7 @@ public class CompactActionITCase extends CompactActionITCaseBase {
// assert dedicated compact job will expire snapshots
SnapshotManager snapshotManager = table.snapshotManager();
- CommonTestUtils.waitUtil(
+ waitUtil(
() ->
snapshotManager.latestSnapshotId() - 2
== snapshotManager.earliestSnapshotId(),
@@ -735,6 +740,91 @@ public class CompactActionITCase extends CompactActionITCaseBase {
.hasMessage("sort compact do not support 'partition_idle_time'.");
}
+ @Test
+ public void testStreamingCompactWithEmptyOverwriteUpgrade() throws Exception {
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+ tableOptions.put(CoreOptions.COMPACTION_FORCE_UP_LEVEL_0.key(), "true");
+ tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+ DataType[] fieldTypes = new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()};
+ RowType rowType = RowType.of(fieldTypes, new String[] {"k", "v", "pt"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.singletonList("pt"),
+ Arrays.asList("k", "pt"),
+ Collections.emptyList(),
+ tableOptions);
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().checkpointIntervalMs(500).build();
+ createAction(
+ CompactAction.class,
+ "compact",
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--catalog_conf",
+ "warehouse=" + warehouse)
+ .withStreamExecutionEnvironment(env)
+ .build();
+ env.executeAsync();
+
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
+
+ writeData(rowData(1, 100, 1), rowData(2, 200, 1), rowData(1, 100, 2));
+
+ waitUtil(
+ () -> {
+ Snapshot latest = snapshotManager.latestSnapshot();
+ return latest != null && latest.commitKind() == Snapshot.CommitKind.COMPACT;
+ },
+ Duration.ofSeconds(10),
+ Duration.ofMillis(100));
+
+ long snapshotId1 = snapshotManager.latestSnapshotId();
+
+ // overwrite empty partition and let it upgrade
+ String newCommitUser = UUID.randomUUID().toString();
+ try (TableWriteImpl<?> newWrite = table.newWrite(newCommitUser);
+ TableCommitImpl newCommit =
+ table.newCommit(newCommitUser)
+ .withOverwrite(Collections.singletonMap("pt", "3"))) {
+ newWrite.write(rowData(1, 100, 3));
+ newWrite.write(rowData(2, 200, 3));
+ newCommit.commit(newWrite.prepareCommit(false, 1));
+ }
+ // write level 0 file to trigger compaction
+ writeData(rowData(1, 101, 3));
+
+ waitUtil(
+ () -> {
+ Snapshot latest = snapshotManager.latestSnapshot();
+ return latest.id() > snapshotId1
+ && latest.commitKind() == Snapshot.CommitKind.COMPACT;
+ },
+ Duration.ofSeconds(10),
+ Duration.ofMillis(100));
+
+ validateResult(
+ table,
+ rowType,
+ table.newStreamScan(),
+ Arrays.asList(
+ "+I[1, 100, 1]",
+ "+I[1, 100, 2]",
+ "+I[1, 101, 3]",
+ "+I[2, 200, 1]",
+ "+I[2, 200, 3]"),
+ 60_000);
+ }
+
private void runAction(boolean isStreaming) throws Exception {
runAction(isStreaming, false, Collections.emptyList());
}
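
For readers who want to reproduce the scenario outside the IT case, the sketch below condenses the overwrite step of testStreamingCompactWithEmptyOverwriteUpgrade using the same table API calls that appear in the test (newWrite, newCommit(...).withOverwrite, prepareCommit). It assumes a FileStoreTable with schema (k INT, v INT, pt INT) partitioned by pt, created as in the test, and a dedicated streaming compact job already running against it.

    import java.util.Collections;
    import java.util.UUID;

    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.table.FileStoreTable;
    import org.apache.paimon.table.sink.TableCommitImpl;
    import org.apache.paimon.table.sink.TableWriteImpl;

    class EmptyPartitionOverwriteSketch {

        // Overwrites partition pt=3, which holds no data yet. Because the target
        // partition is empty, the commit ends up as an APPEND snapshot and its
        // files may be placed above level 0, which is the case the compact job
        // must now handle.
        static void overwriteEmptyPartition(FileStoreTable table) throws Exception {
            String user = UUID.randomUUID().toString();
            try (TableWriteImpl<?> write = table.newWrite(user);
                    TableCommitImpl commit =
                            table.newCommit(user)
                                    .withOverwrite(Collections.singletonMap("pt", "3"))) {
                write.write(GenericRow.of(1, 100, 3));
                write.write(GenericRow.of(2, 200, 3));
                commit.commit(write.prepareCommit(false, 1));
            }
        }
    }
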
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index f90674c668..08ffc85b14 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -18,29 +18,49 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.WriteRestore;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.PlanImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.system.CompactBucketsTable;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link StoreCompactOperator}. */
public class StoreCompactOperatorTest extends TableTestBase {
@@ -86,6 +106,91 @@ public class StoreCompactOperatorTest extends TableTestBase {
assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
}
+ @Test
+ public void testStreamingCompactConflictWithOverwrite() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pt", "a")
+ .option("bucket", "1")
+ .build();
+ Identifier identifier = identifier();
+ catalog.createTable(identifier, schema, false);
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+ String writeJobCommitUser = UUID.randomUUID().toString();
+ String compactJobCommitUser = UUID.randomUUID().toString();
+
+ CompactBucketsTable compactBucketsTable = new CompactBucketsTable(table, true);
+ StreamDataTableScan scan = compactBucketsTable.newStreamScan();
+ InnerTableRead read = compactBucketsTable.newRead();
+
+ CheckpointConfig checkpointConfig = new CheckpointConfig();
+ checkpointConfig.setCheckpointInterval(500);
+ StoreCompactOperator.Factory operatorFactory =
+ new StoreCompactOperator.Factory(
+ table,
+ StoreSinkWrite.createWriteProvider(
+ table, checkpointConfig, true, false, false),
+ compactJobCommitUser,
+ true);
+
+ TypeSerializer<Committable> serializer =
+ new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+ OneInputStreamOperatorTestHarness<RowData, Committable> harness =
+ new OneInputStreamOperatorTestHarness<>(operatorFactory);
+ harness.setup(serializer);
+ harness.initializeEmptyState();
+ harness.open();
+ StoreCompactOperator operator = (StoreCompactOperator) harness.getOperator();
+
+ FileStoreTable writeOnlyTable = table.copy(Collections.singletonMap("write-only", "true"));
+
+ // write base data
+ batchWriteAndCommit(writeOnlyTable, writeJobCommitUser, null, GenericRow.of(1, 1, 100));
+ read.createReader(scan.plan())
+ .forEachRemaining(
+ row -> {
+ try {
+ harness.processElement(new StreamRecord<>(new FlinkRowData(row)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ List<Committable> committables1 = operator.prepareCommit(true, 1);
+ commit(table, compactJobCommitUser, committables1, 1);
+ assertThat(table.snapshotManager().latestSnapshot().commitKind())
+ .isEqualTo(Snapshot.CommitKind.COMPACT);
+
+ // overwrite and insert
+ batchWriteAndCommit(
+ writeOnlyTable,
+ writeJobCommitUser,
+ Collections.singletonMap("pt", "1"),
+ GenericRow.of(1, 2, 200));
+ batchWriteAndCommit(writeOnlyTable, writeJobCommitUser, null, GenericRow.of(1, 3, 300));
+ assertThat(table.snapshotManager().latestSnapshot().id()).isEqualTo(4);
+ TableScan.Plan plan = scan.plan();
+ assertThat(((PlanImpl) plan).snapshotId()).isEqualTo(4);
+ read.createReader(plan)
+ .forEachRemaining(
+ row -> {
+ try {
+ harness.processElement(new StreamRecord<>(new FlinkRowData(row)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ List<Committable> committables2 = operator.prepareCommit(true, 2);
+ assertThatThrownBy(() -> commit(table, compactJobCommitUser, committables2, 2))
+ .hasMessageContaining("File deletion conflicts detected! Give up committing.");
+ }
+
private RowData data(int bucket) {
GenericRow genericRow =
GenericRow.of(
@@ -96,6 +201,40 @@ public class StoreCompactOperatorTest extends TableTestBase {
return new FlinkRowData(genericRow);
}
+ private void batchWriteAndCommit(
+ FileStoreTable table,
+ String commitUser,
+ @Nullable Map<String, String> overwritePartition,
+ InternalRow... rows)
+ throws Exception {
+ try (TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit =
+ table.newCommit(commitUser).withOverwrite(overwritePartition)) {
+ for (InternalRow row : rows) {
+ write.write(row);
+ }
+ commit.commit(write.prepareCommit());
+ }
+ }
+
+ private void commit(
+ FileStoreTable table,
+ String commitUser,
+ List<Committable> committables,
+ long checkpointId)
+ throws Exception {
+ try (TableCommitImpl commit = table.newCommit(commitUser)) {
+ StoreCommitter committer =
+ new StoreCommitter(
+ table,
+ commit,
+ Committer.createContext(commitUser, null, true, false, null, 1, 1));
+ ManifestCommittable manifestCommittable =
+ committer.combine(checkpointId, System.currentTimeMillis(), committables);
+ committer.commit(Collections.singletonList(manifestCommittable));
+ }
+ }
+
private static class CompactRememberStoreWrite implements StoreSinkWrite {
private final boolean streamingMode;