This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit f76bace9bc310d9b310669cb91b1a4c645719c11
Author: JingsongLi <[email protected]>
AuthorDate: Mon Nov 10 18:23:59 2025 +0800

    [core] Refactor deletion vectors support for incremental cluster
---
 .../append/cluster/IncrementalClusterManager.java  | 159 ++++++++++-----------
 .../apache/paimon/flink/action/CompactAction.java  |  40 ++----
 .../cluster/IncrementalClusterSplitSource.java     |  18 +--
 .../cluster/RemoveClusterBeforeFilesOperator.java  |  15 +-
 .../paimon/spark/procedure/CompactProcedure.java   |  54 ++-----
 5 files changed, 114 insertions(+), 172 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index f8ab6f364b..20dc2a49d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -43,6 +43,7 @@ import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +57,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL;
@@ -67,6 +67,7 @@ public class IncrementalClusterManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterManager.class);
 
+    private final FileStoreTable table;
     private final InternalRowPartitionComputer partitionComputer;
     private final Snapshot snapshot;
     private final SnapshotReader snapshotReader;
@@ -85,6 +86,7 @@ public class IncrementalClusterManager {
         checkArgument(
                 table.bucketMode() == BucketMode.BUCKET_UNAWARE,
                 "only append unaware-bucket table support incremental 
clustering.");
+        this.table = table;
         CoreOptions options = table.coreOptions();
         checkArgument(
                 options.clusteringIncrementalEnabled(),
@@ -204,41 +206,82 @@ public class IncrementalClusterManager {
         return partitionLevels;
     }
 
-    public List<DataSplit> toSplits(
-            BinaryRow partition,
-            List<DataFileMeta> files,
-            @Nullable AppendDeleteFileMaintainer dvIndexFileMaintainer) {
-        List<DataSplit> splits = new ArrayList<>();
-
-        DataSplit.Builder builder =
-                DataSplit.builder()
-                        .withPartition(partition)
-                        .withBucket(0)
-                        .withTotalBuckets(1)
-                        .isStreaming(false);
-
-        SplitGenerator splitGenerator = snapshotReader.splitGenerator();
-        List<SplitGenerator.SplitGroup> splitGroups = splitGenerator.splitForBatch(files);
-
-        for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
-            List<DataFileMeta> dataFiles = splitGroup.files;
-            String bucketPath = snapshotReader.pathFactory().bucketPath(partition, 0).toString();
-            builder.withDataFiles(dataFiles)
-                    .rawConvertible(splitGroup.rawConvertible)
-                    .withBucketPath(bucketPath);
-
-            if (dvIndexFileMaintainer != null) {
-                List<DeletionFile> dataDeletionFiles = new ArrayList<>();
-                for (DataFileMeta file : dataFiles) {
-                    dataDeletionFiles.add(dvIndexFileMaintainer.getDeletionFile(file.fileName()));
+    public Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> toSplitsAndRewriteDvFiles(
+            Map<BinaryRow, CompactUnit> compactUnits) {
+        Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> result = new HashMap<>();
+        boolean dvEnabled = table.coreOptions().deletionVectorsEnabled();
+        for (BinaryRow partition : compactUnits.keySet()) {
+            CompactUnit unit = compactUnits.get(partition);
+            AppendDeleteFileMaintainer dvMaintainer =
+                    dvEnabled
+                            ? BaseAppendDeleteFileMaintainer.forUnawareAppend(
+                                    table.store().newIndexFileHandler(), snapshot, partition)
+                            : null;
+            List<DataSplit> splits = new ArrayList<>();
+
+            DataSplit.Builder builder =
+                    DataSplit.builder()
+                            .withPartition(partition)
+                            .withBucket(0)
+                            .withTotalBuckets(1)
+                            .isStreaming(false);
+
+            SplitGenerator splitGenerator = snapshotReader.splitGenerator();
+            List<SplitGenerator.SplitGroup> splitGroups =
+                    splitGenerator.splitForBatch(unit.files());
+
+            for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
+                List<DataFileMeta> dataFiles = splitGroup.files;
+
+                String bucketPath =
+                        snapshotReader.pathFactory().bucketPath(partition, 0).toString();
+                builder.withDataFiles(dataFiles)
+                        .rawConvertible(splitGroup.rawConvertible)
+                        .withBucketPath(bucketPath);
+
+                if (dvMaintainer != null) {
+                    List<DeletionFile> dataDeletionFiles = new ArrayList<>();
+                    for (DataFileMeta file : dataFiles) {
+                        DeletionFile deletionFile =
+                                dvMaintainer.notifyRemovedDeletionVector(file.fileName());
+                        dataDeletionFiles.add(deletionFile);
+                    }
+                    builder.withDataDeletionFiles(dataDeletionFiles);
                 }
-                builder.withDataDeletionFiles(dataDeletionFiles);
+                splits.add(builder.build());
             }
 
-            splits.add(builder.build());
+            // generate delete dv index meta
+            CommitMessage dvCommitMessage = null;
+            if (dvMaintainer != null) {
+                List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+                List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
+                List<IndexManifestEntry> indexEntries = dvMaintainer.persist();
+                for (IndexManifestEntry entry : indexEntries) {
+                    if (entry.kind() == FileKind.ADD) {
+                        newIndexFiles.add(entry.indexFile());
+                    } else {
+                        deletedIndexFiles.add(entry.indexFile());
+                    }
+                }
+                dvCommitMessage =
+                        new CommitMessageImpl(
+                                dvMaintainer.getPartition(),
+                                0,
+                                table.coreOptions().bucket(),
+                                DataIncrement.emptyIncrement(),
+                                new CompactIncrement(
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        newIndexFiles,
+                                        deletedIndexFiles));
+            }
+
+            result.put(partition, Pair.of(splits, dvCommitMessage));
         }
 
-        return splits;
+        return result;
     }
 
     public static List<DataFileMeta> upgrade(
@@ -248,60 +291,6 @@ public class IncrementalClusterManager {
                 .collect(Collectors.toList());
     }
 
-    public static Map<BinaryRow, AppendDeleteFileMaintainer> createAppendDvMaintainers(
-            FileStoreTable table, Set<BinaryRow> partitions, Snapshot snapshot) {
-        Map<BinaryRow, AppendDeleteFileMaintainer> dvMaintainers = new HashMap<>();
-        for (BinaryRow partition : partitions) {
-            AppendDeleteFileMaintainer appendDvMaintainer =
-                    BaseAppendDeleteFileMaintainer.forUnawareAppend(
-                            table.store().newIndexFileHandler(), snapshot, partition);
-            dvMaintainers.put(partition, appendDvMaintainer);
-        }
-        return dvMaintainers;
-    }
-
-    public static List<CommitMessage> producePartitionDvIndexCommitMessages(
-            FileStoreTable table,
-            List<DataSplit> splits,
-            AppendDeleteFileMaintainer appendDvMaintainer) {
-        checkArgument(appendDvMaintainer != null);
-        // remove deletion vector for files to be clustered
-        for (DataSplit dataSplit : splits) {
-            checkArgument(
-                    dataSplit.partition().equals(appendDvMaintainer.getPartition()),
-                    "partition of this dataSplit is not matched with the Dv Maintainer!");
-            dataSplit
-                    .dataFiles()
-                    .forEach(f -> appendDvMaintainer.notifyRemovedDeletionVector(f.fileName()));
-        }
-
-        // generate new dv index meta, handle by partition
-        List<IndexFileMeta> newIndexFiles = new ArrayList<>();
-        List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
-        List<IndexManifestEntry> indexEntries = appendDvMaintainer.persist();
-        for (IndexManifestEntry entry : indexEntries) {
-            if (entry.kind() == FileKind.ADD) {
-                newIndexFiles.add(entry.indexFile());
-            } else {
-                deletedIndexFiles.add(entry.indexFile());
-            }
-        }
-        CommitMessageImpl commitMessage =
-                new CommitMessageImpl(
-                        appendDvMaintainer.getPartition(),
-                        0,
-                        table.coreOptions().bucket(),
-                        DataIncrement.emptyIncrement(),
-                        new CompactIncrement(
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                newIndexFiles,
-                                deletedIndexFiles));
-
-        return Collections.singletonList(commitMessage);
-    }
-
     public static void logForPartitionLevel(
             Map<BinaryRow, List<LevelSortedRun>> partitionLevels,
             InternalRowPartitionComputer partitionComputer) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index fc8ef344ab..93f7724d64 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -23,7 +23,6 @@ import org.apache.paimon.append.cluster.IncrementalClusterManager;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
 import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
@@ -72,7 +71,6 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -262,48 +260,28 @@ public class CompactAction extends TableActionBase {
             }
         }
 
-        Map<BinaryRow, AppendDeleteFileMaintainer> appendDvMaintainers =
-                table.coreOptions().deletionVectorsEnabled()
-                        ? IncrementalClusterManager.createAppendDvMaintainers(
-                                table, compactUnits.keySet(), incrementalClusterManager.snapshot())
-                        : Collections.emptyMap();
-        Map<BinaryRow, DataSplit[]> partitionSplits =
-                compactUnits.entrySet().stream()
-                        .collect(
-                                Collectors.toMap(
-                                        Map.Entry::getKey,
-                                        entry ->
-                                                incrementalClusterManager
-                                                        .toSplits(
-                                                                entry.getKey(),
-                                                                entry.getValue().files(),
-                                                                appendDvMaintainers.get(
-                                                                        entry.getKey()))
-                                                        .toArray(new DataSplit[0])));
+        Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
+                incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits);
 
         // 2. read,sort and write in partition
         List<DataStream<Committable>> dataStreams = new ArrayList<>();
 
-        for (Map.Entry<BinaryRow, DataSplit[]> entry : partitionSplits.entrySet()) {
-            DataSplit[] splits = entry.getValue();
+        for (Map.Entry<BinaryRow, Pair<List<DataSplit>, CommitMessage>> entry :
+                partitionSplits.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            List<DataSplit> splits = entry.getValue().getKey();
+            CommitMessage dvCommitMessage = entry.getValue().getRight();
             LinkedHashMap<String, String> partitionSpec =
-                    partitionComputer.generatePartValues(entry.getKey());
+                    partitionComputer.generatePartValues(partition);
 
             // 2.1 generate source for current partition
-            List<CommitMessage> partitionDvIndexCommitMessages =
-                    appendDvMaintainers.get(entry.getKey()) == null
-                            ? Collections.emptyList()
-                            : IncrementalClusterManager.producePartitionDvIndexCommitMessages(
-                                    table,
-                                    Arrays.asList(splits),
-                                    appendDvMaintainers.get(entry.getKey()));
             Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
                     IncrementalClusterSplitSource.buildSource(
                             env,
                             table,
                             partitionSpec,
                             splits,
-                            partitionDvIndexCommitMessages,
+                            dvCommitMessage,
                             options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
 
             // 2.2 cluster in partition
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
index 957c70fe60..132dab660e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
@@ -52,11 +52,12 @@ import java.util.Map;
 
 /** Source for Incremental Clustering. */
 public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<Split> {
-    private static final long serialVersionUID = 1L;
 
-    private final Split[] splits;
+    private static final long serialVersionUID = 2L;
 
-    public IncrementalClusterSplitSource(Split[] splits) {
+    private final List<Split> splits;
+
+    public IncrementalClusterSplitSource(List<Split> splits) {
         this.splits = splits;
     }
 
@@ -74,7 +75,7 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<
     private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
 
         @Override
-        public InputStatus pollNext(ReaderOutput<Split> output) throws Exception {
+        public InputStatus pollNext(ReaderOutput<Split> output) {
             for (Split split : splits) {
                 DataSplit dataSplit = (DataSplit) split;
                 output.collect(dataSplit);
@@ -87,12 +88,12 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<
             StreamExecutionEnvironment env,
             FileStoreTable table,
             Map<String, String> partitionSpec,
-            DataSplit[] splits,
-            List<CommitMessage> partitionDvIndexCommitMessages,
+            List<DataSplit> splits,
+            @Nullable CommitMessage dvCommitMessage,
             @Nullable Integer parallelism) {
         DataStream<Split> source =
                 env.fromSource(
-                                new IncrementalClusterSplitSource(splits),
+                                new IncrementalClusterSplitSource((List) splits),
                                 WatermarkStrategy.noWatermarks(),
                                 String.format(
                                         "Incremental-cluster split generator: 
%s - %s",
@@ -120,8 +121,7 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<
                         .transform(
                                 "Remove files to be clustered",
                                 new CommittableTypeInfo(),
-                                new RemoveClusterBeforeFilesOperator(
-                                        partitionDvIndexCommitMessages))
+                                new RemoveClusterBeforeFilesOperator(dvCommitMessage))
                         .forceNonParallel());
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
index 4c0d9f9941..83b41940bc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
@@ -29,18 +29,19 @@ import org.apache.paimon.table.source.Split;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
-import java.util.List;
 
 /** Operator used with {@link IncrementalClusterSplitSource}, to remove files to be clustered. */
 public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator<Split, Committable> {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
-    private final List<CommitMessage> partitionDvIndexCommitMessages;
+    private final @Nullable CommitMessage dvCommitMessage;
 
-    public RemoveClusterBeforeFilesOperator(List<CommitMessage> partitionDvIndexCommitMessages) {
-        this.partitionDvIndexCommitMessages = partitionDvIndexCommitMessages;
+    public RemoveClusterBeforeFilesOperator(@Nullable CommitMessage dvCommitMessage) {
+        this.dvCommitMessage = dvCommitMessage;
     }
 
     @Override
@@ -67,10 +68,10 @@ public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator<Sp
     }
 
     private void emitDvIndexCommitMessages(long checkpointId) {
-        for (CommitMessage commitMessage : partitionDvIndexCommitMessages) {
+        if (dvCommitMessage != null) {
             output.collect(
                     new StreamRecord<>(
-                            new Committable(checkpointId, Committable.Kind.FILE, commitMessage)));
+                            new Committable(checkpointId, Committable.Kind.FILE, dvCommitMessage)));
         }
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index b23d9c243c..56dcebe954 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -26,7 +26,6 @@ import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.append.cluster.IncrementalClusterManager;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
@@ -539,26 +538,8 @@ public class CompactProcedure extends BaseProcedure {
         Map<BinaryRow, CompactUnit> compactUnits =
                 incrementalClusterManager.prepareForCluster(fullCompaction);
 
-        Map<BinaryRow, AppendDeleteFileMaintainer> appendDvMaintainers =
-                table.coreOptions().deletionVectorsEnabled()
-                        ? IncrementalClusterManager.createAppendDvMaintainers(
-                                table, compactUnits.keySet(), incrementalClusterManager.snapshot())
-                        : Collections.emptyMap();
-
-        // generate splits for each partition
-        Map<BinaryRow, DataSplit[]> partitionSplits =
-                compactUnits.entrySet().stream()
-                        .collect(
-                                Collectors.toMap(
-                                        Map.Entry::getKey,
-                                        entry ->
-                                                incrementalClusterManager
-                                                        .toSplits(
-                                                                entry.getKey(),
-                                                                entry.getValue().files(),
-                                                                appendDvMaintainers.get(
-                                                                        entry.getKey()))
-                                                        .toArray(new DataSplit[0])));
+        Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
+                incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits);
 
         // sort in partition
         TableSorter sorter =
@@ -573,13 +554,15 @@ public class CompactProcedure extends BaseProcedure {
 
         Dataset<Row> datasetForWrite =
                 partitionSplits.values().stream()
+                        .map(Pair::getKey)
                         .map(
-                                split -> {
+                                splits -> {
                                     Dataset<Row> dataset =
                                             PaimonUtils.createDataset(
                                                     spark(),
                                                     ScanPlanHelper$.MODULE$.createNewScanPlan(
-                                                            split, relation));
+                                                            splits.toArray(new DataSplit[0]),
+                                                            relation));
                                     return sorter.sort(dataset);
                                 })
                         .reduce(Dataset::union)
@@ -605,6 +588,8 @@ public class CompactProcedure extends BaseProcedure {
             List<CommitMessage> clusterMessages = new ArrayList<>();
             for (Map.Entry<BinaryRow, List<DataFileMeta>> entry : partitionClustered.entrySet()) {
                 BinaryRow partition = entry.getKey();
+                CommitMessageImpl dvCommitMessage =
+                        (CommitMessageImpl) partitionSplits.get(partition).getValue();
                 List<DataFileMeta> clusterBefore = compactUnits.get(partition).files();
                 // upgrade the clustered file to outputLevel
                 List<DataFileMeta> clusterAfter =
@@ -614,26 +599,15 @@ public class CompactProcedure extends BaseProcedure {
                         "Partition {}: upgrade file level to {}",
                         partition,
                         compactUnits.get(partition).outputLevel());
-                // get the dv index messages
-                List<CommitMessage> partitionDvIndexCommitMessages =
-                        appendDvMaintainers.get(entry.getKey()) == null
-                                ? Collections.emptyList()
-                                : IncrementalClusterManager.producePartitionDvIndexCommitMessages(
-                                        table,
-                                        Arrays.asList(partitionSplits.get(partition)),
-                                        appendDvMaintainers.get(entry.getKey()));
+
                 List<IndexFileMeta> newIndexFiles = new ArrayList<>();
                 List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
-                for (CommitMessage dvCommitMessage : partitionDvIndexCommitMessages) {
-                    newIndexFiles.addAll(
-                            ((CommitMessageImpl) dvCommitMessage)
-                                    .compactIncrement()
-                                    .newIndexFiles());
-                    deletedIndexFiles.addAll(
-                            ((CommitMessageImpl) dvCommitMessage)
-                                    .compactIncrement()
-                                    .deletedIndexFiles());
+                if (dvCommitMessage != null) {
+                    newIndexFiles = dvCommitMessage.compactIncrement().newIndexFiles();
+                    deletedIndexFiles = dvCommitMessage.compactIncrement().deletedIndexFiles();
                 }
+
+                // get the dv index messages
                 CompactIncrement compactIncrement =
                         new CompactIncrement(
                                 clusterBefore,

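In short, the refactor replaces the old createAppendDvMaintainers / producePartitionDvIndexCommitMessages pair with a single IncrementalClusterManager.toSplitsAndRewriteDvFiles(...) call that returns, per partition, the splits to cluster plus an optional deletion-vector commit message (null when deletion vectors are disabled). Below is a minimal sketch of a caller consuming the new return type, mirroring the Flink and Spark call sites in the patch; the wrapper class, the consume method, and the exact import path for CommitMessage are illustrative assumptions, not part of the commit.

import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.Pair;

import java.util.List;
import java.util.Map;

/** Illustrative usage sketch; not part of the patch. */
class ToSplitsUsageSketch {

    static void consume(
            IncrementalClusterManager manager, Map<BinaryRow, CompactUnit> compactUnits) {
        // One call now produces both the splits to cluster and the DV-index rewrite
        // message per partition; previously this required separate maintainer and
        // commit-message helper calls at each engine's call site.
        Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
                manager.toSplitsAndRewriteDvFiles(compactUnits);

        for (Map.Entry<BinaryRow, Pair<List<DataSplit>, CommitMessage>> entry :
                partitionSplits.entrySet()) {
            // Splits covering the files to be clustered in this partition.
            List<DataSplit> splits = entry.getValue().getKey();
            // Null when deletion vectors are disabled for the table.
            CommitMessage dvCommitMessage = entry.getValue().getValue();
            // ... read and sort the splits, then commit dvCommitMessage (if present)
            // together with the committables produced by the rewrite.
        }
    }
}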