This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit f76bace9bc310d9b310669cb91b1a4c645719c11
Author: JingsongLi <[email protected]>
AuthorDate: Mon Nov 10 18:23:59 2025 +0800

    [core] Refactor deletion vectors support for incremental cluster
---
 .../append/cluster/IncrementalClusterManager.java | 159 ++++++++++-----------
 .../apache/paimon/flink/action/CompactAction.java |  40 ++----
 .../cluster/IncrementalClusterSplitSource.java    |  18 +--
 .../cluster/RemoveClusterBeforeFilesOperator.java |  15 +-
 .../paimon/spark/procedure/CompactProcedure.java  |  54 ++-----
 5 files changed, 114 insertions(+), 172 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index f8ab6f364b..20dc2a49d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -43,6 +43,7 @@ import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +57,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL;
@@ -67,6 +67,7 @@ public class IncrementalClusterManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterManager.class);
 
+    private final FileStoreTable table;
     private final InternalRowPartitionComputer partitionComputer;
     private final Snapshot snapshot;
     private final SnapshotReader snapshotReader;
@@ -85,6 +86,7 @@ public class IncrementalClusterManager {
         checkArgument(
                 table.bucketMode() == BucketMode.BUCKET_UNAWARE,
                 "only append unaware-bucket table support incremental clustering.");
+        this.table = table;
         CoreOptions options = table.coreOptions();
         checkArgument(
                 options.clusteringIncrementalEnabled(),
@@ -204,41 +206,82 @@ public class IncrementalClusterManager {
         return partitionLevels;
     }
 
-    public List<DataSplit> toSplits(
-            BinaryRow partition,
-            List<DataFileMeta> files,
-            @Nullable AppendDeleteFileMaintainer dvIndexFileMaintainer) {
-        List<DataSplit> splits = new ArrayList<>();
-
-        DataSplit.Builder builder =
-                DataSplit.builder()
-                        .withPartition(partition)
-                        .withBucket(0)
-                        .withTotalBuckets(1)
-                        .isStreaming(false);
-
-        SplitGenerator splitGenerator = snapshotReader.splitGenerator();
-        List<SplitGenerator.SplitGroup> splitGroups = splitGenerator.splitForBatch(files);
-
-        for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
-            List<DataFileMeta> dataFiles = splitGroup.files;
-            String bucketPath = snapshotReader.pathFactory().bucketPath(partition, 0).toString();
-            builder.withDataFiles(dataFiles)
-                    .rawConvertible(splitGroup.rawConvertible)
-                    .withBucketPath(bucketPath);
-
-            if (dvIndexFileMaintainer != null) {
-                List<DeletionFile> dataDeletionFiles = new ArrayList<>();
-                for (DataFileMeta file : dataFiles) {
-                    dataDeletionFiles.add(dvIndexFileMaintainer.getDeletionFile(file.fileName()));
+    public Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> toSplitsAndRewriteDvFiles(
+            Map<BinaryRow, CompactUnit> compactUnits) {
+        Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> result = new HashMap<>();
+        boolean dvEnabled = table.coreOptions().deletionVectorsEnabled();
+        for (BinaryRow partition : compactUnits.keySet()) {
+            CompactUnit unit = compactUnits.get(partition);
+            AppendDeleteFileMaintainer dvMaintainer =
+                    dvEnabled
+                            ? BaseAppendDeleteFileMaintainer.forUnawareAppend(
+                                    table.store().newIndexFileHandler(), snapshot, partition)
+                            : null;
+            List<DataSplit> splits = new ArrayList<>();
+
+            DataSplit.Builder builder =
+                    DataSplit.builder()
+                            .withPartition(partition)
+                            .withBucket(0)
+                            .withTotalBuckets(1)
+                            .isStreaming(false);
+
+            SplitGenerator splitGenerator = snapshotReader.splitGenerator();
+            List<SplitGenerator.SplitGroup> splitGroups =
+                    splitGenerator.splitForBatch(unit.files());
+
+            for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
+                List<DataFileMeta> dataFiles = splitGroup.files;
+
+                String bucketPath =
+                        snapshotReader.pathFactory().bucketPath(partition, 0).toString();
+                builder.withDataFiles(dataFiles)
+                        .rawConvertible(splitGroup.rawConvertible)
+                        .withBucketPath(bucketPath);
+
+                if (dvMaintainer != null) {
+                    List<DeletionFile> dataDeletionFiles = new ArrayList<>();
+                    for (DataFileMeta file : dataFiles) {
+                        DeletionFile deletionFile =
+                                dvMaintainer.notifyRemovedDeletionVector(file.fileName());
+                        dataDeletionFiles.add(deletionFile);
+                    }
+                    builder.withDataDeletionFiles(dataDeletionFiles);
                 }
-                builder.withDataDeletionFiles(dataDeletionFiles);
+                splits.add(builder.build());
             }
-            splits.add(builder.build());
+
+            // generate delete dv index meta
+            CommitMessage dvCommitMessage = null;
+            if (dvMaintainer != null) {
+                List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+                List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
+                List<IndexManifestEntry> indexEntries = dvMaintainer.persist();
+                for (IndexManifestEntry entry : indexEntries) {
+                    if (entry.kind() == FileKind.ADD) {
+                        newIndexFiles.add(entry.indexFile());
+                    } else {
+                        deletedIndexFiles.add(entry.indexFile());
+                    }
+                }
+                dvCommitMessage =
+                        new CommitMessageImpl(
+                                dvMaintainer.getPartition(),
+                                0,
+                                table.coreOptions().bucket(),
+                                DataIncrement.emptyIncrement(),
+                                new CompactIncrement(
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        newIndexFiles,
+                                        deletedIndexFiles));
+            }
+
+            result.put(partition, Pair.of(splits, dvCommitMessage));
         }
-        return splits;
+        return result;
     }
 
     public static List<DataFileMeta> upgrade(
@@ -248,60 +291,6 @@ public class IncrementalClusterManager {
                 .collect(Collectors.toList());
     }
 
-    public static Map<BinaryRow, AppendDeleteFileMaintainer> createAppendDvMaintainers(
-            FileStoreTable table, Set<BinaryRow> partitions, Snapshot snapshot) {
-        Map<BinaryRow, AppendDeleteFileMaintainer> dvMaintainers = new HashMap<>();
-        for (BinaryRow partition : partitions) {
-            AppendDeleteFileMaintainer appendDvMaintainer =
-                    BaseAppendDeleteFileMaintainer.forUnawareAppend(
-                            table.store().newIndexFileHandler(), snapshot, partition);
-            dvMaintainers.put(partition, appendDvMaintainer);
-        }
-        return dvMaintainers;
-    }
-
-    public static List<CommitMessage> producePartitionDvIndexCommitMessages(
-            FileStoreTable table,
-            List<DataSplit> splits,
-            AppendDeleteFileMaintainer appendDvMaintainer) {
-        checkArgument(appendDvMaintainer != null);
-        // remove deletion vector for files to be clustered
-        for (DataSplit dataSplit : splits) {
-            checkArgument(
-                    dataSplit.partition().equals(appendDvMaintainer.getPartition()),
-                    "partition of this dataSplit is not matched with the Dv Maintainer!");
-            dataSplit
-                    .dataFiles()
-                    .forEach(f -> appendDvMaintainer.notifyRemovedDeletionVector(f.fileName()));
-        }
-
-        // generate new dv index meta, handle by partition
-        List<IndexFileMeta> newIndexFiles = new ArrayList<>();
-        List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
-        List<IndexManifestEntry> indexEntries = appendDvMaintainer.persist();
-        for (IndexManifestEntry entry : indexEntries) {
-            if (entry.kind() == FileKind.ADD) {
-                newIndexFiles.add(entry.indexFile());
-            } else {
-                deletedIndexFiles.add(entry.indexFile());
-            }
-        }
-        CommitMessageImpl commitMessage =
-                new CommitMessageImpl(
-                        appendDvMaintainer.getPartition(),
-                        0,
-                        table.coreOptions().bucket(),
-                        DataIncrement.emptyIncrement(),
-                        new CompactIncrement(
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                newIndexFiles,
-                                deletedIndexFiles));
-
-        return Collections.singletonList(commitMessage);
-    }
-
     public static void logForPartitionLevel(
             Map<BinaryRow, List<LevelSortedRun>> partitionLevels,
             InternalRowPartitionComputer partitionComputer) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index fc8ef344ab..93f7724d64 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -23,7 +23,6 @@ import org.apache.paimon.append.cluster.IncrementalClusterManager;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
 import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
@@ -72,7 +71,6 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -262,48 +260,28 @@ public class CompactAction extends TableActionBase {
             }
         }
 
-        Map<BinaryRow, AppendDeleteFileMaintainer> appendDvMaintainers =
-                table.coreOptions().deletionVectorsEnabled()
-                        ? IncrementalClusterManager.createAppendDvMaintainers(
-                                table, compactUnits.keySet(), incrementalClusterManager.snapshot())
-                        : Collections.emptyMap();
-        Map<BinaryRow, DataSplit[]> partitionSplits =
-                compactUnits.entrySet().stream()
-                        .collect(
-                                Collectors.toMap(
-                                        Map.Entry::getKey,
-                                        entry ->
-                                                incrementalClusterManager
-                                                        .toSplits(
-                                                                entry.getKey(),
-                                                                entry.getValue().files(),
-                                                                appendDvMaintainers.get(
-                                                                        entry.getKey()))
-                                                        .toArray(new DataSplit[0])));
+        Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
+                incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits);
 
         // 2. read,sort and write in partition
         List<DataStream<Committable>> dataStreams = new ArrayList<>();
-        for (Map.Entry<BinaryRow, DataSplit[]> entry : partitionSplits.entrySet()) {
-            DataSplit[] splits = entry.getValue();
+        for (Map.Entry<BinaryRow, Pair<List<DataSplit>, CommitMessage>> entry :
+                partitionSplits.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            List<DataSplit> splits = entry.getValue().getKey();
+            CommitMessage dvCommitMessage = entry.getValue().getRight();
             LinkedHashMap<String, String> partitionSpec =
-                    partitionComputer.generatePartValues(entry.getKey());
+                    partitionComputer.generatePartValues(partition);
 
             // 2.1 generate source for current partition
-            List<CommitMessage> partitionDvIndexCommitMessages =
-                    appendDvMaintainers.get(entry.getKey()) == null
-                            ? Collections.emptyList()
-                            : IncrementalClusterManager.producePartitionDvIndexCommitMessages(
-                                    table,
-                                    Arrays.asList(splits),
-                                    appendDvMaintainers.get(entry.getKey()));
             Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
                     IncrementalClusterSplitSource.buildSource(
                             env,
                             table,
                             partitionSpec,
                             splits,
-                            partitionDvIndexCommitMessages,
+                            dvCommitMessage,
                            options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
 
             // 2.2 cluster in partition
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
index 957c70fe60..132dab660e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
@@ -52,11 +52,12 @@ import java.util.Map;
 
 /** Source for Incremental Clustering. */
 public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<Split> {
 
-    private static final long serialVersionUID = 1L;
-    private final Split[] splits;
+    private static final long serialVersionUID = 2L;
 
-    public IncrementalClusterSplitSource(Split[] splits) {
+    private final List<Split> splits;
+
+    public IncrementalClusterSplitSource(List<Split> splits) {
         this.splits = splits;
     }
 
@@ -74,7 +75,7 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<
     private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
 
         @Override
-        public InputStatus pollNext(ReaderOutput<Split> output) throws Exception {
+        public InputStatus pollNext(ReaderOutput<Split> output) {
             for (Split split : splits) {
                 DataSplit dataSplit = (DataSplit) split;
                 output.collect(dataSplit);
@@ -87,12 +88,12 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<
             StreamExecutionEnvironment env,
             FileStoreTable table,
             Map<String, String> partitionSpec,
-            DataSplit[] splits,
-            List<CommitMessage> partitionDvIndexCommitMessages,
+            List<DataSplit> splits,
+            @Nullable CommitMessage dvCommitMessage,
             @Nullable Integer parallelism) {
         DataStream<Split> source =
                 env.fromSource(
-                        new IncrementalClusterSplitSource(splits),
+                        new IncrementalClusterSplitSource((List) splits),
                         WatermarkStrategy.noWatermarks(),
                         String.format(
                                 "Incremental-cluster split generator: %s - %s",
@@ -120,8 +121,7 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<
                         .transform(
                                 "Remove files to be clustered",
                                 new CommittableTypeInfo(),
-                                new RemoveClusterBeforeFilesOperator(
-                                        partitionDvIndexCommitMessages))
+                                new RemoveClusterBeforeFilesOperator(dvCommitMessage))
                         .forceNonParallel());
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
index 4c0d9f9941..83b41940bc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
@@ -29,18 +29,19 @@ import org.apache.paimon.table.source.Split;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
-import java.util.List;
 
 /** Operator used with {@link IncrementalClusterSplitSource}, to remove files to be clustered. */
 public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator<Split, Committable> {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
-    private final List<CommitMessage> partitionDvIndexCommitMessages;
+    private final @Nullable CommitMessage dvCommitMessage;
 
-    public RemoveClusterBeforeFilesOperator(List<CommitMessage> partitionDvIndexCommitMessages) {
-        this.partitionDvIndexCommitMessages = partitionDvIndexCommitMessages;
+    public RemoveClusterBeforeFilesOperator(@Nullable CommitMessage dvCommitMessage) {
+        this.dvCommitMessage = dvCommitMessage;
     }
 
     @Override
@@ -67,10 +68,10 @@ public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator<Sp
     }
 
     private void emitDvIndexCommitMessages(long checkpointId) {
-        for (CommitMessage commitMessage : partitionDvIndexCommitMessages) {
+        if (dvCommitMessage != null) {
             output.collect(
                     new StreamRecord<>(
-                            new Committable(checkpointId, Committable.Kind.FILE, commitMessage)));
+                            new Committable(checkpointId, Committable.Kind.FILE, dvCommitMessage)));
         }
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index b23d9c243c..56dcebe954 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -26,7 +26,6 @@ import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.append.cluster.IncrementalClusterManager;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
@@ -539,26 +538,8 @@ public class CompactProcedure extends BaseProcedure {
         Map<BinaryRow, CompactUnit> compactUnits =
                 incrementalClusterManager.prepareForCluster(fullCompaction);
 
-        Map<BinaryRow, AppendDeleteFileMaintainer> appendDvMaintainers =
-                table.coreOptions().deletionVectorsEnabled()
-                        ? IncrementalClusterManager.createAppendDvMaintainers(
-                                table, compactUnits.keySet(), incrementalClusterManager.snapshot())
-                        : Collections.emptyMap();
-
-        // generate splits for each partition
-        Map<BinaryRow, DataSplit[]> partitionSplits =
-                compactUnits.entrySet().stream()
-                        .collect(
-                                Collectors.toMap(
-                                        Map.Entry::getKey,
-                                        entry ->
-                                                incrementalClusterManager
-                                                        .toSplits(
-                                                                entry.getKey(),
-                                                                entry.getValue().files(),
-                                                                appendDvMaintainers.get(
-                                                                        entry.getKey()))
-                                                        .toArray(new DataSplit[0])));
+        Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
+                incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits);
 
         // sort in partition
         TableSorter sorter =
@@ -573,13 +554,15 @@ public class CompactProcedure extends BaseProcedure {
 
         Dataset<Row> datasetForWrite =
                 partitionSplits.values().stream()
+                        .map(Pair::getKey)
                         .map(
-                                split -> {
+                                splits -> {
                                     Dataset<Row> dataset =
                                             PaimonUtils.createDataset(
                                                     spark(),
                                                     ScanPlanHelper$.MODULE$.createNewScanPlan(
-                                                            split, relation));
+                                                            splits.toArray(new DataSplit[0]),
+                                                            relation));
                                     return sorter.sort(dataset);
                                 })
                         .reduce(Dataset::union)
@@ -605,6 +588,8 @@ public class CompactProcedure extends BaseProcedure {
         List<CommitMessage> clusterMessages = new ArrayList<>();
         for (Map.Entry<BinaryRow, List<DataFileMeta>> entry : partitionClustered.entrySet()) {
             BinaryRow partition = entry.getKey();
+            CommitMessageImpl dvCommitMessage =
+                    (CommitMessageImpl) partitionSplits.get(partition).getValue();
             List<DataFileMeta> clusterBefore = compactUnits.get(partition).files();
             // upgrade the clustered file to outputLevel
             List<DataFileMeta> clusterAfter =
                     upgrade(clusterBefore, compactUnits.get(partition).outputLevel());
             LOG.info(
                     "Partition {}: upgrade file level to {}",
                     partition,
                     compactUnits.get(partition).outputLevel());
-            // get the dv index messages
-            List<CommitMessage> partitionDvIndexCommitMessages =
-                    appendDvMaintainers.get(entry.getKey()) == null
-                            ? Collections.emptyList()
-                            : IncrementalClusterManager.producePartitionDvIndexCommitMessages(
-                                    table,
-                                    Arrays.asList(partitionSplits.get(partition)),
-                                    appendDvMaintainers.get(entry.getKey()));
+
             List<IndexFileMeta> newIndexFiles = new ArrayList<>();
             List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
-            for (CommitMessage dvCommitMessage : partitionDvIndexCommitMessages) {
-                newIndexFiles.addAll(
-                        ((CommitMessageImpl) dvCommitMessage)
-                                .compactIncrement()
-                                .newIndexFiles());
-                deletedIndexFiles.addAll(
-                        ((CommitMessageImpl) dvCommitMessage)
-                                .compactIncrement()
-                                .deletedIndexFiles());
+            if (dvCommitMessage != null) {
+                newIndexFiles = dvCommitMessage.compactIncrement().newIndexFiles();
+                deletedIndexFiles = dvCommitMessage.compactIncrement().deletedIndexFiles();
             }
+
+            // get the dv index messages
             CompactIncrement compactIncrement =
                     new CompactIncrement(
                             clusterBefore,
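
A minimal usage sketch (not part of the commit itself) of the refactored planning step: the former
createAppendDvMaintainers / toSplits / producePartitionDvIndexCommitMessages combination is replaced
by a single toSplitsAndRewriteDvFiles call that returns, per partition, the splits to rewrite plus a
nullable deletion-vector commit message. The example class and method names below are hypothetical;
the Paimon types and calls are the ones appearing in the diff above.

    import java.util.List;
    import java.util.Map;

    import org.apache.paimon.append.cluster.IncrementalClusterManager;
    import org.apache.paimon.compact.CompactUnit;
    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.table.sink.CommitMessage;
    import org.apache.paimon.table.source.DataSplit;
    import org.apache.paimon.utils.Pair;

    // Illustrative sketch only; "ClusterPlanExample" and "planCluster" are made-up names.
    public class ClusterPlanExample {

        static void planCluster(IncrementalClusterManager manager, boolean fullCompaction) {
            // 1. pick the files to cluster, per partition (unchanged by this commit)
            Map<BinaryRow, CompactUnit> compactUnits = manager.prepareForCluster(fullCompaction);

            // 2. one call now yields, per partition, both the splits to read/sort/rewrite and a
            //    nullable commit message that rewrites the deletion-vector index; previously the
            //    callers (CompactAction, CompactProcedure) assembled this themselves.
            Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> planned =
                    manager.toSplitsAndRewriteDvFiles(compactUnits);

            planned.forEach(
                    (partition, pair) -> {
                        List<DataSplit> splits = pair.getKey();
                        CommitMessage dvCommitMessage = pair.getRight(); // null if DVs disabled
                        // feed `splits` into the per-partition sort/write pipeline and attach
                        // `dvCommitMessage` to the final commit, as the Flink and Spark entry
                        // points do in this commit.
                    });
        }
    }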
