This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit a47a28ec38550f319aee467206f0c5e1b4cf028d Author: LsomeYeah <[email protected]> AuthorDate: Mon Nov 10 17:46:30 2025 +0800 [core] support incremental clustering in dv mode (#6559) --- .../append/cluster/HistoryPartitionCluster.java | 10 +- .../append/cluster/IncrementalClusterManager.java | 97 +++++++++++++- .../org/apache/paimon/schema/SchemaValidation.java | 3 - .../cluster/IncrementalClusterManagerTest.java | 7 +- .../apache/paimon/flink/action/CompactAction.java | 22 +++- .../cluster/IncrementalClusterSplitSource.java | 6 +- .../cluster/RemoveClusterBeforeFilesOperator.java | 20 ++- .../action/IncrementalClusterActionITCase.java | 142 +++++++++++++++++++++ .../paimon/spark/procedure/CompactProcedure.java | 39 +++++- .../spark/procedure/CompactProcedureTestBase.scala | 136 ++++++++++++++++++++ 10 files changed, 464 insertions(+), 18 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java index 83f92aa9c7..8acbb8e778 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java @@ -18,6 +18,7 @@ package org.apache.paimon.append.cluster; +import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.BinaryRow; @@ -54,6 +55,7 @@ public class HistoryPartitionCluster { private static final Logger LOG = LoggerFactory.getLogger(HistoryPartitionCluster.class); private final FileStoreTable table; + private final Snapshot snapshot; private final IncrementalClusterStrategy incrementalClusterStrategy; private final InternalRowPartitionComputer partitionComputer; private final PartitionPredicate specifiedPartitions; @@ -63,12 +65,14 @@ public class HistoryPartitionCluster { public HistoryPartitionCluster( FileStoreTable table, + Snapshot snapshot, IncrementalClusterStrategy incrementalClusterStrategy, InternalRowPartitionComputer partitionComputer, PartitionPredicate specifiedPartitions, Duration historyPartitionIdleTime, int historyPartitionLimit) { this.table = table; + this.snapshot = snapshot; this.incrementalClusterStrategy = incrementalClusterStrategy; this.partitionComputer = partitionComputer; this.specifiedPartitions = specifiedPartitions; @@ -80,6 +84,7 @@ public class HistoryPartitionCluster { @Nullable public static HistoryPartitionCluster create( FileStoreTable table, + Snapshot snapshot, IncrementalClusterStrategy incrementalClusterStrategy, InternalRowPartitionComputer partitionComputer, @Nullable PartitionPredicate specifiedPartitions) { @@ -98,6 +103,7 @@ public class HistoryPartitionCluster { int limit = table.coreOptions().clusteringHistoryPartitionLimit(); return new HistoryPartitionCluster( table, + snapshot, incrementalClusterStrategy, partitionComputer, specifiedPartitions, @@ -130,7 +136,8 @@ public class HistoryPartitionCluster { .toEpochMilli(); List<BinaryRow> historyPartitions = - table.newSnapshotReader().withLevelMinMaxFilter((min, max) -> min < maxLevel) + table.newSnapshotReader().withSnapshot(snapshot) + .withLevelMinMaxFilter((min, max) -> min < maxLevel) .withLevelFilter(level -> level < maxLevel).partitionEntries().stream() .filter(entry -> entry.lastFileCreationTime() < historyMilli) .sorted(Comparator.comparingLong(PartitionEntry::lastFileCreationTime)) @@ -140,6 +147,7 @@ 
public class HistoryPartitionCluster { // read dataFileMeta for history partitions List<DataSplit> historyDataSplits = table.newSnapshotReader() + .withSnapshot(snapshot) .withPartitionFilter(historyPartitions) .read() .dataSplits(); diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java index 6865b4b82e..f8ab6f364b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java @@ -19,16 +19,27 @@ package org.apache.paimon.append.cluster; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer; +import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.mergetree.LevelSortedRun; import org.apache.paimon.mergetree.SortedRun; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.utils.InternalRowPartitionComputer; @@ -39,11 +50,13 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL; @@ -55,6 +68,7 @@ public class IncrementalClusterManager { private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterManager.class); private final InternalRowPartitionComputer partitionComputer; + private final Snapshot snapshot; private final SnapshotReader snapshotReader; private final IncrementalClusterStrategy incrementalClusterStrategy; private final CoreOptions.OrderType clusterCurve; @@ -83,8 +97,12 @@ public class IncrementalClusterManager { table.store().partitionType(), table.partitionKeys().toArray(new String[0]), table.coreOptions().legacyPartitionName()); + this.snapshot = table.snapshotManager().latestSnapshot(); this.snapshotReader = - table.newSnapshotReader().dropStats().withPartitionFilter(specifiedPartitions); + table.newSnapshotReader() + .dropStats() + .withPartitionFilter(specifiedPartitions) + .withSnapshot(snapshot); this.incrementalClusterStrategy = new IncrementalClusterStrategy( table.schemaManager(), @@ -96,7 +114,11 @@ public class IncrementalClusterManager { this.clusterKeys = options.clusteringColumns(); this.historyPartitionCluster = HistoryPartitionCluster.create( - table, 
incrementalClusterStrategy, partitionComputer, specifiedPartitions); + table, + snapshot, + incrementalClusterStrategy, + partitionComputer, + specifiedPartitions); } public Map<BinaryRow, CompactUnit> prepareForCluster(boolean fullCompaction) { @@ -182,7 +204,10 @@ public class IncrementalClusterManager { return partitionLevels; } - public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) { + public List<DataSplit> toSplits( + BinaryRow partition, + List<DataFileMeta> files, + @Nullable AppendDeleteFileMaintainer dvIndexFileMaintainer) { List<DataSplit> splits = new ArrayList<>(); DataSplit.Builder builder = @@ -202,6 +227,14 @@ public class IncrementalClusterManager { .rawConvertible(splitGroup.rawConvertible) .withBucketPath(bucketPath); + if (dvIndexFileMaintainer != null) { + List<DeletionFile> dataDeletionFiles = new ArrayList<>(); + for (DataFileMeta file : dataFiles) { + dataDeletionFiles.add(dvIndexFileMaintainer.getDeletionFile(file.fileName())); + } + builder.withDataDeletionFiles(dataDeletionFiles); + } + splits.add(builder.build()); } @@ -215,6 +248,60 @@ public class IncrementalClusterManager { .collect(Collectors.toList()); } + public static Map<BinaryRow, AppendDeleteFileMaintainer> createAppendDvMaintainers( + FileStoreTable table, Set<BinaryRow> partitions, Snapshot snapshot) { + Map<BinaryRow, AppendDeleteFileMaintainer> dvMaintainers = new HashMap<>(); + for (BinaryRow partition : partitions) { + AppendDeleteFileMaintainer appendDvMaintainer = + BaseAppendDeleteFileMaintainer.forUnawareAppend( + table.store().newIndexFileHandler(), snapshot, partition); + dvMaintainers.put(partition, appendDvMaintainer); + } + return dvMaintainers; + } + + public static List<CommitMessage> producePartitionDvIndexCommitMessages( + FileStoreTable table, + List<DataSplit> splits, + AppendDeleteFileMaintainer appendDvMaintainer) { + checkArgument(appendDvMaintainer != null); + // remove deletion vector for files to be clustered + for (DataSplit dataSplit : splits) { + checkArgument( + dataSplit.partition().equals(appendDvMaintainer.getPartition()), + "partition of this dataSplit is not matched with the Dv Maintainer!"); + dataSplit + .dataFiles() + .forEach(f -> appendDvMaintainer.notifyRemovedDeletionVector(f.fileName())); + } + + // generate new dv index meta, handle by partition + List<IndexFileMeta> newIndexFiles = new ArrayList<>(); + List<IndexFileMeta> deletedIndexFiles = new ArrayList<>(); + List<IndexManifestEntry> indexEntries = appendDvMaintainer.persist(); + for (IndexManifestEntry entry : indexEntries) { + if (entry.kind() == FileKind.ADD) { + newIndexFiles.add(entry.indexFile()); + } else { + deletedIndexFiles.add(entry.indexFile()); + } + } + CommitMessageImpl commitMessage = + new CommitMessageImpl( + appendDvMaintainer.getPartition(), + 0, + table.coreOptions().bucket(), + DataIncrement.emptyIncrement(), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + newIndexFiles, + deletedIndexFiles)); + + return Collections.singletonList(commitMessage); + } + public static void logForPartitionLevel( Map<BinaryRow, List<LevelSortedRun>> partitionLevels, InternalRowPartitionComputer partitionComputer) { @@ -247,6 +334,10 @@ public class IncrementalClusterManager { return clusterKeys; } + public Snapshot snapshot() { + return snapshot; + } + @VisibleForTesting HistoryPartitionCluster historyPartitionCluster() { return historyPartitionCluster; diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 572e8e17ee..09de891a37 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -661,9 +661,6 @@ public class SchemaValidation { schema.primaryKeys().isEmpty(), "Cannot define %s for incremental clustering table.", PRIMARY_KEY.key()); - checkArgument( - !options.deletionVectorsEnabled(), - "Cannot enable deletion vectors mode for incremental clustering table."); } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java index 4b62caab88..a4d1071092 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java @@ -134,12 +134,7 @@ public class IncrementalClusterManagerTest { } @Test - public void testUpgrade() throws Exception { - // Create a valid table for IncrementalClusterManager - Map<String, String> options = new HashMap<>(); - FileStoreTable table = createTable(options, Collections.emptyList()); - IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table); - + public void testUpgrade() { // Create test files with different levels List<DataFileMeta> filesAfterCluster = new ArrayList<>(); DataFileMeta file1 = createFile(100, 1, 0); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 6fdd790651..4a914e8ac9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -23,6 +23,7 @@ import org.apache.paimon.append.cluster.IncrementalClusterManager; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource; import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator; @@ -50,6 +51,7 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.predicate.PredicateProjectionConverter; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.InternalRowPartitionComputer; @@ -69,6 +71,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -250,6 +253,12 @@ public class CompactAction extends TableActionBase { + "Please set '--force_start_flink_job true' if you need forcibly start a flink job."); return false; } + + Map<BinaryRow, AppendDeleteFileMaintainer> appendDvMaintainers = + table.coreOptions().deletionVectorsEnabled() + ? 
IncrementalClusterManager.createAppendDvMaintainers( + table, compactUnits.keySet(), incrementalClusterManager.snapshot()) + : Collections.emptyMap(); Map<BinaryRow, DataSplit[]> partitionSplits = compactUnits.entrySet().stream() .collect( @@ -259,7 +268,9 @@ public class CompactAction extends TableActionBase { incrementalClusterManager .toSplits( entry.getKey(), - entry.getValue().files()) + entry.getValue().files(), + appendDvMaintainers.get( + entry.getKey())) .toArray(new DataSplit[0]))); // 2. read,sort and write in partition @@ -269,13 +280,22 @@ public class CompactAction extends TableActionBase { DataSplit[] splits = entry.getValue(); LinkedHashMap<String, String> partitionSpec = partitionComputer.generatePartValues(entry.getKey()); + // 2.1 generate source for current partition + List<CommitMessage> partitionDvIndexCommitMessages = + appendDvMaintainers.get(entry.getKey()) == null + ? Collections.emptyList() + : IncrementalClusterManager.producePartitionDvIndexCommitMessages( + table, + Arrays.asList(splits), + appendDvMaintainers.get(entry.getKey())); Pair<DataStream<RowData>, DataStream<Committable>> sourcePair = IncrementalClusterSplitSource.buildSource( env, table, partitionSpec, splits, + partitionDvIndexCommitMessages, options.get(FlinkConnectorOptions.SCAN_PARALLELISM)); // 2.2 cluster in partition diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java index 6d18181242..957c70fe60 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java @@ -27,6 +27,7 @@ import org.apache.paimon.flink.source.SimpleSourceSplit; import org.apache.paimon.flink.source.operator.ReadOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.utils.Pair; @@ -46,6 +47,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import javax.annotation.Nullable; +import java.util.List; import java.util.Map; /** Source for Incremental Clustering. 
*/ @@ -86,6 +88,7 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource< FileStoreTable table, Map<String, String> partitionSpec, DataSplit[] splits, + List<CommitMessage> partitionDvIndexCommitMessages, @Nullable Integer parallelism) { DataStream<Split> source = env.fromSource( @@ -117,7 +120,8 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource< .transform( "Remove files to be clustered", new CommittableTypeInfo(), - new RemoveClusterBeforeFilesOperator()) + new RemoveClusterBeforeFilesOperator( + partitionDvIndexCommitMessages)) .forceNonParallel()); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java index 907a69ba12..4c0d9f9941 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java @@ -22,6 +22,7 @@ import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.utils.BoundedOneInputOperator; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; @@ -29,12 +30,19 @@ import org.apache.paimon.table.source.Split; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.Collections; +import java.util.List; /** Operator used with {@link IncrementalClusterSplitSource}, to remove files to be clustered. 
*/ public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator<Split, Committable> { private static final long serialVersionUID = 1L; + private final List<CommitMessage> partitionDvIndexCommitMessages; + + public RemoveClusterBeforeFilesOperator(List<CommitMessage> partitionDvIndexCommitMessages) { + this.partitionDvIndexCommitMessages = partitionDvIndexCommitMessages; + } + @Override public void processElement(StreamRecord<Split> element) throws Exception { DataSplit dataSplit = (DataSplit) element.getValue(); @@ -54,5 +62,15 @@ public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator<Sp } @Override - public void endInput() throws Exception {} + public void endInput() throws Exception { + emitDvIndexCommitMessages(Long.MAX_VALUE); + } + + private void emitDvIndexCommitMessages(long checkpointId) { + for (CommitMessage commitMessage : partitionDvIndexCommitMessages) { + output.collect( + new StreamRecord<>( + new Committable(checkpointId, Committable.Kind.FILE, commitMessage))); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java index 48e0f1fa30..bf7087e77e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java @@ -21,15 +21,25 @@ package org.apache.paimon.flink.action; import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.deletionvectors.BitmapDeletionVector; +import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer; +import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; @@ -565,6 +575,112 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase { assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); } + @Test + public void testClusterWithDeletionVector() throws Exception { + Map<String, String> dynamicOptions = new HashMap<>(); + dynamicOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"); + FileStoreTable table = createTable(null, 1, dynamicOptions); + + BinaryString randomStr = BinaryString.fromString(randomString(150)); + List<CommitMessage> messages = new ArrayList<>(); + // first write + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + 
messages.addAll(write(GenericRow.of(i, j, randomStr, 0))); + } + } + commit(messages); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1}); + + // first cluster + runAction(Collections.emptyList()); + + // second write + messages.clear(); + messages.addAll( + write( + GenericRow.of(0, 3, null, 0), + GenericRow.of(1, 3, null, 0), + GenericRow.of(2, 3, null, 0))); + messages.addAll( + write( + GenericRow.of(3, 0, null, 0), + GenericRow.of(3, 1, null, 0), + GenericRow.of(3, 2, null, 0), + GenericRow.of(3, 3, null, 0))); + commit(messages); + + // write deletion vector for the table + AppendDeleteFileMaintainer maintainer = + BaseAppendDeleteFileMaintainer.forUnawareAppend( + table.store().newIndexFileHandler(), + table.latestSnapshot().get(), + BinaryRow.EMPTY_ROW); + List<DataFileMeta> files = + readBuilder.newScan().plan().splits().stream() + .map(s -> ((DataSplit) s).dataFiles()) + .flatMap(List::stream) + .collect(Collectors.toList()); + // delete (0,0) and (0,3) + for (DataFileMeta file : files) { + if (file.rowCount() == 9 || file.rowCount() == 3) { + BitmapDeletionVector dv = new BitmapDeletionVector(); + dv.delete(0); + maintainer.notifyNewDeletionVector(file.fileName(), dv); + } + } + commit(produceDvIndexMessages(table, maintainer)); + List<String> result1 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List<String> expected1 = + Lists.newArrayList( + "+I[0, 1]", + "+I[1, 0]", + "+I[1, 1]", + "+I[0, 2]", + "+I[1, 2]", + "+I[2, 0]", + "+I[2, 1]", + "+I[2, 2]", + "+I[1, 3]", + "+I[2, 3]", + "+I[3, 0]", + "+I[3, 1]", + "+I[3, 2]", + "+I[3, 3]"); + assertThat(result1).containsExactlyElementsOf(expected1); + + // second cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + List<Split> splits = readBuilder.newScan().plan().splits(); + List<String> result2 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + assertThat(result2.size()).isEqualTo(expected1.size()); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + // dv index for level-5 file should be retained + assertThat(splits.get(0).deletionFiles().get().get(0)).isNotNull(); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); + assertThat((splits.get(0).deletionFiles().get().get(1))).isNull(); + + // full cluster + runAction(Lists.newArrayList("--compact_strategy", "full")); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(splits.get(0).deletionFiles().get().get(0)).isNull(); + } + protected FileStoreTable createTable(String partitionKeys, int sinkParallelism) throws Exception { return createTable(partitionKeys, sinkParallelism, Collections.emptyMap()); @@ -646,6 +762,32 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase { .isEqualTo(Snapshot.CommitKind.COMPACT); } + private List<CommitMessage> produceDvIndexMessages( + FileStoreTable table, AppendDeleteFileMaintainer maintainer) { + List<IndexFileMeta> newIndexFiles = new ArrayList<>(); + List<IndexFileMeta> deletedIndexFiles = new ArrayList<>(); + List<IndexManifestEntry> indexEntries 
= maintainer.persist(); + for (IndexManifestEntry entry : indexEntries) { + if (entry.kind() == FileKind.ADD) { + newIndexFiles.add(entry.indexFile()); + } else { + deletedIndexFiles.add(entry.indexFile()); + } + } + return Collections.singletonList( + new CommitMessageImpl( + maintainer.getPartition(), + 0, + table.coreOptions().bucket(), + DataIncrement.emptyIncrement(), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + newIndexFiles, + deletedIndexFiles))); + } + private void runAction(List<String> extra) throws Exception { StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build(); ArrayList<String> baseArgs = diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 7cd20be759..45da3d2ce0 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -26,7 +26,9 @@ import org.apache.paimon.append.AppendCompactTask; import org.apache.paimon.append.cluster.IncrementalClusterManager; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -556,6 +558,12 @@ public class CompactProcedure extends BaseProcedure { Map<BinaryRow, CompactUnit> compactUnits = incrementalClusterManager.prepareForCluster(fullCompaction); + Map<BinaryRow, AppendDeleteFileMaintainer> appendDvMaintainers = + table.coreOptions().deletionVectorsEnabled() + ? IncrementalClusterManager.createAppendDvMaintainers( + table, compactUnits.keySet(), incrementalClusterManager.snapshot()) + : Collections.emptyMap(); + // generate splits for each partition Map<BinaryRow, DataSplit[]> partitionSplits = compactUnits.entrySet().stream() @@ -566,7 +574,9 @@ public class CompactProcedure extends BaseProcedure { incrementalClusterManager .toSplits( entry.getKey(), - entry.getValue().files()) + entry.getValue().files(), + appendDvMaintainers.get( + entry.getKey())) .toArray(new DataSplit[0]))); // sort in partition @@ -623,8 +633,33 @@ public class CompactProcedure extends BaseProcedure { "Partition {}: upgrade file level to {}", partition, compactUnits.get(partition).outputLevel()); + // get the dv index messages + List<CommitMessage> partitionDvIndexCommitMessages = + appendDvMaintainers.get(entry.getKey()) == null + ? 
Collections.emptyList() + : IncrementalClusterManager.producePartitionDvIndexCommitMessages( + table, + Arrays.asList(partitionSplits.get(partition)), + appendDvMaintainers.get(entry.getKey())); + List<IndexFileMeta> newIndexFiles = new ArrayList<>(); + List<IndexFileMeta> deletedIndexFiles = new ArrayList<>(); + for (CommitMessage dvCommitMessage : partitionDvIndexCommitMessages) { + newIndexFiles.addAll( + ((CommitMessageImpl) dvCommitMessage) + .compactIncrement() + .newIndexFiles()); + deletedIndexFiles.addAll( + ((CommitMessageImpl) dvCommitMessage) + .compactIncrement() + .deletedIndexFiles()); + } CompactIncrement compactIncrement = - new CompactIncrement(clusterBefore, clusterAfter, Collections.emptyList()); + new CompactIncrement( + clusterBefore, + clusterAfter, + Collections.emptyList(), + newIndexFiles, + deletedIndexFiles); clusterMessages.add( new CommitMessageImpl( partition, diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala index 5f653695d0..74f80befed 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest import org.assertj.core.api.Assertions +import org.scalatest.time.Span import java.util @@ -1137,6 +1138,141 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT } } + test("Paimon Procedure: cluster with deletion vectors") { + failAfter(Span(5, org.scalatest.time.Minutes)) { + withTempDir { + checkpointDir => + spark.sql( + s""" + |CREATE TABLE T (a INT, b INT, c STRING) + |TBLPROPERTIES ('bucket'='-1', 'deletion-vectors.enabled'='true','num-levels'='6', 'num-sorted-run.compaction-trigger'='2', 'clustering.columns'='a,b', 'clustering.strategy'='zorder', 'clustering.incremental' = 'true') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b", "c") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T") + + try { + val random = new Random() + val randomStr = random.nextString(40) + // first write + inputData.addData((0, 0, randomStr)) + inputData.addData((0, 1, randomStr)) + inputData.addData((0, 2, randomStr)) + inputData.addData((1, 0, randomStr)) + inputData.addData((1, 1, randomStr)) + inputData.addData((1, 2, randomStr)) + inputData.addData((2, 0, randomStr)) + inputData.addData((2, 1, randomStr)) + inputData.addData((2, 2, randomStr)) + stream.processAllAvailable() + + val result = new util.ArrayList[Row]() + for (a <- 0 until 3) { + for (b <- 0 until 3) { + result.add(Row(a, b, randomStr)) + } + } + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + + // first cluster, the outputLevel should be 5 + checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) + + // first cluster result + val result2 = new util.ArrayList[Row]() + result2.add(0, 
Row(0, 0, randomStr)) + result2.add(1, Row(0, 1, randomStr)) + result2.add(2, Row(1, 0, randomStr)) + result2.add(3, Row(1, 1, randomStr)) + result2.add(4, Row(0, 2, randomStr)) + result2.add(5, Row(1, 2, randomStr)) + result2.add(6, Row(2, 0, randomStr)) + result2.add(7, Row(2, 1, randomStr)) + result2.add(8, Row(2, 2, randomStr)) + + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2) + + var clusteredTable = loadTable("T") + checkSnapshot(clusteredTable) + var dataSplits = clusteredTable.newSnapshotReader().read().dataSplits() + Assertions.assertThat(dataSplits.size()).isEqualTo(1) + Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(1) + Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5) + + // second write + inputData.addData((0, 3, null), (1, 3, null), (2, 3, null)) + inputData.addData((3, 0, null), (3, 1, null), (3, 2, null), (3, 3, null)) + stream.processAllAvailable() + + // delete (0,0), which is in level-5 file + spark.sql("DELETE FROM T WHERE a=0 and b=0;").collect() + // delete (0,3), which is in level-0 file + spark.sql("DELETE FROM T WHERE a=0 and b=3;").collect() + + val result3 = new util.ArrayList[Row]() + result3.addAll(result2.subList(1, result2.size())) + for (a <- 1 until 3) { + result3.add(Row(a, 3, null)) + } + for (b <- 0 until 4) { + result3.add(Row(3, b, null)) + } + + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3) + + // second cluster, the outputLevel should be 4. dv index for level-0 will be updated + // and dv index for level-5 will be retained + checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) + // second cluster result, level-5 and level-4 are individually ordered + val result4 = new util.ArrayList[Row]() + result4.addAll(result2.subList(1, result2.size())) + result4.add(Row(1, 3, null)) + result4.add(Row(3, 0, null)) + result4.add(Row(3, 1, null)) + result4.add(Row(2, 3, null)) + result4.add(Row(3, 2, null)) + result4.add(Row(3, 3, null)) + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result4) + + clusteredTable = loadTable("T") + checkSnapshot(clusteredTable) + dataSplits = clusteredTable.newSnapshotReader().read().dataSplits() + Assertions.assertThat(dataSplits.size()).isEqualTo(1) + Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(2) + Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5) + Assertions.assertThat(dataSplits.get(0).deletionFiles().get().get(0)).isNotNull + Assertions.assertThat(dataSplits.get(0).dataFiles().get(1).level()).isEqualTo(4) + Assertions.assertThat(dataSplits.get(0).deletionFiles().get().get(1)).isNull() + + // full cluster + checkAnswer( + spark.sql("CALL paimon.sys.compact(table => 'T', compact_strategy => 'full')"), + Row(true) :: Nil) + clusteredTable = loadTable("T") + checkSnapshot(clusteredTable) + dataSplits = clusteredTable.newSnapshotReader().read().dataSplits() + Assertions.assertThat(dataSplits.size()).isEqualTo(1) + Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(1) + Assertions.assertThat(dataSplits.get(0).deletionFiles().get().get(0)).isNull() + + } finally { + stream.stop() + } + } + } + } + def checkSnapshot(table: FileStoreTable): Unit = { Assertions .assertThat(table.latestSnapshot().get().commitKind().toString)
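
The sketch below condenses how the deletion-vector support added in this commit fits together: the clustering plan and the DV maintainers are pinned to the same snapshot, each split handed to the rewrite carries the deletion files of its input data files, and a DV-index-only commit message drops the vectors of the files that were clustered away. The method names (createAppendDvMaintainers, toSplits, producePartitionDvIndexCommitMessages, snapshot) are taken from the diff above; the single-argument IncrementalClusterManager constructor and the planDvAwareClustering driver loop are assumptions for illustration only, not the actual wiring in CompactAction or CompactProcedure.

import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;

import java.util.Collections;
import java.util.List;
import java.util.Map;

class DvAwareClusteringSketch {

    // Hypothetical driver; mirrors the per-partition loop in CompactAction / CompactProcedure.
    static void planDvAwareClustering(FileStoreTable table, boolean fullCompaction) {
        IncrementalClusterManager clusterManager = new IncrementalClusterManager(table);
        Map<BinaryRow, CompactUnit> compactUnits =
                clusterManager.prepareForCluster(fullCompaction);

        // One DV maintainer per partition, built against the same snapshot the plan was made on.
        Map<BinaryRow, AppendDeleteFileMaintainer> dvMaintainers =
                table.coreOptions().deletionVectorsEnabled()
                        ? IncrementalClusterManager.createAppendDvMaintainers(
                                table, compactUnits.keySet(), clusterManager.snapshot())
                        : Collections.emptyMap();

        for (Map.Entry<BinaryRow, CompactUnit> entry : compactUnits.entrySet()) {
            BinaryRow partition = entry.getKey();
            AppendDeleteFileMaintainer dvMaintainer = dvMaintainers.get(partition);

            // Splits carry a DeletionFile per data file, so the rewrite reads rows
            // with existing deletes already applied.
            List<DataSplit> splits =
                    clusterManager.toSplits(partition, entry.getValue().files(), dvMaintainer);

            // DV-index-only commit message: removes the deletion vectors of the files
            // being clustered away; vectors of untouched files are kept.
            List<CommitMessage> dvIndexMessages =
                    dvMaintainer == null
                            ? Collections.emptyList()
                            : IncrementalClusterManager.producePartitionDvIndexCommitMessages(
                                    table, splits, dvMaintainer);

            // ... read, sort by the clustering key, write the new files, then commit the
            // rewritten files together with dvIndexMessages (engine-specific).
        }
    }
}

On the Flink path the DV-index messages are emitted from RemoveClusterBeforeFilesOperator#endInput, while the Spark procedure folds the new and deleted index files into each partition's CompactIncrement. End to end, the new Spark test enables the mode with 'deletion-vectors.enabled'='true' together with 'clustering.incremental'='true' and then runs CALL paimon.sys.compact(table => 'T') (or compact_strategy => 'full' for a full rewrite).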
