This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fe0df1159 [flink] Fix deletion vector support for postpone bucket tables (#5576)
5fe0df1159 is described below

commit 5fe0df11598b934e1fac45749adebc6509931385
Author: tsreaper <[email protected]>
AuthorDate: Thu May 8 18:00:14 2025 +0800

    [flink] Fix deletion vector support for postpone bucket tables (#5576)
---
 .../apache/paimon/flink/action/CompactAction.java  |  8 +-
 .../postpone/PostponeBucketCompactSplitSource.java | 87 ++++++++++++----------
 .../RewritePostponeBucketCommittableOperator.java  | 44 ++++++-----
 .../flink/source/operator/MonitorSource.java       |  4 +-
 .../paimon/flink/source/operator/ReadOperator.java | 13 ++--
 .../paimon/flink/PostponeBucketTableITCase.java    | 45 +++++++++++
 .../flink/source/operator/OperatorSourceTest.java  |  4 +-
 7 files changed, 132 insertions(+), 73 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 0244a20547..f55244fd9c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -284,12 +284,8 @@ public class CompactAction extends TableActionBase {
             Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
                     PostponeBucketCompactSplitSource.buildSource(
                             env,
-                            realTable.fullName() + partitionSpec,
-                            realTable.rowType(),
-                            realTable
-                                    .newReadBuilder()
-                                    .withPartitionFilter(partitionSpec)
-                                    .withBucket(BucketMode.POSTPONE_BUCKET),
+                            realTable,
+                            partitionSpec,
                             
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
 
             DataStream<InternalRow> partitioned =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index 2bb78eaf78..1cab64c2ca 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -28,13 +28,11 @@ import org.apache.paimon.flink.source.SimpleSourceSplit;
 import org.apache.paimon.flink.source.operator.ReadOperator;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.EndOfScanException;
-import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
@@ -58,6 +56,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -71,10 +70,13 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
     private static final Logger LOG =
             LoggerFactory.getLogger(PostponeBucketCompactSplitSource.class);
 
-    private final ReadBuilder readBuilder;
+    private final FileStoreTable table;
+    private final Map<String, String> partitionSpec;
 
-    public PostponeBucketCompactSplitSource(ReadBuilder readBuilder) {
-        this.readBuilder = readBuilder;
+    public PostponeBucketCompactSplitSource(
+            FileStoreTable table, Map<String, String> partitionSpec) {
+        this.table = table;
+        this.partitionSpec = partitionSpec;
     }
 
     @Override
@@ -90,50 +92,50 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
 
     private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
 
-        private final TableScan scan = readBuilder.newScan();
-
         @Override
         public InputStatus pollNext(ReaderOutput<Split> output) throws Exception {
-            try {
-                List<Split> splits = scan.plan().splits();
-
-                for (Split split : splits) {
-                    DataSplit dataSplit = (DataSplit) split;
-                    List<DataFileMeta> files = new ArrayList<>(dataSplit.dataFiles());
-                    // we must replay the written records in exact order
-                    files.sort(Comparator.comparing(DataFileMeta::creationTime));
-                    for (DataFileMeta meta : files) {
-                        DataSplit s =
-                                DataSplit.builder()
-                                        .withPartition(dataSplit.partition())
-                                        .withBucket(dataSplit.bucket())
-                                        .withBucketPath(dataSplit.bucketPath())
-                                        .withTotalBuckets(dataSplit.totalBuckets())
-                                        .withDataFiles(Collections.singletonList(meta))
-                                        .isStreaming(false)
-                                        .build();
-                        output.collect(s);
-                    }
+            List<Split> splits =
+                    table.newSnapshotReader()
+                            .withPartitionFilter(partitionSpec)
+                            .withBucket(BucketMode.POSTPONE_BUCKET)
+                            .read()
+                            .splits();
+
+            for (Split split : splits) {
+                DataSplit dataSplit = (DataSplit) split;
+                List<DataFileMeta> files = new ArrayList<>(dataSplit.dataFiles());
+                // we must replay the written records in exact order
+                files.sort(Comparator.comparing(DataFileMeta::creationTime));
+                for (DataFileMeta meta : files) {
+                    DataSplit s =
+                            DataSplit.builder()
+                                    .withPartition(dataSplit.partition())
+                                    .withBucket(dataSplit.bucket())
+                                    .withBucketPath(dataSplit.bucketPath())
+                                    .withTotalBuckets(dataSplit.totalBuckets())
+                                    .withDataFiles(Collections.singletonList(meta))
+                                    .isStreaming(false)
+                                    .build();
+                    output.collect(s);
                 }
-            } catch (EndOfScanException esf) {
-                LOG.info("Catching EndOfStreamException, the stream is finished.");
-                return InputStatus.END_OF_INPUT;
             }
-            return InputStatus.MORE_AVAILABLE;
+
+            return InputStatus.END_OF_INPUT;
         }
     }
 
     public static Pair<DataStream<RowData>, DataStream<Committable>> buildSource(
             StreamExecutionEnvironment env,
-            String name,
-            RowType rowType,
-            ReadBuilder readBuilder,
+            FileStoreTable table,
+            Map<String, String> partitionSpec,
             @Nullable Integer parallelism) {
         DataStream<Split> source =
                 env.fromSource(
-                                new PostponeBucketCompactSplitSource(readBuilder),
+                                new PostponeBucketCompactSplitSource(table, partitionSpec),
                                 WatermarkStrategy.noWatermarks(),
-                                "Compact split generator: " + name,
+                                String.format(
+                                        "Compact split generator: %s - %s",
+                                        table.fullName(), partitionSpec),
                                 new JavaTypeInfo<>(Split.class))
                         .forceNonParallel();
 
@@ -148,9 +150,12 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
         return Pair.of(
                 new DataStream<>(source.getExecutionEnvironment(), partitioned)
                         .transform(
-                                "Compact split reader: " + name,
-                                InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(rowType)),
-                                new ReadOperator(readBuilder, null)),
+                                String.format(
+                                        "Compact split reader: %s - %s",
+                                        table.fullName(), partitionSpec),
+                                InternalTypeInfo.of(
+                                        LogicalTypeConversion.toLogicalType(table.rowType())),
+                                new ReadOperator(table::newRead, null)),
                 source.forward()
                         .transform(
                                 "Remove new files",
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
index 8cb2540aa5..9c9c528f54 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
@@ -23,10 +23,12 @@ import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.utils.BoundedOneInputOperator;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -53,7 +55,7 @@ public class RewritePostponeBucketCommittableOperator
     private final FileStoreTable table;
 
     private transient FileStorePathFactory pathFactory;
-    private transient Map<BinaryRow, Map<Integer, BucketFiles>> bucketFiles;
+    private transient Map<BinaryRow, Map<Integer, BucketFiles>> buckets;
 
     public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
         this.table = table;
@@ -62,7 +64,7 @@ public class RewritePostponeBucketCommittableOperator
     @Override
     public void open() throws Exception {
         pathFactory = table.store().pathFactory();
-        bucketFiles = new HashMap<>();
+        buckets = new HashMap<>();
     }
 
     @Override
@@ -73,8 +75,7 @@ public class RewritePostponeBucketCommittableOperator
         }
 
         CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
-        bucketFiles
-                .computeIfAbsent(message.partition(), p -> new HashMap<>())
+        buckets.computeIfAbsent(message.partition(), p -> new HashMap<>())
                 .computeIfAbsent(
                         message.bucket(),
                         b ->
@@ -91,24 +92,20 @@ public class RewritePostponeBucketCommittableOperator
     }
 
     protected void emitAll(long checkpointId) {
-        for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry :
-                bucketFiles.entrySet()) {
+        for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry : buckets.entrySet()) {
             for (Map.Entry<Integer, BucketFiles> bucketEntry :
                     partitionEntry.getValue().entrySet()) {
                 BucketFiles bucketFiles = bucketEntry.getValue();
-                CommitMessageImpl message =
-                        new CommitMessageImpl(
-                                partitionEntry.getKey(),
-                                bucketEntry.getKey(),
-                                bucketFiles.totalBuckets,
-                                DataIncrement.emptyIncrement(),
-                                bucketFiles.makeIncrement());
                 output.collect(
                         new StreamRecord<>(
-                                new Committable(checkpointId, Committable.Kind.FILE, message)));
+                                new Committable(
+                                        checkpointId,
+                                        Committable.Kind.FILE,
+                                        bucketFiles.makeMessage(
+                                                partitionEntry.getKey(), bucketEntry.getKey()))));
             }
         }
-        bucketFiles.clear();
+        buckets.clear();
     }
 
     private static class BucketFiles {
@@ -121,6 +118,8 @@ public class RewritePostponeBucketCommittableOperator
         private final List<DataFileMeta> compactBefore;
         private final List<DataFileMeta> compactAfter;
         private final List<DataFileMeta> changelogFiles;
+        private final List<IndexFileMeta> newIndexFiles;
+        private final List<IndexFileMeta> deletedIndexFiles;
 
         private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
             this.pathFactory = pathFactory;
@@ -130,6 +129,8 @@ public class RewritePostponeBucketCommittableOperator
             this.compactBefore = new ArrayList<>();
             this.compactAfter = new ArrayList<>();
             this.changelogFiles = new ArrayList<>();
+            this.newIndexFiles = new ArrayList<>();
+            this.deletedIndexFiles = new ArrayList<>();
         }
 
         private void update(CommitMessageImpl message) {
@@ -157,13 +158,22 @@ public class RewritePostponeBucketCommittableOperator
             changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
             changelogFiles.addAll(message.compactIncrement().changelogFiles());
 
+            newIndexFiles.addAll(message.indexIncrement().newIndexFiles());
+            deletedIndexFiles.addAll(message.indexIncrement().deletedIndexFiles());
+
             toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
         }
 
-        private CompactIncrement makeIncrement() {
+        private CommitMessageImpl makeMessage(BinaryRow partition, int bucket) {
             List<DataFileMeta> realCompactAfter = new ArrayList<>(newFiles.values());
             realCompactAfter.addAll(compactAfter);
-            return new CompactIncrement(compactBefore, realCompactAfter, changelogFiles);
+            return new CommitMessageImpl(
+                    partition,
+                    bucket,
+                    totalBuckets,
+                    DataIncrement.emptyIncrement(),
+                    new CompactIncrement(compactBefore, realCompactAfter, changelogFiles),
+                    new IndexIncrement(newIndexFiles, deletedIndexFiles));
         }
     }
 }
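
Note: the core of the deletion vector fix in this operator is that the rewritten committable no longer drops the index increment. A hedged sketch of the message makeMessage now builds, using only the fields shown in the hunk above:

    // Sketch only: index files (e.g. deletion vectors) are now forwarded together
    // with the compacted data files instead of being discarded during the rewrite.
    CommitMessageImpl rewritten =
            new CommitMessageImpl(
                    partition,
                    bucket,
                    totalBuckets,
                    DataIncrement.emptyIncrement(),
                    new CompactIncrement(compactBefore, realCompactAfter, changelogFiles),
                    new IndexIncrement(newIndexFiles, deletedIndexFiles));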
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
index 2783b0ae01..84d6cf7dfb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
@@ -229,7 +229,9 @@ public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
                                 singleOutputStreamOperator, shuffleBucketWithPartition);
 
         return sourceDataStream.transform(
-                name + "-Reader", typeInfo, new ReadOperator(readBuilder, nestedProjectedRowData));
+                name + "-Reader",
+                typeInfo,
+                new ReadOperator(readBuilder::newRead, nestedProjectedRowData));
     }
 
     private static DataStream<Split> shuffleUnwareBucket(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index c7189f811d..685b324088 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -24,10 +24,10 @@ import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.NestedProjectedRowData;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.SerializableSupplier;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.metrics.MetricNames;
@@ -47,9 +47,9 @@ import javax.annotation.Nullable;
 public class ReadOperator extends AbstractStreamOperator<RowData>
         implements OneInputStreamOperator<Split, RowData> {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
-    private final ReadBuilder readBuilder;
+    private final SerializableSupplier<TableRead> readSupplier;
     @Nullable private final NestedProjectedRowData nestedProjectedRowData;
 
     private transient TableRead read;
@@ -66,8 +66,9 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
     private transient Counter numRecordsIn;
 
     public ReadOperator(
-            ReadBuilder readBuilder, @Nullable NestedProjectedRowData nestedProjectedRowData) {
-        this.readBuilder = readBuilder;
+            SerializableSupplier<TableRead> readSupplier,
+            @Nullable NestedProjectedRowData nestedProjectedRowData) {
+        this.readSupplier = readSupplier;
         this.nestedProjectedRowData = nestedProjectedRowData;
     }
 
@@ -89,7 +90,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
                                 .getEnvironment()
                                 .getIOManager()
                                 .getSpillingDirectoriesPaths());
-        this.read = readBuilder.newRead().withIOManager(ioManager);
+        this.read = readSupplier.get().withIOManager(ioManager);
         this.reuseRow = new FlinkRowData(null);
         this.reuseRecord = new StreamRecord<>(null);
         this.idlingStarted();
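
Note: ReadOperator now accepts a SerializableSupplier<TableRead> instead of a ReadBuilder, so the TableRead is created lazily in open(). A short sketch of the two constructor usages appearing in this patch (readBuilder, table and nestedProjectedRowData are assumed to be in scope):

    // Sketch only: both call sites pass a supplier; the operator calls get() in open().
    ReadOperator fromReadBuilder = new ReadOperator(readBuilder::newRead, nestedProjectedRowData);
    ReadOperator fromTable = new ReadOperator(table::newRead, null);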
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index db87abc7ad..adcd76c469 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -550,6 +550,51 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         it.close();
     }
 
+    @Test
+    public void testDeletionVector() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'deletion-vectors.enabled' = 'true'\n"
+                        + ")");
+
+        tEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30), (4, 40)").await();
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 30]", "+I[4, 40]");
+
+        tEnv.executeSql("INSERT INTO T VALUES (1, 11), (5, 51)").await();
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 11]", "+I[2, 20]", "+I[3, 30]", "+I[4, 40]", "+I[5, 51]");
+
+        tEnv.executeSql("INSERT INTO T VALUES (2, 52), (3, 32)").await();
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 11]", "+I[2, 52]", "+I[3, 32]", "+I[4, 40]", "+I[5, 51]");
+    }
+
     private List<String> collect(TableResult result) throws Exception {
         List<String> ret = new ArrayList<>();
         try (CloseableIterator<Row> it = result.collect()) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index a8c1f9c9e5..0d14b60e0c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -184,7 +184,7 @@ public class OperatorSourceTest {
 
     @Test
     public void testReadOperator() throws Exception {
-        ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null);
+        ReadOperator readOperator = new ReadOperator(() -> table.newReadBuilder().newRead(), null);
         OneInputStreamOperatorTestHarness<Split, RowData> harness =
                 new OneInputStreamOperatorTestHarness<>(readOperator);
         harness.setup(
@@ -206,7 +206,7 @@ public class OperatorSourceTest {
 
     @Test
     public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
-        ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null);
+        ReadOperator readOperator = new ReadOperator(() -> table.newReadBuilder().newRead(), null);
         OneInputStreamOperatorTestHarness<Split, RowData> harness =
                 new OneInputStreamOperatorTestHarness<>(readOperator);
         harness.setup(
