This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit dbfa376d2c5f9fd56f588f9b752ced395a13c1e2
Author: tsreaper <[email protected]>
AuthorDate: Thu May 8 18:00:14 2025 +0800

    [flink] Fix deletion vector support for postpone bucket tables (#5576)
---
 .../apache/paimon/flink/action/CompactAction.java  |  8 +-
 .../postpone/PostponeBucketCompactSplitSource.java | 87 ++++++++++++----------
 .../RewritePostponeBucketCommittableOperator.java  | 44 ++++++-----
 .../flink/source/operator/MonitorSource.java       |  4 +-
 .../paimon/flink/source/operator/ReadOperator.java | 13 ++--
 .../paimon/flink/PostponeBucketTableITCase.java    | 45 +++++++++++
 .../flink/source/operator/OperatorSourceTest.java  |  4 +-
 7 files changed, 132 insertions(+), 73 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 53e802be76..b23f0219a2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -285,12 +285,8 @@ public class CompactAction extends TableActionBase {
         Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
                 PostponeBucketCompactSplitSource.buildSource(
                         env,
-                        realTable.fullName() + partitionSpec,
-                        realTable.rowType(),
-                        realTable
-                                .newReadBuilder()
-                                .withPartitionFilter(partitionSpec)
-                                .withBucket(BucketMode.POSTPONE_BUCKET),
+                        realTable,
+                        partitionSpec,
                         options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
 
         DataStream<InternalRow> partitioned =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index 2bb78eaf78..1cab64c2ca 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -28,13 +28,11 @@ import org.apache.paimon.flink.source.SimpleSourceSplit;
 import org.apache.paimon.flink.source.operator.ReadOperator;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.EndOfScanException;
-import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
@@ -58,6 +56,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -71,10 +70,13 @@ public class PostponeBucketCompactSplitSour
     private static final Logger LOG =
             LoggerFactory.getLogger(PostponeBucketCompactSplitSource.class);
 
-    private final ReadBuilder readBuilder;
+    private final FileStoreTable table;
+    private final Map<String, String> partitionSpec;
 
-    public PostponeBucketCompactSplitSource(ReadBuilder readBuilder) {
-        this.readBuilder = readBuilder;
+    public PostponeBucketCompactSplitSource(
+            FileStoreTable table, Map<String, String> partitionSpec) {
+        this.table = table;
+        this.partitionSpec = partitionSpec;
     }
 
     @Override
@@ -90,50 +92,50 @@ public class PostponeBucketCompactSplitSour
 
     private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
 
-        private final TableScan scan = readBuilder.newScan();
-
         @Override
         public InputStatus pollNext(ReaderOutput<Split> output) throws Exception {
-            try {
-                List<Split> splits = scan.plan().splits();
-
-                for (Split split : splits) {
-                    DataSplit dataSplit = (DataSplit) split;
-                    List<DataFileMeta> files = new ArrayList<>(dataSplit.dataFiles());
-                    // we must replay the written records in exact order
-                    files.sort(Comparator.comparing(DataFileMeta::creationTime));
-                    for (DataFileMeta meta : files) {
-                        DataSplit s =
-                                DataSplit.builder()
-                                        .withPartition(dataSplit.partition())
-                                        .withBucket(dataSplit.bucket())
-                                        .withBucketPath(dataSplit.bucketPath())
-                                        .withTotalBuckets(dataSplit.totalBuckets())
-                                        .withDataFiles(Collections.singletonList(meta))
-                                        .isStreaming(false)
-                                        .build();
-                        output.collect(s);
-                    }
+            List<Split> splits =
+                    table.newSnapshotReader()
+                            .withPartitionFilter(partitionSpec)
+                            .withBucket(BucketMode.POSTPONE_BUCKET)
+                            .read()
+                            .splits();
+
+            for (Split split : splits) {
+                DataSplit dataSplit = (DataSplit) split;
+                List<DataFileMeta> files = new ArrayList<>(dataSplit.dataFiles());
+                // we must replay the written records in exact order
+                files.sort(Comparator.comparing(DataFileMeta::creationTime));
+                for (DataFileMeta meta : files) {
+                    DataSplit s =
+                            DataSplit.builder()
+                                    .withPartition(dataSplit.partition())
+                                    .withBucket(dataSplit.bucket())
+                                    .withBucketPath(dataSplit.bucketPath())
+                                    .withTotalBuckets(dataSplit.totalBuckets())
+                                    .withDataFiles(Collections.singletonList(meta))
+                                    .isStreaming(false)
+                                    .build();
+                    output.collect(s);
                 }
-            } catch (EndOfScanException esf) {
-                LOG.info("Catching EndOfStreamException, the stream is finished.");
-                return InputStatus.END_OF_INPUT;
             }
-            return InputStatus.MORE_AVAILABLE;
+
+            return InputStatus.END_OF_INPUT;
         }
     }
 
     public static Pair<DataStream<RowData>, DataStream<Committable>> buildSource(
             StreamExecutionEnvironment env,
-            String name,
-            RowType rowType,
-            ReadBuilder readBuilder,
+            FileStoreTable table,
+            Map<String, String> partitionSpec,
             @Nullable Integer parallelism) {
         DataStream<Split> source =
                 env.fromSource(
-                                new PostponeBucketCompactSplitSource(readBuilder),
+                                new PostponeBucketCompactSplitSource(table, partitionSpec),
                                 WatermarkStrategy.noWatermarks(),
-                                "Compact split generator: " + name,
+                                String.format(
+                                        "Compact split generator: %s - %s",
+                                        table.fullName(), partitionSpec),
                                 new JavaTypeInfo<>(Split.class))
                         .forceNonParallel();
 
@@ -148,9 +150,12 @@ public class PostponeBucketCompactSplitSour
         return Pair.of(
                 new DataStream<>(source.getExecutionEnvironment(), partitioned)
                         .transform(
-                                "Compact split reader: " + name,
-                                InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(rowType)),
-                                new ReadOperator(readBuilder, null)),
+                                String.format(
+                                        "Compact split reader: %s - %s",
+                                        table.fullName(), partitionSpec),
+                                InternalTypeInfo.of(
+                                        LogicalTypeConversion.toLogicalType(table.rowType())),
+                                new ReadOperator(table::newRead, null)),
                 source.forward()
                         .transform(
                                 "Remove new files",
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
index 8cb2540aa5..9c9c528f54 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
@@ -23,10 +23,12 @@ import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.utils.BoundedOneInputOperator;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -53,7 +55,7 @@ public class RewritePostponeBucketCommittableOperator
     private final FileStoreTable table;
 
     private transient FileStorePathFactory pathFactory;
-    private transient Map<BinaryRow, Map<Integer, BucketFiles>> bucketFiles;
+    private transient Map<BinaryRow, Map<Integer, BucketFiles>> buckets;
 
     public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
         this.table = table;
@@ -62,7 +64,7 @@ public class RewritePostponeBucketCommittableOperator
     @Override
     public void open() throws Exception {
         pathFactory = table.store().pathFactory();
-        bucketFiles = new HashMap<>();
+        buckets = new HashMap<>();
     }
 
     @Override
@@ -73,8 +75,7 @@ public class RewritePostponeBucketCommittableOperator
         }
 
         CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
-        bucketFiles
-                .computeIfAbsent(message.partition(), p -> new HashMap<>())
+        buckets.computeIfAbsent(message.partition(), p -> new HashMap<>())
                 .computeIfAbsent(
                         message.bucket(),
                         b ->
@@ -91,24 +92,20 @@ public class RewritePostponeBucketCommittableOperator
     }
 
     protected void emitAll(long checkpointId) {
-        for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry :
-                bucketFiles.entrySet()) {
+        for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry : buckets.entrySet()) {
             for (Map.Entry<Integer, BucketFiles> bucketEntry :
                     partitionEntry.getValue().entrySet()) {
                 BucketFiles bucketFiles = bucketEntry.getValue();
-                CommitMessageImpl message =
-                        new CommitMessageImpl(
-                                partitionEntry.getKey(),
-                                bucketEntry.getKey(),
-                                bucketFiles.totalBuckets,
-                                DataIncrement.emptyIncrement(),
-                                bucketFiles.makeIncrement());
                 output.collect(
                         new StreamRecord<>(
-                                new Committable(checkpointId, Committable.Kind.FILE, message)));
+                                new Committable(
+                                        checkpointId,
+                                        Committable.Kind.FILE,
+                                        bucketFiles.makeMessage(
+                                                partitionEntry.getKey(), bucketEntry.getKey()))));
             }
         }
-        bucketFiles.clear();
+        buckets.clear();
    }
 
     private static class BucketFiles {
@@ -121,6 +118,8 @@ public class RewritePostponeBucketCommittableOperator
         private final List<DataFileMeta> compactBefore;
         private final List<DataFileMeta> compactAfter;
         private final List<DataFileMeta> changelogFiles;
+        private final List<IndexFileMeta> newIndexFiles;
+        private final List<IndexFileMeta> deletedIndexFiles;
 
         private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
             this.pathFactory = pathFactory;
@@ -130,6 +129,8 @@ public class RewritePostponeBucketCommittableOperator
             this.compactBefore = new ArrayList<>();
             this.compactAfter = new ArrayList<>();
             this.changelogFiles = new ArrayList<>();
+            this.newIndexFiles = new ArrayList<>();
+            this.deletedIndexFiles = new ArrayList<>();
         }
 
         private void update(CommitMessageImpl message) {
@@ -157,13 +158,22 @@ public class RewritePostponeBucketCommittableOperator
 
             changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
             changelogFiles.addAll(message.compactIncrement().changelogFiles());
+            newIndexFiles.addAll(message.indexIncrement().newIndexFiles());
+            deletedIndexFiles.addAll(message.indexIncrement().deletedIndexFiles());
+
             toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
         }
 
-        private CompactIncrement makeIncrement() {
+        private CommitMessageImpl makeMessage(BinaryRow partition, int bucket) {
             List<DataFileMeta> realCompactAfter = new ArrayList<>(newFiles.values());
             realCompactAfter.addAll(compactAfter);
-            return new CompactIncrement(compactBefore, realCompactAfter, changelogFiles);
+            return new CommitMessageImpl(
+                    partition,
+                    bucket,
+                    totalBuckets,
+                    DataIncrement.emptyIncrement(),
+                    new CompactIncrement(compactBefore, realCompactAfter, changelogFiles),
+                    new IndexIncrement(newIndexFiles, deletedIndexFiles));
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
index 2783b0ae01..84d6cf7dfb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
@@ -229,7 +229,9 @@ public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
                         singleOutputStreamOperator, shuffleBucketWithPartition);
 
         return sourceDataStream.transform(
-                name + "-Reader", typeInfo, new ReadOperator(readBuilder, nestedProjectedRowData));
+                name + "-Reader",
+                typeInfo,
+                new ReadOperator(readBuilder::newRead, nestedProjectedRowData));
     }
 
     private static DataStream<Split> shuffleUnwareBucket(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index c7189f811d..685b324088 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -24,10 +24,10 @@ import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.NestedProjectedRowData;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.SerializableSupplier;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.metrics.MetricNames;
@@ -47,9 +47,9 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
         implements OneInputStreamOperator<Split, RowData> {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
-    private final ReadBuilder readBuilder;
+    private final SerializableSupplier<TableRead> readSupplier;
     @Nullable private final NestedProjectedRowData nestedProjectedRowData;
 
     private transient TableRead read;
@@ -66,8 +66,9 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
     private transient Counter numRecordsIn;
 
     public ReadOperator(
-            ReadBuilder readBuilder, @Nullable NestedProjectedRowData nestedProjectedRowData) {
-        this.readBuilder = readBuilder;
+            SerializableSupplier<TableRead> readSupplier,
+            @Nullable NestedProjectedRowData nestedProjectedRowData) {
+        this.readSupplier = readSupplier;
         this.nestedProjectedRowData = nestedProjectedRowData;
     }
 
     @Override
@@ -89,7 +90,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
                                 .getEnvironment()
                                 .getIOManager()
                                 .getSpillingDirectoriesPaths());
-        this.read = readBuilder.newRead().withIOManager(ioManager);
+        this.read = readSupplier.get().withIOManager(ioManager);
         this.reuseRow = new FlinkRowData(null);
         this.reuseRecord = new StreamRecord<>(null);
         this.idlingStarted();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index db87abc7ad..adcd76c469 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -550,6 +550,51 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         it.close();
     }
 
+    @Test
+    public void testDeletionVector() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'deletion-vectors.enabled' = 'true'\n"
+                        + ")");
+
+        tEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30), (4, 40)").await();
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 30]", "+I[4, 40]");
+
+        tEnv.executeSql("INSERT INTO T VALUES (1, 11), (5, 51)").await();
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 11]", "+I[2, 20]", "+I[3, 30]", "+I[4, 40]", "+I[5, 51]");
+
+        tEnv.executeSql("INSERT INTO T VALUES (2, 52), (3, 32)").await();
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 11]", "+I[2, 52]", "+I[3, 32]", "+I[4, 40]", "+I[5, 51]");
+    }
+
     private List<String> collect(TableResult result) throws Exception {
         List<String> ret = new ArrayList<>();
         try (CloseableIterator<Row> it = result.collect()) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index a8c1f9c9e5..0d14b60e0c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -184,7 +184,7 @@ public class OperatorSourceTest {
 
     @Test
     public void testReadOperator() throws Exception {
-        ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null);
+        ReadOperator readOperator = new ReadOperator(() -> table.newReadBuilder().newRead(), null);
         OneInputStreamOperatorTestHarness<Split, RowData> harness =
                 new OneInputStreamOperatorTestHarness<>(readOperator);
         harness.setup(
@@ -206,7 +206,7 @@ public class OperatorSourceTest {
 
     @Test
     public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
-        ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null);
+        ReadOperator readOperator = new ReadOperator(() -> table.newReadBuilder().newRead(), null);
         OneInputStreamOperatorTestHarness<Split, RowData> harness =
                 new OneInputStreamOperatorTestHarness<>(readOperator);
         harness.setup(
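
For readers following the change, the new testDeletionVector case above is the end-to-end scenario this commit fixes. A minimal sketch of the same flow in Flink SQL (catalog name, table name, and warehouse path are simply the values used by the test and are otherwise arbitrary):

    CREATE CATALOG mycat WITH ('type' = 'paimon', 'warehouse' = '/tmp/warehouse');
    USE CATALOG mycat;

    -- postpone-bucket primary key table ('bucket' = '-2') with deletion vectors enabled
    CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED)
    WITH ('bucket' = '-2', 'deletion-vectors.enabled' = 'true');

    INSERT INTO T VALUES (1, 10), (2, 20), (3, 30), (4, 40);
    -- as in the test, compaction is what materializes the postponed writes
    CALL sys.compact(`table` => 'default.T');
    SELECT * FROM T;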
