This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit a47a28ec38550f319aee467206f0c5e1b4cf028d
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Nov 10 17:46:30 2025 +0800

    [core] support incremental clustering in dv mode (#6559)
---
 .../append/cluster/HistoryPartitionCluster.java    |  10 +-
 .../append/cluster/IncrementalClusterManager.java  |  97 +++++++++++++-
 .../org/apache/paimon/schema/SchemaValidation.java |   3 -
 .../cluster/IncrementalClusterManagerTest.java     |   7 +-
 .../apache/paimon/flink/action/CompactAction.java  |  22 +++-
 .../cluster/IncrementalClusterSplitSource.java     |   6 +-
 .../cluster/RemoveClusterBeforeFilesOperator.java  |  20 ++-
 .../action/IncrementalClusterActionITCase.java     | 142 +++++++++++++++++++++
 .../paimon/spark/procedure/CompactProcedure.java   |  39 +++++-
 .../spark/procedure/CompactProcedureTestBase.scala | 136 ++++++++++++++++++++
 10 files changed, 464 insertions(+), 18 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
index 83f92aa9c7..8acbb8e778 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.append.cluster;
 
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
@@ -54,6 +55,7 @@ public class HistoryPartitionCluster {
     private static final Logger LOG = LoggerFactory.getLogger(HistoryPartitionCluster.class);
 
     private final FileStoreTable table;
+    private final Snapshot snapshot;
     private final IncrementalClusterStrategy incrementalClusterStrategy;
     private final InternalRowPartitionComputer partitionComputer;
     private final PartitionPredicate specifiedPartitions;
@@ -63,12 +65,14 @@ public class HistoryPartitionCluster {
 
     public HistoryPartitionCluster(
             FileStoreTable table,
+            Snapshot snapshot,
             IncrementalClusterStrategy incrementalClusterStrategy,
             InternalRowPartitionComputer partitionComputer,
             PartitionPredicate specifiedPartitions,
             Duration historyPartitionIdleTime,
             int historyPartitionLimit) {
         this.table = table;
+        this.snapshot = snapshot;
         this.incrementalClusterStrategy = incrementalClusterStrategy;
         this.partitionComputer = partitionComputer;
         this.specifiedPartitions = specifiedPartitions;
@@ -80,6 +84,7 @@ public class HistoryPartitionCluster {
     @Nullable
     public static HistoryPartitionCluster create(
             FileStoreTable table,
+            Snapshot snapshot,
             IncrementalClusterStrategy incrementalClusterStrategy,
             InternalRowPartitionComputer partitionComputer,
             @Nullable PartitionPredicate specifiedPartitions) {
@@ -98,6 +103,7 @@ public class HistoryPartitionCluster {
         int limit = table.coreOptions().clusteringHistoryPartitionLimit();
         return new HistoryPartitionCluster(
                 table,
+                snapshot,
                 incrementalClusterStrategy,
                 partitionComputer,
                 specifiedPartitions,
@@ -130,7 +136,8 @@ public class HistoryPartitionCluster {
                         .toEpochMilli();
 
         List<BinaryRow> historyPartitions =
-                table.newSnapshotReader().withLevelMinMaxFilter((min, max) -> min < maxLevel)
+                table.newSnapshotReader().withSnapshot(snapshot)
+                        .withLevelMinMaxFilter((min, max) -> min < maxLevel)
                         .withLevelFilter(level -> level < maxLevel).partitionEntries().stream()
                         .filter(entry -> entry.lastFileCreationTime() < historyMilli)
                         .sorted(Comparator.comparingLong(PartitionEntry::lastFileCreationTime))
@@ -140,6 +147,7 @@ public class HistoryPartitionCluster {
         // read dataFileMeta for history partitions
         List<DataSplit> historyDataSplits =
                 table.newSnapshotReader()
+                        .withSnapshot(snapshot)
                         .withPartitionFilter(historyPartitions)
                         .read()
                         .dataSplits();
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 6865b4b82e..f8ab6f364b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -19,16 +19,27 @@
 package org.apache.paimon.append.cluster;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
+import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.mergetree.LevelSortedRun;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -39,11 +50,13 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL;
@@ -55,6 +68,7 @@ public class IncrementalClusterManager {
     private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterManager.class);
 
     private final InternalRowPartitionComputer partitionComputer;
+    private final Snapshot snapshot;
     private final SnapshotReader snapshotReader;
     private final IncrementalClusterStrategy incrementalClusterStrategy;
     private final CoreOptions.OrderType clusterCurve;
@@ -83,8 +97,12 @@ public class IncrementalClusterManager {
                         table.store().partitionType(),
                         table.partitionKeys().toArray(new String[0]),
                         table.coreOptions().legacyPartitionName());
+        this.snapshot = table.snapshotManager().latestSnapshot();
         this.snapshotReader =
-                table.newSnapshotReader().dropStats().withPartitionFilter(specifiedPartitions);
+                table.newSnapshotReader()
+                        .dropStats()
+                        .withPartitionFilter(specifiedPartitions)
+                        .withSnapshot(snapshot);
         this.incrementalClusterStrategy =
                 new IncrementalClusterStrategy(
                         table.schemaManager(),
@@ -96,7 +114,11 @@ public class IncrementalClusterManager {
         this.clusterKeys = options.clusteringColumns();
         this.historyPartitionCluster =
                 HistoryPartitionCluster.create(
-                        table, incrementalClusterStrategy, partitionComputer, specifiedPartitions);
+                        table,
+                        snapshot,
+                        incrementalClusterStrategy,
+                        partitionComputer,
+                        specifiedPartitions);
     }
 
     public Map<BinaryRow, CompactUnit> prepareForCluster(boolean fullCompaction) {
@@ -182,7 +204,10 @@ public class IncrementalClusterManager {
         return partitionLevels;
     }
 
-    public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) {
+    public List<DataSplit> toSplits(
+            BinaryRow partition,
+            List<DataFileMeta> files,
+            @Nullable AppendDeleteFileMaintainer dvIndexFileMaintainer) {
         List<DataSplit> splits = new ArrayList<>();
 
         DataSplit.Builder builder =
@@ -202,6 +227,14 @@ public class IncrementalClusterManager {
                     .rawConvertible(splitGroup.rawConvertible)
                     .withBucketPath(bucketPath);
 
+            if (dvIndexFileMaintainer != null) {
+                List<DeletionFile> dataDeletionFiles = new ArrayList<>();
+                for (DataFileMeta file : dataFiles) {
+                    dataDeletionFiles.add(dvIndexFileMaintainer.getDeletionFile(file.fileName()));
+                }
+                builder.withDataDeletionFiles(dataDeletionFiles);
+            }
+
             splits.add(builder.build());
         }
 
@@ -215,6 +248,60 @@ public class IncrementalClusterManager {
                 .collect(Collectors.toList());
     }
 
+    public static Map<BinaryRow, AppendDeleteFileMaintainer> createAppendDvMaintainers(
+            FileStoreTable table, Set<BinaryRow> partitions, Snapshot snapshot) {
+        Map<BinaryRow, AppendDeleteFileMaintainer> dvMaintainers = new HashMap<>();
+        for (BinaryRow partition : partitions) {
+            AppendDeleteFileMaintainer appendDvMaintainer =
+                    BaseAppendDeleteFileMaintainer.forUnawareAppend(
+                            table.store().newIndexFileHandler(), snapshot, partition);
+            dvMaintainers.put(partition, appendDvMaintainer);
+        }
+        return dvMaintainers;
+    }
+
+    public static List<CommitMessage> producePartitionDvIndexCommitMessages(
+            FileStoreTable table,
+            List<DataSplit> splits,
+            AppendDeleteFileMaintainer appendDvMaintainer) {
+        checkArgument(appendDvMaintainer != null);
+        // remove deletion vector for files to be clustered
+        for (DataSplit dataSplit : splits) {
+            checkArgument(
+                    dataSplit.partition().equals(appendDvMaintainer.getPartition()),
+                    "partition of this dataSplit is not matched with the Dv Maintainer!");
+            dataSplit
+                    .dataFiles()
+                    .forEach(f -> appendDvMaintainer.notifyRemovedDeletionVector(f.fileName()));
+        }
+
+        // generate new dv index meta, handle by partition
+        List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+        List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> indexEntries = appendDvMaintainer.persist();
+        for (IndexManifestEntry entry : indexEntries) {
+            if (entry.kind() == FileKind.ADD) {
+                newIndexFiles.add(entry.indexFile());
+            } else {
+                deletedIndexFiles.add(entry.indexFile());
+            }
+        }
+        CommitMessageImpl commitMessage =
+                new CommitMessageImpl(
+                        appendDvMaintainer.getPartition(),
+                        0,
+                        table.coreOptions().bucket(),
+                        DataIncrement.emptyIncrement(),
+                        new CompactIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                newIndexFiles,
+                                deletedIndexFiles));
+
+        return Collections.singletonList(commitMessage);
+    }
+
     public static void logForPartitionLevel(
             Map<BinaryRow, List<LevelSortedRun>> partitionLevels,
             InternalRowPartitionComputer partitionComputer) {
@@ -247,6 +334,10 @@ public class IncrementalClusterManager {
         return clusterKeys;
     }
 
+    public Snapshot snapshot() {
+        return snapshot;
+    }
+
     @VisibleForTesting
     HistoryPartitionCluster historyPartitionCluster() {
         return historyPartitionCluster;
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 572e8e17ee..09de891a37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -661,9 +661,6 @@ public class SchemaValidation {
                     schema.primaryKeys().isEmpty(),
                     "Cannot define %s for incremental clustering table.",
                     PRIMARY_KEY.key());
-            checkArgument(
-                    !options.deletionVectorsEnabled(),
-                    "Cannot enable deletion vectors mode for incremental clustering table.");
         }
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index 4b62caab88..a4d1071092 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -134,12 +134,7 @@ public class IncrementalClusterManagerTest {
     }
 
     @Test
-    public void testUpgrade() throws Exception {
-        // Create a valid table for IncrementalClusterManager
-        Map<String, String> options = new HashMap<>();
-        FileStoreTable table = createTable(options, Collections.emptyList());
-        IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table);
-
+    public void testUpgrade() {
         // Create test files with different levels
         List<DataFileMeta> filesAfterCluster = new ArrayList<>();
         DataFileMeta file1 = createFile(100, 1, 0);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 6fdd790651..4a914e8ac9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.append.cluster.IncrementalClusterManager;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
 import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
@@ -50,6 +51,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.PredicateProjectionConverter;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -69,6 +71,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -250,6 +253,12 @@ public class CompactAction extends TableActionBase {
                             + "Please set '--force_start_flink_job true' if you need forcibly start a flink job.");
             return false;
         }
+
+        Map<BinaryRow, AppendDeleteFileMaintainer> appendDvMaintainers =
+                table.coreOptions().deletionVectorsEnabled()
+                        ? IncrementalClusterManager.createAppendDvMaintainers(
+                                table, compactUnits.keySet(), incrementalClusterManager.snapshot())
+                        : Collections.emptyMap();
         Map<BinaryRow, DataSplit[]> partitionSplits =
                 compactUnits.entrySet().stream()
                         .collect(
@@ -259,7 +268,9 @@ public class CompactAction extends TableActionBase {
                                                 incrementalClusterManager
                                                         .toSplits(
                                                                 entry.getKey(),
-                                                                entry.getValue().files())
+                                                                entry.getValue().files(),
+                                                                appendDvMaintainers.get(
+                                                                        entry.getKey()))
                                                         .toArray(new DataSplit[0])));
 
         // 2. read,sort and write in partition
@@ -269,13 +280,22 @@ public class CompactAction extends TableActionBase {
             DataSplit[] splits = entry.getValue();
             LinkedHashMap<String, String> partitionSpec =
                     partitionComputer.generatePartValues(entry.getKey());
+
             // 2.1 generate source for current partition
+            List<CommitMessage> partitionDvIndexCommitMessages =
+                    appendDvMaintainers.get(entry.getKey()) == null
+                            ? Collections.emptyList()
+                            : IncrementalClusterManager.producePartitionDvIndexCommitMessages(
+                                    table,
+                                    Arrays.asList(splits),
+                                    appendDvMaintainers.get(entry.getKey()));
             Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
                     IncrementalClusterSplitSource.buildSource(
                             env,
                             table,
                             partitionSpec,
                             splits,
+                            partitionDvIndexCommitMessages,
                             options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
 
             // 2.2 cluster in partition
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
index 6d18181242..957c70fe60 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
@@ -27,6 +27,7 @@ import org.apache.paimon.flink.source.SimpleSourceSplit;
 import org.apache.paimon.flink.source.operator.ReadOperator;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Pair;
@@ -46,6 +47,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Map;
 
 /** Source for Incremental Clustering. */
@@ -86,6 +88,7 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<
             FileStoreTable table,
             Map<String, String> partitionSpec,
             DataSplit[] splits,
+            List<CommitMessage> partitionDvIndexCommitMessages,
             @Nullable Integer parallelism) {
         DataStream<Split> source =
                 env.fromSource(
@@ -117,7 +120,8 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<
                         .transform(
                                 "Remove files to be clustered",
                                 new CommittableTypeInfo(),
-                                new RemoveClusterBeforeFilesOperator())
+                                new RemoveClusterBeforeFilesOperator(
+                                        partitionDvIndexCommitMessages))
                         .forceNonParallel());
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
index 907a69ba12..4c0d9f9941 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.utils.BoundedOneInputOperator;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
@@ -29,12 +30,19 @@ import org.apache.paimon.table.source.Split;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collections;
+import java.util.List;
 
 /** Operator used with {@link IncrementalClusterSplitSource}, to remove files to be clustered. */
 public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator<Split, Committable> {
 
     private static final long serialVersionUID = 1L;
 
+    private final List<CommitMessage> partitionDvIndexCommitMessages;
+
+    public RemoveClusterBeforeFilesOperator(List<CommitMessage> partitionDvIndexCommitMessages) {
+        this.partitionDvIndexCommitMessages = partitionDvIndexCommitMessages;
+    }
+
     @Override
     public void processElement(StreamRecord<Split> element) throws Exception {
         DataSplit dataSplit = (DataSplit) element.getValue();
@@ -54,5 +62,15 @@ public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator<Sp
     }
 
     @Override
-    public void endInput() throws Exception {}
+    public void endInput() throws Exception {
+        emitDvIndexCommitMessages(Long.MAX_VALUE);
+    }
+
+    private void emitDvIndexCommitMessages(long checkpointId) {
+        for (CommitMessage commitMessage : partitionDvIndexCommitMessages) {
+            output.collect(
+                    new StreamRecord<>(
+                            new Committable(checkpointId, Committable.Kind.FILE, commitMessage)));
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 48e0f1fa30..bf7087e77e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -21,15 +21,25 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.deletionvectors.BitmapDeletionVector;
+import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
+import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
@@ -565,6 +575,112 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
         assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
     }
 
+    @Test
+    public void testClusterWithDeletionVector() throws Exception {
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+        FileStoreTable table = createTable(null, 1, dynamicOptions);
+
+        BinaryString randomStr = BinaryString.fromString(randomString(150));
+        List<CommitMessage> messages = new ArrayList<>();
+        // first write
+        for (int i = 0; i < 3; i++) {
+            for (int j = 0; j < 3; j++) {
+                messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
+            }
+        }
+        commit(messages);
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1});
+
+        // first cluster
+        runAction(Collections.emptyList());
+
+        // second write
+        messages.clear();
+        messages.addAll(
+                write(
+                        GenericRow.of(0, 3, null, 0),
+                        GenericRow.of(1, 3, null, 0),
+                        GenericRow.of(2, 3, null, 0)));
+        messages.addAll(
+                write(
+                        GenericRow.of(3, 0, null, 0),
+                        GenericRow.of(3, 1, null, 0),
+                        GenericRow.of(3, 2, null, 0),
+                        GenericRow.of(3, 3, null, 0)));
+        commit(messages);
+
+        // write deletion vector for the table
+        AppendDeleteFileMaintainer maintainer =
+                BaseAppendDeleteFileMaintainer.forUnawareAppend(
+                        table.store().newIndexFileHandler(),
+                        table.latestSnapshot().get(),
+                        BinaryRow.EMPTY_ROW);
+        List<DataFileMeta> files =
+                readBuilder.newScan().plan().splits().stream()
+                        .map(s -> ((DataSplit) s).dataFiles())
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+        // delete (0,0) and (0,3)
+        for (DataFileMeta file : files) {
+            if (file.rowCount() == 9 || file.rowCount() == 3) {
+                BitmapDeletionVector dv = new BitmapDeletionVector();
+                dv.delete(0);
+                maintainer.notifyNewDeletionVector(file.fileName(), dv);
+            }
+        }
+        commit(produceDvIndexMessages(table, maintainer));
+        List<String> result1 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        List<String> expected1 =
+                Lists.newArrayList(
+                        "+I[0, 1]",
+                        "+I[1, 0]",
+                        "+I[1, 1]",
+                        "+I[0, 2]",
+                        "+I[1, 2]",
+                        "+I[2, 0]",
+                        "+I[2, 1]",
+                        "+I[2, 2]",
+                        "+I[1, 3]",
+                        "+I[2, 3]",
+                        "+I[3, 0]",
+                        "+I[3, 1]",
+                        "+I[3, 2]",
+                        "+I[3, 3]");
+        assertThat(result1).containsExactlyElementsOf(expected1);
+
+        // second cluster
+        runAction(Collections.emptyList());
+        checkSnapshot(table);
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        List<String> result2 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        assertThat(result2.size()).isEqualTo(expected1.size());
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        // dv index for level-5 file should be retained
+        assertThat(splits.get(0).deletionFiles().get().get(0)).isNotNull();
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
+        assertThat((splits.get(0).deletionFiles().get().get(1))).isNull();
+
+        // full cluster
+        runAction(Lists.newArrayList("--compact_strategy", "full"));
+        checkSnapshot(table);
+        splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        assertThat(splits.get(0).deletionFiles().get().get(0)).isNull();
+    }
+
     protected FileStoreTable createTable(String partitionKeys, int sinkParallelism)
             throws Exception {
         return createTable(partitionKeys, sinkParallelism, Collections.emptyMap());
@@ -646,6 +762,32 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
                 .isEqualTo(Snapshot.CommitKind.COMPACT);
     }
 
+    private List<CommitMessage> produceDvIndexMessages(
+            FileStoreTable table, AppendDeleteFileMaintainer maintainer) {
+        List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+        List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> indexEntries = maintainer.persist();
+        for (IndexManifestEntry entry : indexEntries) {
+            if (entry.kind() == FileKind.ADD) {
+                newIndexFiles.add(entry.indexFile());
+            } else {
+                deletedIndexFiles.add(entry.indexFile());
+            }
+        }
+        return Collections.singletonList(
+                new CommitMessageImpl(
+                        maintainer.getPartition(),
+                        0,
+                        table.coreOptions().bucket(),
+                        DataIncrement.emptyIncrement(),
+                        new CompactIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                newIndexFiles,
+                                deletedIndexFiles)));
+    }
+
     private void runAction(List<String> extra) throws Exception {
         StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
         ArrayList<String> baseArgs =
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 7cd20be759..45da3d2ce0 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -26,7 +26,9 @@ import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.append.cluster.IncrementalClusterManager;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
@@ -556,6 +558,12 @@ public class CompactProcedure extends BaseProcedure {
         Map<BinaryRow, CompactUnit> compactUnits =
                 incrementalClusterManager.prepareForCluster(fullCompaction);
 
+        Map<BinaryRow, AppendDeleteFileMaintainer> appendDvMaintainers =
+                table.coreOptions().deletionVectorsEnabled()
+                        ? IncrementalClusterManager.createAppendDvMaintainers(
+                                table, compactUnits.keySet(), incrementalClusterManager.snapshot())
+                        : Collections.emptyMap();
+
         // generate splits for each partition
         Map<BinaryRow, DataSplit[]> partitionSplits =
                 compactUnits.entrySet().stream()
@@ -566,7 +574,9 @@ public class CompactProcedure extends BaseProcedure {
                                                 incrementalClusterManager
                                                         .toSplits(
                                                                 entry.getKey(),
-                                                                entry.getValue().files())
+                                                                entry.getValue().files(),
+                                                                appendDvMaintainers.get(
+                                                                        entry.getKey()))
                                                         .toArray(new DataSplit[0])));
 
         // sort in partition
@@ -623,8 +633,33 @@ public class CompactProcedure extends BaseProcedure {
                         "Partition {}: upgrade file level to {}",
                         partition,
                         compactUnits.get(partition).outputLevel());
+                // get the dv index messages
+                List<CommitMessage> partitionDvIndexCommitMessages =
+                        appendDvMaintainers.get(entry.getKey()) == null
+                                ? Collections.emptyList()
+                                : IncrementalClusterManager.producePartitionDvIndexCommitMessages(
+                                        table,
+                                        Arrays.asList(partitionSplits.get(partition)),
+                                        appendDvMaintainers.get(entry.getKey()));
+                List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+                List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
+                for (CommitMessage dvCommitMessage : partitionDvIndexCommitMessages) {
+                    newIndexFiles.addAll(
+                            ((CommitMessageImpl) dvCommitMessage)
+                                    .compactIncrement()
+                                    .newIndexFiles());
+                    deletedIndexFiles.addAll(
+                            ((CommitMessageImpl) dvCommitMessage)
+                                    .compactIncrement()
+                                    .deletedIndexFiles());
+                }
                 CompactIncrement compactIncrement =
-                        new CompactIncrement(clusterBefore, clusterAfter, Collections.emptyList());
+                        new CompactIncrement(
+                                clusterBefore,
+                                clusterAfter,
+                                Collections.emptyList(),
+                                newIndexFiles,
+                                deletedIndexFiles);
                 clusterMessages.add(
                         new CommitMessageImpl(
                                 partition,
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 5f653695d0..74f80befed 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming.StreamTest
 import org.assertj.core.api.Assertions
+import org.scalatest.time.Span
 
 import java.util
 
@@ -1137,6 +1138,141 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
     }
   }
 
+  test("Paimon Procedure: cluster with deletion vectors") {
+    failAfter(Span(5, org.scalatest.time.Minutes)) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(
+            s"""
+               |CREATE TABLE T (a INT, b INT, c STRING)
+               |TBLPROPERTIES ('bucket'='-1', 'deletion-vectors.enabled'='true','num-levels'='6', 'num-sorted-run.compaction-trigger'='2', 'clustering.columns'='a,b', 'clustering.strategy'='zorder', 'clustering.incremental' = 'true')
+               |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(Int, Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b", "c")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T")
+
+          try {
+            val random = new Random()
+            val randomStr = random.nextString(40)
+            // first write
+            inputData.addData((0, 0, randomStr))
+            inputData.addData((0, 1, randomStr))
+            inputData.addData((0, 2, randomStr))
+            inputData.addData((1, 0, randomStr))
+            inputData.addData((1, 1, randomStr))
+            inputData.addData((1, 2, randomStr))
+            inputData.addData((2, 0, randomStr))
+            inputData.addData((2, 1, randomStr))
+            inputData.addData((2, 2, randomStr))
+            stream.processAllAvailable()
+
+            val result = new util.ArrayList[Row]()
+            for (a <- 0 until 3) {
+              for (b <- 0 until 3) {
+                result.add(Row(a, b, randomStr))
+              }
+            }
+            Assertions.assertThat(query().collect()).containsExactlyElementsOf(result)
+
+            // first cluster, the outputLevel should be 5
+            checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil)
+
+            // first cluster result
+            val result2 = new util.ArrayList[Row]()
+            result2.add(0, Row(0, 0, randomStr))
+            result2.add(1, Row(0, 1, randomStr))
+            result2.add(2, Row(1, 0, randomStr))
+            result2.add(3, Row(1, 1, randomStr))
+            result2.add(4, Row(0, 2, randomStr))
+            result2.add(5, Row(1, 2, randomStr))
+            result2.add(6, Row(2, 0, randomStr))
+            result2.add(7, Row(2, 1, randomStr))
+            result2.add(8, Row(2, 2, randomStr))
+
+            Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2)
+
+            var clusteredTable = loadTable("T")
+            checkSnapshot(clusteredTable)
+            var dataSplits = clusteredTable.newSnapshotReader().read().dataSplits()
+            Assertions.assertThat(dataSplits.size()).isEqualTo(1)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(1)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5)
+
+            // second write
+            inputData.addData((0, 3, null), (1, 3, null), (2, 3, null))
+            inputData.addData((3, 0, null), (3, 1, null), (3, 2, null), (3, 3, null))
+            stream.processAllAvailable()
+
+            // delete (0,0), which is in level-5 file
+            spark.sql("DELETE FROM T WHERE a=0 and b=0;").collect()
+            // delete (0,3), which is in level-0 file
+            spark.sql("DELETE FROM T WHERE a=0 and b=3;").collect()
+
+            val result3 = new util.ArrayList[Row]()
+            result3.addAll(result2.subList(1, result2.size()))
+            for (a <- 1 until 3) {
+              result3.add(Row(a, 3, null))
+            }
+            for (b <- 0 until 4) {
+              result3.add(Row(3, b, null))
+            }
+
+            Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3)
+
+            // second cluster, the outputLevel should be 4. dv index for level-0 will be updated
+            // and dv index for level-5 will be retained
+            checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil)
+            // second cluster result, level-5 and level-4 are individually ordered
+            val result4 = new util.ArrayList[Row]()
+            result4.addAll(result2.subList(1, result2.size()))
+            result4.add(Row(1, 3, null))
+            result4.add(Row(3, 0, null))
+            result4.add(Row(3, 1, null))
+            result4.add(Row(2, 3, null))
+            result4.add(Row(3, 2, null))
+            result4.add(Row(3, 3, null))
+            Assertions.assertThat(query().collect()).containsExactlyElementsOf(result4)
+
+            clusteredTable = loadTable("T")
+            checkSnapshot(clusteredTable)
+            dataSplits = clusteredTable.newSnapshotReader().read().dataSplits()
+            Assertions.assertThat(dataSplits.size()).isEqualTo(1)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(2)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5)
+            Assertions.assertThat(dataSplits.get(0).deletionFiles().get().get(0)).isNotNull
+            Assertions.assertThat(dataSplits.get(0).dataFiles().get(1).level()).isEqualTo(4)
+            Assertions.assertThat(dataSplits.get(0).deletionFiles().get().get(1)).isNull()
+
+            // full cluster
+            checkAnswer(
+              spark.sql("CALL paimon.sys.compact(table => 'T', compact_strategy => 'full')"),
+              Row(true) :: Nil)
+            clusteredTable = loadTable("T")
+            checkSnapshot(clusteredTable)
+            dataSplits = clusteredTable.newSnapshotReader().read().dataSplits()
+            Assertions.assertThat(dataSplits.size()).isEqualTo(1)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(1)
+            Assertions.assertThat(dataSplits.get(0).deletionFiles().get().get(0)).isNull()
+
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
   def checkSnapshot(table: FileStoreTable): Unit = {
     Assertions
       .assertThat(table.latestSnapshot().get().commitKind().toString)

