This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5fe0df1159 [flink] Fix deletion vector support for postpone bucket tables (#5576)
5fe0df1159 is described below
commit 5fe0df11598b934e1fac45749adebc6509931385
Author: tsreaper <[email protected]>
AuthorDate: Thu May 8 18:00:14 2025 +0800
[flink] Fix deletion vector support for postpone bucket tables (#5576)
---
.../apache/paimon/flink/action/CompactAction.java | 8 +-
.../postpone/PostponeBucketCompactSplitSource.java | 87 ++++++++++++----------
.../RewritePostponeBucketCommittableOperator.java | 44 ++++++-----
.../flink/source/operator/MonitorSource.java | 4 +-
.../paimon/flink/source/operator/ReadOperator.java | 13 ++--
.../paimon/flink/PostponeBucketTableITCase.java | 45 +++++++++++
.../flink/source/operator/OperatorSourceTest.java | 4 +-
7 files changed, 132 insertions(+), 73 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 0244a20547..f55244fd9c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -284,12 +284,8 @@ public class CompactAction extends TableActionBase {
Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
PostponeBucketCompactSplitSource.buildSource(
env,
- realTable.fullName() + partitionSpec,
- realTable.rowType(),
- realTable
- .newReadBuilder()
- .withPartitionFilter(partitionSpec)
- .withBucket(BucketMode.POSTPONE_BUCKET),
+ realTable,
+ partitionSpec,
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
DataStream<InternalRow> partitioned =
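
The call site above now hands buildSource the table itself plus the raw partition spec, and the split source builds its own scan internally instead of receiving a pre-configured ReadBuilder. A minimal sketch of the new call shape, using only names that appear in the hunks of this patch:

    // Sketch of the new entry point; realTable, partitionSpec, env and options as in CompactAction.
    Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
            PostponeBucketCompactSplitSource.buildSource(
                    env,            // StreamExecutionEnvironment
                    realTable,      // FileStoreTable, no pre-built ReadBuilder anymore
                    partitionSpec,  // Map<String, String> partition filter
                    options.get(FlinkConnectorOptions.SCAN_PARALLELISM));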
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index 2bb78eaf78..1cab64c2ca 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -28,13 +28,11 @@ import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.operator.ReadOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.EndOfScanException;
-import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
@@ -58,6 +56,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -71,10 +70,13 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
private static final Logger LOG =
LoggerFactory.getLogger(PostponeBucketCompactSplitSource.class);
- private final ReadBuilder readBuilder;
+ private final FileStoreTable table;
+ private final Map<String, String> partitionSpec;
- public PostponeBucketCompactSplitSource(ReadBuilder readBuilder) {
- this.readBuilder = readBuilder;
+ public PostponeBucketCompactSplitSource(
+ FileStoreTable table, Map<String, String> partitionSpec) {
+ this.table = table;
+ this.partitionSpec = partitionSpec;
}
@Override
@@ -90,50 +92,50 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
- private final TableScan scan = readBuilder.newScan();
-
@Override
public InputStatus pollNext(ReaderOutput<Split> output) throws Exception {
- try {
- List<Split> splits = scan.plan().splits();
-
- for (Split split : splits) {
- DataSplit dataSplit = (DataSplit) split;
- List<DataFileMeta> files = new ArrayList<>(dataSplit.dataFiles());
- // we must replay the written records in exact order
- files.sort(Comparator.comparing(DataFileMeta::creationTime));
- for (DataFileMeta meta : files) {
- DataSplit s =
- DataSplit.builder()
- .withPartition(dataSplit.partition())
- .withBucket(dataSplit.bucket())
- .withBucketPath(dataSplit.bucketPath())
- .withTotalBuckets(dataSplit.totalBuckets())
- .withDataFiles(Collections.singletonList(meta))
- .isStreaming(false)
- .build();
- output.collect(s);
- }
+ List<Split> splits =
+ table.newSnapshotReader()
+ .withPartitionFilter(partitionSpec)
+ .withBucket(BucketMode.POSTPONE_BUCKET)
+ .read()
+ .splits();
+
+ for (Split split : splits) {
+ DataSplit dataSplit = (DataSplit) split;
+ List<DataFileMeta> files = new ArrayList<>(dataSplit.dataFiles());
+ // we must replay the written records in exact order
+ files.sort(Comparator.comparing(DataFileMeta::creationTime));
+ for (DataFileMeta meta : files) {
+ DataSplit s =
+ DataSplit.builder()
+ .withPartition(dataSplit.partition())
+ .withBucket(dataSplit.bucket())
+ .withBucketPath(dataSplit.bucketPath())
+ .withTotalBuckets(dataSplit.totalBuckets())
+ .withDataFiles(Collections.singletonList(meta))
+ .isStreaming(false)
+ .build();
+ output.collect(s);
}
- } catch (EndOfScanException esf) {
- LOG.info("Catching EndOfStreamException, the stream is
finished.");
- return InputStatus.END_OF_INPUT;
}
- return InputStatus.MORE_AVAILABLE;
+
+ return InputStatus.END_OF_INPUT;
}
}
public static Pair<DataStream<RowData>, DataStream<Committable>> buildSource(
StreamExecutionEnvironment env,
- String name,
- RowType rowType,
- ReadBuilder readBuilder,
+ FileStoreTable table,
+ Map<String, String> partitionSpec,
@Nullable Integer parallelism) {
DataStream<Split> source =
env.fromSource(
- new PostponeBucketCompactSplitSource(readBuilder),
+ new PostponeBucketCompactSplitSource(table, partitionSpec),
WatermarkStrategy.noWatermarks(),
- "Compact split generator: " + name,
+ String.format(
+ "Compact split generator: %s - %s",
+ table.fullName(), partitionSpec),
new JavaTypeInfo<>(Split.class))
.forceNonParallel();
@@ -148,9 +150,12 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
return Pair.of(
new DataStream<>(source.getExecutionEnvironment(), partitioned)
.transform(
- "Compact split reader: " + name,
- InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(rowType)),
- new ReadOperator(readBuilder, null)),
+ String.format(
+ "Compact split reader: %s - %s",
+ table.fullName(), partitionSpec),
+ InternalTypeInfo.of(
+ LogicalTypeConversion.toLogicalType(table.rowType())),
+ new ReadOperator(table::newRead, null)),
source.forward()
.transform(
"Remove new files",
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
index 8cb2540aa5..9c9c528f54 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
@@ -23,10 +23,12 @@ import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -53,7 +55,7 @@ public class RewritePostponeBucketCommittableOperator
private final FileStoreTable table;
private transient FileStorePathFactory pathFactory;
- private transient Map<BinaryRow, Map<Integer, BucketFiles>> bucketFiles;
+ private transient Map<BinaryRow, Map<Integer, BucketFiles>> buckets;
public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
this.table = table;
@@ -62,7 +64,7 @@ public class RewritePostponeBucketCommittableOperator
@Override
public void open() throws Exception {
pathFactory = table.store().pathFactory();
- bucketFiles = new HashMap<>();
+ buckets = new HashMap<>();
}
@Override
@@ -73,8 +75,7 @@ public class RewritePostponeBucketCommittableOperator
}
CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
- bucketFiles
- .computeIfAbsent(message.partition(), p -> new HashMap<>())
+ buckets.computeIfAbsent(message.partition(), p -> new HashMap<>())
.computeIfAbsent(
message.bucket(),
b ->
@@ -91,24 +92,20 @@ public class RewritePostponeBucketCommittableOperator
}
protected void emitAll(long checkpointId) {
- for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry :
- bucketFiles.entrySet()) {
+ for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry : buckets.entrySet()) {
for (Map.Entry<Integer, BucketFiles> bucketEntry :
partitionEntry.getValue().entrySet()) {
BucketFiles bucketFiles = bucketEntry.getValue();
- CommitMessageImpl message =
- new CommitMessageImpl(
- partitionEntry.getKey(),
- bucketEntry.getKey(),
- bucketFiles.totalBuckets,
- DataIncrement.emptyIncrement(),
- bucketFiles.makeIncrement());
output.collect(
new StreamRecord<>(
- new Committable(checkpointId, Committable.Kind.FILE, message)));
+ new Committable(
+ checkpointId,
+ Committable.Kind.FILE,
+ bucketFiles.makeMessage(
+ partitionEntry.getKey(), bucketEntry.getKey()))));
}
}
- bucketFiles.clear();
+ buckets.clear();
}
private static class BucketFiles {
@@ -121,6 +118,8 @@ public class RewritePostponeBucketCommittableOperator
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
private final List<DataFileMeta> changelogFiles;
+ private final List<IndexFileMeta> newIndexFiles;
+ private final List<IndexFileMeta> deletedIndexFiles;
private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
this.pathFactory = pathFactory;
@@ -130,6 +129,8 @@ public class RewritePostponeBucketCommittableOperator
this.compactBefore = new ArrayList<>();
this.compactAfter = new ArrayList<>();
this.changelogFiles = new ArrayList<>();
+ this.newIndexFiles = new ArrayList<>();
+ this.deletedIndexFiles = new ArrayList<>();
}
private void update(CommitMessageImpl message) {
@@ -157,13 +158,22 @@ public class RewritePostponeBucketCommittableOperator
changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
changelogFiles.addAll(message.compactIncrement().changelogFiles());
+ newIndexFiles.addAll(message.indexIncrement().newIndexFiles());
+ deletedIndexFiles.addAll(message.indexIncrement().deletedIndexFiles());
+
toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
}
- private CompactIncrement makeIncrement() {
+ private CommitMessageImpl makeMessage(BinaryRow partition, int bucket) {
List<DataFileMeta> realCompactAfter = new ArrayList<>(newFiles.values());
realCompactAfter.addAll(compactAfter);
- return new CompactIncrement(compactBefore, realCompactAfter, changelogFiles);
+ return new CommitMessageImpl(
+ partition,
+ bucket,
+ totalBuckets,
+ DataIncrement.emptyIncrement(),
+ new CompactIncrement(compactBefore, realCompactAfter, changelogFiles),
+ new IndexIncrement(newIndexFiles, deletedIndexFiles));
}
}
}
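
The accumulator above now also collects the index files carried by each incoming commit message (deletion vector indexes travel there), and makeMessage emits a complete CommitMessageImpl with an IndexIncrement rather than only a CompactIncrement, so deletion vectors produced by postpone-bucket compaction survive the committable rewrite. A sketch of the emitted message, written as a hypothetical free-standing method over the same fields as the hunk (the boxed Integer type of totalBuckets is an assumption):

    import java.util.List;

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.index.IndexFileMeta;
    import org.apache.paimon.io.CompactIncrement;
    import org.apache.paimon.io.DataFileMeta;
    import org.apache.paimon.io.DataIncrement;
    import org.apache.paimon.io.IndexIncrement;
    import org.apache.paimon.table.sink.CommitMessageImpl;

    public class RewrittenMessageSketch {
        // Mirrors BucketFiles.makeMessage(partition, bucket) above.
        static CommitMessageImpl makeMessage(
                BinaryRow partition,
                int bucket,
                Integer totalBuckets,
                List<DataFileMeta> compactBefore,
                List<DataFileMeta> realCompactAfter,
                List<DataFileMeta> changelogFiles,
                List<IndexFileMeta> newIndexFiles,
                List<IndexFileMeta> deletedIndexFiles) {
            return new CommitMessageImpl(
                    partition,
                    bucket,
                    totalBuckets,
                    DataIncrement.emptyIncrement(), // no freshly written files: pure rewrite
                    new CompactIncrement(compactBefore, realCompactAfter, changelogFiles),
                    new IndexIncrement(newIndexFiles, deletedIndexFiles)); // deletion vector indexes
        }
    }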
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
index 2783b0ae01..84d6cf7dfb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
@@ -229,7 +229,9 @@ public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
singleOutputStreamOperator,
shuffleBucketWithPartition);
return sourceDataStream.transform(
- name + "-Reader", typeInfo, new ReadOperator(readBuilder,
nestedProjectedRowData));
+ name + "-Reader",
+ typeInfo,
+ new ReadOperator(readBuilder::newRead, nestedProjectedRowData));
}
private static DataStream<Split> shuffleUnwareBucket(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index c7189f811d..685b324088 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -24,10 +24,10 @@ import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.SerializableSupplier;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.metrics.MetricNames;
@@ -47,9 +47,9 @@ import javax.annotation.Nullable;
public class ReadOperator extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<Split, RowData> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
- private final ReadBuilder readBuilder;
+ private final SerializableSupplier<TableRead> readSupplier;
@Nullable private final NestedProjectedRowData nestedProjectedRowData;
private transient TableRead read;
@@ -66,8 +66,9 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
private transient Counter numRecordsIn;
public ReadOperator(
- ReadBuilder readBuilder, @Nullable NestedProjectedRowData nestedProjectedRowData) {
- this.readBuilder = readBuilder;
+ SerializableSupplier<TableRead> readSupplier,
+ @Nullable NestedProjectedRowData nestedProjectedRowData) {
+ this.readSupplier = readSupplier;
this.nestedProjectedRowData = nestedProjectedRowData;
}
@@ -89,7 +90,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
.getEnvironment()
.getIOManager()
.getSpillingDirectoriesPaths());
- this.read = readBuilder.newRead().withIOManager(ioManager);
+ this.read = readSupplier.get().withIOManager(ioManager);
this.reuseRow = new FlinkRowData(null);
this.reuseRecord = new StreamRecord<>(null);
this.idlingStarted();
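
ReadOperator no longer holds a ReadBuilder; it is constructed from a SerializableSupplier<TableRead> and calls readSupplier.get() in open(), which is also why serialVersionUID is bumped. Callers now decide how the TableRead is created: MonitorSource keeps the previous behaviour with readBuilder::newRead, while the postpone-bucket compaction source passes table::newRead. A sketch of the two call shapes, assuming a FileStoreTable is already in scope:

    import javax.annotation.Nullable;

    import org.apache.paimon.flink.NestedProjectedRowData;
    import org.apache.paimon.flink.source.operator.ReadOperator;
    import org.apache.paimon.table.FileStoreTable;

    public class ReadOperatorConstructionSketch {
        // Monitor/streaming path: same semantics as before, expressed as a supplier.
        static ReadOperator monitorStyle(FileStoreTable table, @Nullable NestedProjectedRowData projected) {
            return new ReadOperator(() -> table.newReadBuilder().newRead(), projected);
        }

        // Postpone-bucket compaction path: the read is built directly from the table.
        static ReadOperator compactStyle(FileStoreTable table) {
            return new ReadOperator(table::newRead, null);
        }
    }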
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index db87abc7ad..adcd76c469 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -550,6 +550,51 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
it.close();
}
+ @Test
+ public void testDeletionVector() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'bucket' = '-2',\n"
+ + " 'deletion-vectors.enabled' = 'true'\n"
+ + ")");
+
+ tEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30), (4,
40)").await();
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3,
30]", "+I[4, 40]");
+
+ tEnv.executeSql("INSERT INTO T VALUES (1, 11), (5, 51)").await();
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[1, 11]", "+I[2, 20]", "+I[3, 30]", "+I[4, 40]",
"+I[5, 51]");
+
+ tEnv.executeSql("INSERT INTO T VALUES (2, 52), (3, 32)").await();
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[1, 11]", "+I[2, 52]", "+I[3, 32]", "+I[4, 40]",
"+I[5, 51]");
+ }
+
private List<String> collect(TableResult result) throws Exception {
List<String> ret = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index a8c1f9c9e5..0d14b60e0c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -184,7 +184,7 @@ public class OperatorSourceTest {
@Test
public void testReadOperator() throws Exception {
- ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null);
+ ReadOperator readOperator = new ReadOperator(() -> table.newReadBuilder().newRead(), null);
OneInputStreamOperatorTestHarness<Split, RowData> harness =
new OneInputStreamOperatorTestHarness<>(readOperator);
harness.setup(
@@ -206,7 +206,7 @@ public class OperatorSourceTest {
@Test
public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
- ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null);
+ ReadOperator readOperator = new ReadOperator(() -> table.newReadBuilder().newRead(), null);
OneInputStreamOperatorTestHarness<Split, RowData> harness =
new OneInputStreamOperatorTestHarness<>(readOperator);
harness.setup(