This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit de1f9c26b51ba4c342d30a869e25c215ecd7407e Author: LsomeYeah <[email protected]> AuthorDate: Tue Oct 28 21:09:42 2025 +0800 [core] support automatically clustering historical partition (#6472) --- .../shortcodes/generated/core_configuration.html | 12 ++ .../main/java/org/apache/paimon/CoreOptions.java | 24 +++ .../append/cluster/HistoryPartitionCluster.java | 189 ++++++++++++++++++++ .../append/cluster/IncrementalClusterManager.java | 101 +++++++---- .../paimon/operation/AbstractFileStoreScan.java | 6 + .../org/apache/paimon/operation/FileStoreScan.java | 2 + .../apache/paimon/operation/ManifestsReader.java | 10 ++ .../table/source/snapshot/SnapshotReader.java | 3 + .../table/source/snapshot/SnapshotReaderImpl.java | 6 + .../apache/paimon/table/system/AuditLogTable.java | 8 + .../cluster/HistoryPartitionClusterTest.java | 190 +++++++++++++++++++++ .../cluster/IncrementalClusterManagerTest.java | 88 +++++++++- .../cluster/IncrementalClusterStrategyTest.java | 2 +- .../action/IncrementalClusterActionITCase.java | 173 ++++++++++++++++++- 14 files changed, 777 insertions(+), 37 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 3411e2bd92..2d4abf9787 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -146,6 +146,18 @@ under the License. <td>String</td> <td>Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. This option will be effective only for append table without primary keys and batch execution mode.</td> </tr> + <tr> + <td><h5>clustering.history-partition.idle-to-full-sort</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>Duration</td> + <td>The duration after which a partition without new updates is considered a historical partition. 
Historical partitions will be automatically fully clustered during the cluster operation. This option takes effect only when 'clustering.incremental' is enabled.</td> + </tr> + <tr> + <td><h5>clustering.history-partition.limit</h5></td> + <td style="word-wrap: break-word;">5</td> + <td>Integer</td> + <td>The limit of history partition number for automatically performing full clustering.</td> + </tr> <tr> <td><h5>clustering.incremental</h5></td> <td style="word-wrap: break-word;">false</td> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index d5209e54db..a3da8ef5f1 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1939,6 +1939,22 @@ public class CoreOptions implements Serializable { .defaultValue(false) .withDescription("Whether enable incremental clustering."); + public static final ConfigOption<Integer> CLUSTERING_HISTORY_PARTITION_LIMIT = + key("clustering.history-partition.limit") + .intType() + .defaultValue(5) + .withDescription( + "The limit of history partition number for automatically performing full clustering."); + + public static final ConfigOption<Duration> CLUSTERING_HISTORY_PARTITION_IDLE_TIME = + key("clustering.history-partition.idle-to-full-sort") + .durationType() + .noDefaultValue() + .withDescription( + "The duration after which a partition without new updates is considered a historical partition. " + + "Historical partitions will be automatically fully clustered during the cluster operation. " + + "This option takes effect only when 'clustering.incremental' is enabled."); + public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED = key("row-tracking.enabled") .booleanType() @@ -3006,6 +3022,14 @@ public class CoreOptions implements Serializable { return options.get(CLUSTERING_INCREMENTAL); } + public Duration clusteringHistoryPartitionIdleTime() { + return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME); + } + + public int clusteringHistoryPartitionLimit() { + return options.get(CLUSTERING_HISTORY_PARTITION_LIMIT); + } + public OrderType clusteringStrategy(int columnSize) { return clusteringStrategy(options.get(CLUSTERING_STRATEGY), columnSize); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java new file mode 100644 index 0000000000..cccecdd2be --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.compact.CompactUnit; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.mergetree.LevelSortedRun; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.BiFilter; +import org.apache.paimon.utils.InternalRowPartitionComputer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.paimon.append.cluster.IncrementalClusterManager.constructPartitionLevels; +import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForPartitionLevel; + +/** Handle historical partition for full clustering. */ +public class HistoryPartitionCluster { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryPartitionCluster.class); + private final InternalRowPartitionComputer partitionComputer; + + private final FileStoreTable table; + private final IncrementalClusterStrategy incrementalClusterStrategy; + private final int maxLevel; + + private final int historyPartitionLimit; + @Nullable private final PartitionPredicate specifiedPartitions; + @Nullable private final Duration historyPartitionIdleTime; + @Nullable private final BiFilter<Integer, Integer> partitionLevelFilter; + + public HistoryPartitionCluster( + FileStoreTable table, + IncrementalClusterStrategy incrementalClusterStrategy, + InternalRowPartitionComputer partitionComputer, + int maxLevel, + int historyPartitionLimit, + @Nullable PartitionPredicate specifiedPartitions, + @Nullable Duration historyPartitionIdleTime) { + this.table = table; + this.incrementalClusterStrategy = incrementalClusterStrategy; + this.partitionComputer = partitionComputer; + this.maxLevel = maxLevel; + this.historyPartitionLimit = historyPartitionLimit; + this.specifiedPartitions = specifiedPartitions; + this.historyPartitionIdleTime = historyPartitionIdleTime; + // (maxLevel + 1) / 2 is used to calculate the ceiling of maxLevel divided by 2 + this.partitionLevelFilter = + (partitionMinLevel, partitionMaxLevel) -> partitionMinLevel < (maxLevel + 1) / 2; + } + + public Map<BinaryRow, Optional<CompactUnit>> pickForHistoryPartitions() { + Map<BinaryRow, List<LevelSortedRun>> partitionLevels = + constructLevelsForHistoryPartitions(); + logForPartitionLevel(partitionLevels, partitionComputer); + + return partitionLevels.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + incrementalClusterStrategy.pick( + maxLevel, entry.getValue(), true))); + } + + @VisibleForTesting + public Map<BinaryRow, List<LevelSortedRun>> constructLevelsForHistoryPartitions() { + if (specifiedPartitions == null + || historyPartitionIdleTime == null + || historyPartitionLimit <= 0) { + return Collections.emptyMap(); + } + + long historyMilli = + LocalDateTime.now() + .minus(historyPartitionIdleTime) + .atZone(ZoneId.systemDefault()) + .toInstant() 
+ .toEpochMilli(); + // read partitionEntries filter by partitionLevelFilter historyPartitionIdleTime + // sort partitionEntries by lastFileCreation time, and we will pick the oldest N partitions + List<BinaryRow> historyPartitions = + table.newSnapshotReader().withManifestLevelFilter(partitionLevelFilter) + .partitionEntries().stream() + .filter(entry -> entry.lastFileCreationTime() < historyMilli) + .sorted(Comparator.comparingLong(PartitionEntry::lastFileCreationTime)) + .map(PartitionEntry::partition) + .collect(Collectors.toList()); + + // read dataFileMeta for history partitions + List<DataSplit> historyDataSplits = + table.newSnapshotReader() + .withPartitionFilter(historyPartitions) + .read() + .dataSplits(); + Map<BinaryRow, List<DataFileMeta>> historyPartitionFiles = new HashMap<>(); + for (DataSplit dataSplit : historyDataSplits) { + historyPartitionFiles + .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>()) + .addAll(dataSplit.dataFiles()); + } + + // find history partitions which have low-level files + Set<BinaryRow> selectedHistoryPartitions = + findLowLevelPartitions(historyPartitions, historyPartitionFiles); + historyPartitionFiles = + historyPartitionFiles.entrySet().stream() + .filter(entry -> selectedHistoryPartitions.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return historyPartitionFiles.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> constructPartitionLevels(entry.getValue()))); + } + + @VisibleForTesting + protected Set<BinaryRow> findLowLevelPartitions( + List<BinaryRow> historyPartitions, Map<BinaryRow, List<DataFileMeta>> partitionFiles) { + Set<BinaryRow> partitions = new HashSet<>(); + // 1. the partition is not specified in specifiedPartitions + // 2. 
the min file level in partition should be less than Math.ceil(maxLevel/2) + for (BinaryRow historyPartition : historyPartitions) { + if (specifiedPartitions != null && !specifiedPartitions.test(historyPartition)) { + List<DataFileMeta> files = + partitionFiles.getOrDefault(historyPartition, Collections.emptyList()); + if (!files.isEmpty()) { + int partitionMinLevel = maxLevel + 1; + for (DataFileMeta file : files) { + partitionMinLevel = Math.min(partitionMinLevel, file.level()); + } + if (partitionLevelFilter != null + && partitionLevelFilter.test(partitionMinLevel, maxLevel)) { + partitions.add(historyPartition); + if (partitions.size() >= historyPartitionLimit) { + break; + } + } + } + } + } + LOG.info( + "Find {} history partitions for full clustering, the history partitions are {}", + partitions.size(), + partitions.stream() + .map(partitionComputer::generatePartValues) + .collect(Collectors.toSet())); + return partitions; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java index 1aa4d2b13e..8142f9dbf0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java @@ -30,6 +30,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.utils.InternalRowPartitionComputer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; public class IncrementalClusterManager { private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterManager.class); + private final InternalRowPartitionComputer partitionComputer; private final SnapshotReader snapshotReader; @@ -58,6 +60,8 @@ public class IncrementalClusterManager { private final CoreOptions.OrderType clusterCurve; private final List<String> clusterKeys; + private final HistoryPartitionCluster historyPartitionCluster; + private int maxLevel; public IncrementalClusterManager(FileStoreTable table) { @@ -69,14 +73,20 @@ public class IncrementalClusterManager { checkArgument( table.bucketMode() == BucketMode.BUCKET_UNAWARE, "only append unaware-bucket table support incremental clustering."); - // drop stats to reduce memory usage - this.snapshotReader = - table.newSnapshotReader().withPartitionFilter(specifiedPartitions).dropStats(); CoreOptions options = table.coreOptions(); checkArgument( options.clusteringIncrementalEnabled(), "Only support incremental clustering when '%s' is true.", CLUSTERING_INCREMENTAL.key()); + this.maxLevel = options.numLevels(); + this.partitionComputer = + new InternalRowPartitionComputer( + table.coreOptions().partitionDefaultName(), + table.store().partitionType(), + table.partitionKeys().toArray(new String[0]), + table.coreOptions().legacyPartitionName()); + this.snapshotReader = + table.newSnapshotReader().dropStats().withPartitionFilter(specifiedPartitions); this.incrementalClusterStrategy = new IncrementalClusterStrategy( table.schemaManager(), @@ -86,31 +96,22 @@ public class IncrementalClusterManager { options.numSortedRunCompactionTrigger()); this.clusterCurve = options.clusteringStrategy(options.clusteringColumns().size()); this.clusterKeys = 
options.clusteringColumns(); - this.maxLevel = options.numLevels(); + + this.historyPartitionCluster = + new HistoryPartitionCluster( + table, + incrementalClusterStrategy, + partitionComputer, + maxLevel, + options.clusteringHistoryPartitionLimit(), + specifiedPartitions, + options.clusteringHistoryPartitionIdleTime()); } public Map<BinaryRow, CompactUnit> prepareForCluster(boolean fullCompaction) { // 1. construct LSM structure for each partition Map<BinaryRow, List<LevelSortedRun>> partitionLevels = constructLevels(); - if (LOG.isDebugEnabled()) { - partitionLevels.forEach( - (partition, levelSortedRuns) -> { - String runsInfo = - levelSortedRuns.stream() - .map( - lsr -> - String.format( - "level-%s:%s", - lsr.level(), - lsr.run().files().size())) - .collect(Collectors.joining(",")); - LOG.debug( - "Partition {} has {} runs: [{}]", - partition, - levelSortedRuns.size(), - runsInfo); - }); - } + logForPartitionLevel(partitionLevels, partitionComputer); // 2. pick files to be clustered for each partition Map<BinaryRow, Optional<CompactUnit>> units = @@ -124,6 +125,10 @@ public class IncrementalClusterManager { entry.getValue(), fullCompaction))); + Map<BinaryRow, Optional<CompactUnit>> historyUnits = + historyPartitionCluster.pickForHistoryPartitions(); + units.putAll(historyUnits); + // 3. filter out empty units Map<BinaryRow, CompactUnit> filteredUnits = units.entrySet().stream() @@ -146,7 +151,7 @@ public class IncrementalClusterManager { .collect(Collectors.joining(", ")); LOG.debug( "Partition {}, outputLevel:{}, clustered with {} files: [{}]", - partition, + partitionComputer.generatePartValues(partition), compactUnit.outputLevel(), compactUnit.files().size(), filesInfo); @@ -169,13 +174,7 @@ public class IncrementalClusterManager { + 1); checkArgument(maxLevel > 1, "Number of levels must be at least 2."); - Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>(); - for (DataSplit dataSplit : dataSplits) { - partitionFiles - .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>()) - .addAll(dataSplit.dataFiles()); - } - + Map<BinaryRow, List<DataFileMeta>> partitionFiles = getPartitionFiles(dataSplits); return partitionFiles.entrySet().stream() .collect( Collectors.toMap( @@ -183,7 +182,7 @@ public class IncrementalClusterManager { entry -> constructPartitionLevels(entry.getValue()))); } - public List<LevelSortedRun> constructPartitionLevels(List<DataFileMeta> partitionFiles) { + public static List<LevelSortedRun> constructPartitionLevels(List<DataFileMeta> partitionFiles) { List<LevelSortedRun> partitionLevels = new ArrayList<>(); Map<Integer, List<DataFileMeta>> levelMap = partitionFiles.stream().collect(Collectors.groupingBy(DataFileMeta::level)); @@ -209,6 +208,16 @@ public class IncrementalClusterManager { return partitionLevels; } + private Map<BinaryRow, List<DataFileMeta>> getPartitionFiles(List<DataSplit> dataSplits) { + Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>(); + for (DataSplit dataSplit : dataSplits) { + partitionFiles + .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>()) + .addAll(dataSplit.dataFiles()); + } + return partitionFiles; + } + public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) { List<DataSplit> splits = new ArrayList<>(); @@ -242,6 +251,30 @@ public class IncrementalClusterManager { .collect(Collectors.toList()); } + public static void logForPartitionLevel( + Map<BinaryRow, List<LevelSortedRun>> partitionLevels, + InternalRowPartitionComputer partitionComputer) { + if 
(LOG.isDebugEnabled()) { + partitionLevels.forEach( + (partition, levelSortedRuns) -> { + String runsInfo = + levelSortedRuns.stream() + .map( + lsr -> + String.format( + "level-%s:%s", + lsr.level(), + lsr.run().files().size())) + .collect(Collectors.joining(",")); + LOG.debug( + "Partition {} has {} runs: [{}]", + partitionComputer.generatePartValues(partition), + levelSortedRuns.size(), + runsInfo); + }); + } + } + public CoreOptions.OrderType clusterCurve() { return clusterCurve; } @@ -249,4 +282,8 @@ public class IncrementalClusterManager { public List<String> clusterKeys() { return clusterKeys; } + + public HistoryPartitionCluster historyPartitionCluster() { + return historyPartitionCluster; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index b21c77db1d..6f06514ed3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -196,6 +196,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { return this; } + @Override + public FileStoreScan withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter) { + manifestsReader.withManifestLevelFilter(manifestLevelFilter); + return this; + } + @Override public FileStoreScan enableValueFilter() { return this; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 5fa3127237..354c43e019 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -75,6 +75,8 @@ public interface FileStoreScan { FileStoreScan withLevelFilter(Filter<Integer> levelFilter); + FileStoreScan withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter); + FileStoreScan enableValueFilter(); FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java index d58bb797e3..dbf5140583 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java @@ -27,6 +27,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BiFilter; import org.apache.paimon.utils.SnapshotManager; import javax.annotation.Nullable; @@ -52,6 +53,7 @@ public class ManifestsReader { @Nullable private Integer specifiedBucket = null; @Nullable private Integer specifiedLevel = null; @Nullable private PartitionPredicate partitionFilter = null; + @Nullable private BiFilter<Integer, Integer> manifestLevelFilter = null; public ManifestsReader( RowType partitionType, @@ -79,6 +81,11 @@ public class ManifestsReader { return this; } + public ManifestsReader withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter) { + this.manifestLevelFilter = manifestLevelFilter; + return this; + } + public ManifestsReader withPartitionFilter(Predicate predicate) { this.partitionFilter = PartitionPredicate.fromPredicate(partitionType, predicate); return this; @@ -160,6 
+167,9 @@ public class ManifestsReader { && (specifiedLevel < minLevel || specifiedLevel > maxLevel)) { return false; } + if (manifestLevelFilter != null && !manifestLevelFilter.test(minLevel, maxLevel)) { + return false; + } } if (partitionFilter == null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index 82112f5fd4..91505ffcf9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.utils.BiFilter; import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; @@ -87,6 +88,8 @@ public interface SnapshotReader { SnapshotReader withLevelFilter(Filter<Integer> levelFilter); + SnapshotReader withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter); + SnapshotReader enableValueFilter(); SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> filter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 81c015a9d4..7b1ca7b37e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -47,6 +47,7 @@ import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.PlanImpl; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.table.source.SplitGenerator; +import org.apache.paimon.utils.BiFilter; import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; @@ -254,6 +255,11 @@ public class SnapshotReaderImpl implements SnapshotReader { return this; } + public SnapshotReader withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter) { + scan.withManifestLevelFilter(manifestLevelFilter); + return this; + } + @Override public SnapshotReader enableValueFilter() { scan.enableValueFilter(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index ab3b13a5ab..f71eba944d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -59,6 +59,7 @@ import org.apache.paimon.table.source.snapshot.StartingContext; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BiFilter; import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; @@ -359,6 +360,13 @@ public class AuditLogTable implements DataTable, ReadonlyTable { return this; } + @Override + public SnapshotReader withManifestLevelFilter( + BiFilter<Integer, Integer> manifestLevelFilter) { + 
wrapped.withManifestLevelFilter(manifestLevelFilter); + return this; + } + @Override public SnapshotReader enableValueFilter() { wrapped.enableValueFilter(); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java new file mode 100644 index 0000000000..4692b50fbb --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.mergetree.LevelSortedRun; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.paimon.append.cluster.IncrementalClusterManagerTest.writeOnce; +import static org.apache.paimon.append.cluster.IncrementalClusterStrategyTest.createFile; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link HistoryPartitionCluster}. 
*/ +public class HistoryPartitionClusterTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testFindLowLevelPartitions() throws Exception { + FileStoreTable table = createTable(Collections.emptyMap(), Collections.emptyList()); + long now = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>(); + + // specified partition, has low-level files + BinaryRow partition1 = BinaryRow.singleColumn(1); + PartitionEntry partitionEntry1 = new PartitionEntry(partition1, 0, 0, 0, now); + partitionFiles.put( + partition1, Lists.newArrayList(createFile(100, 1, 3), createFile(100, 1, 5))); + // has no low-level files + BinaryRow partition2 = BinaryRow.singleColumn(2); + PartitionEntry partitionEntry2 = new PartitionEntry(partition2, 0, 0, 0, now); + partitionFiles.put(partition2, Lists.newArrayList(createFile(100, 1, 0))); + // has low-level files + BinaryRow partition3 = BinaryRow.singleColumn(3); + PartitionEntry partitionEntry3 = new PartitionEntry(partition3, 0, 0, 0, now); + partitionFiles.put( + partition3, Lists.newArrayList(createFile(100, 1, 0), createFile(100, 1, 2))); + // has no low-level files + BinaryRow partition4 = BinaryRow.singleColumn(4); + PartitionEntry partitionEntry4 = new PartitionEntry(partition3, 0, 0, 0, now); + partitionFiles.put(partition4, Lists.newArrayList(createFile(100, 1, 0))); + + IncrementalClusterManager incrementalClusterManager = + new IncrementalClusterManager( + table, + PartitionPredicate.fromMultiple( + RowType.of(DataTypes.INT()), Lists.newArrayList(partition1))); + HistoryPartitionCluster historyPartitionCluster = + incrementalClusterManager.historyPartitionCluster(); + Set<BinaryRow> selectedPartitions = + historyPartitionCluster.findLowLevelPartitions( + Lists.newArrayList( + partitionEntry1, + partitionEntry2, + partitionEntry3, + partitionEntry4) + .stream() + .map(PartitionEntry::partition) + .collect(Collectors.toList()), + partitionFiles); + + assertThat(selectedPartitions).contains(partition2); + } + + @Test + public void testHistoryPartitionAutoClustering() throws Exception { + FileStoreTable table = createTable(Collections.emptyMap(), Collections.singletonList("f2")); + writeOnce( + table, + GenericRow.of( + 1, 1, BinaryString.fromString("pt1"), BinaryString.fromString("test"))); + writeOnce( + table, + GenericRow.of( + 1, 1, BinaryString.fromString("pt2"), BinaryString.fromString("test"))); + + Thread.sleep(2000); + writeOnce( + table, + GenericRow.of( + 1, 1, BinaryString.fromString("pt3"), BinaryString.fromString("test"))); + writeOnce( + table, + GenericRow.of( + 1, 1, BinaryString.fromString("pt4"), BinaryString.fromString("test"))); + + // test specify history partition and enable history partition auto clustering + HistoryPartitionCluster historyPartitionCluster = + new IncrementalClusterManager( + table, + PartitionPredicate.fromMultiple( + RowType.of(DataTypes.INT()), + Lists.newArrayList(BinaryRow.singleColumn("pt1")))) + .historyPartitionCluster(); + Map<BinaryRow, List<LevelSortedRun>> partitionLevels = + historyPartitionCluster.constructLevelsForHistoryPartitions(); + assertThat(partitionLevels.size()).isEqualTo(1); + assertThat(partitionLevels.get(BinaryRow.singleColumn("pt2"))).isNotEmpty(); + + // test specify non-history partition and enable history partition auto clustering + historyPartitionCluster = + new IncrementalClusterManager( + table, + PartitionPredicate.fromMultiple( + RowType.of(DataTypes.INT()), + 
Lists.newArrayList(BinaryRow.singleColumn("pt3")))) + .historyPartitionCluster(); + partitionLevels = historyPartitionCluster.constructLevelsForHistoryPartitions(); + assertThat(partitionLevels.size()).isEqualTo(1); + assertThat(partitionLevels.get(BinaryRow.singleColumn("pt1"))).isNotEmpty(); + + // test not specify partition and disable history partition auto clustering + historyPartitionCluster = new IncrementalClusterManager(table).historyPartitionCluster(); + partitionLevels = historyPartitionCluster.constructLevelsForHistoryPartitions(); + assertThat(partitionLevels.isEmpty()).isTrue(); + } + + protected FileStoreTable createTable( + Map<String, String> customOptions, List<String> partitionKeys) throws Exception { + Map<String, String> options = new HashMap<>(); + options.put(CoreOptions.BUCKET.key(), "-1"); + options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1"); + options.put(CoreOptions.CLUSTERING_INCREMENTAL.key(), "true"); + options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), "2s"); + options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_LIMIT.key(), "1"); + options.putAll(customOptions); + + Schema schema = + new Schema( + RowType.of( + DataTypes.INT(), + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING()) + .getFields(), + partitionKeys, + Collections.emptyList(), + options, + ""); + + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString())); + return FileStoreTableFactory.create( + LocalFileIO.create(), + new Path(tempDir.toString()), + schemaManager.createTable(schema)); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java index 8b37aff0b8..37cc006673 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java @@ -19,18 +19,27 @@ package org.apache.paimon.append.cluster; import org.apache.paimon.CoreOptions; +import org.apache.paimon.compact.CompactUnit; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.mergetree.LevelSortedRun; +import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.assertj.core.util.Lists; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -143,7 +152,7 @@ public class IncrementalClusterManagerTest { // Test upgrading to level 3 int outputLevel = 3; List<DataFileMeta> upgradedFiles = - incrementalClusterManager.upgrade(filesAfterCluster, outputLevel); + IncrementalClusterManager.upgrade(filesAfterCluster, outputLevel); // Verify the results assertThat(upgradedFiles).hasSize(3); @@ -154,7 +163,67 @@ public class IncrementalClusterManagerTest { } } - private 
FileStoreTable createTable( + @Test + public void testHistoryPartitionAutoClustering() throws Exception { + Map<String, String> options = new HashMap<>(); + options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), "2s"); + options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_LIMIT.key(), "1"); + + FileStoreTable table = createTable(options, Collections.singletonList("f2")); + writeOnce( + table, + GenericRow.of( + 1, 1, BinaryString.fromString("pt1"), BinaryString.fromString("test"))); + writeOnce( + table, + GenericRow.of( + 1, 1, BinaryString.fromString("pt2"), BinaryString.fromString("test"))); + + Thread.sleep(2000); + writeOnce( + table, + GenericRow.of( + 1, 1, BinaryString.fromString("pt3"), BinaryString.fromString("test"))); + writeOnce( + table, + GenericRow.of( + 1, 1, BinaryString.fromString("pt4"), BinaryString.fromString("test"))); + + // test specify partition and enable history partition auto clustering + IncrementalClusterManager incrementalClusterManager = + new IncrementalClusterManager( + table, + PartitionPredicate.fromMultiple( + RowType.of(DataTypes.INT()), + Lists.newArrayList(BinaryRow.singleColumn("pt3")))); + Map<BinaryRow, CompactUnit> partitionLevels = + incrementalClusterManager.prepareForCluster(true); + assertThat(partitionLevels.size()).isEqualTo(2); + assertThat(partitionLevels.get(BinaryRow.singleColumn("pt1"))).isNotNull(); + assertThat(partitionLevels.get(BinaryRow.singleColumn("pt3"))).isNotNull(); + + // test don't specify partition and enable history partition auto clustering + incrementalClusterManager = new IncrementalClusterManager(table); + partitionLevels = incrementalClusterManager.prepareForCluster(true); + assertThat(partitionLevels.size()).isEqualTo(4); + + // test specify partition and disable history partition auto clustering + SchemaChange schemaChange = + SchemaChange.removeOption(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key()); + incrementalClusterManager = + new IncrementalClusterManager( + table.copy( + table.schemaManager() + .commitChanges(Collections.singletonList(schemaChange))), + PartitionPredicate.fromMultiple( + RowType.of(DataTypes.INT()), + Lists.newArrayList(BinaryRow.singleColumn("pt3")))); + partitionLevels = incrementalClusterManager.prepareForCluster(true); + assertThat(partitionLevels.size()).isEqualTo(1); + assertThat(partitionLevels.get(BinaryRow.singleColumn("pt3"))).isNotNull(); + } + + protected FileStoreTable createTable( Map<String, String> customOptions, List<String> partitionKeys) throws Exception { Map<String, String> options = new HashMap<>(); options.put(CoreOptions.BUCKET.key(), "-1"); @@ -183,7 +252,20 @@ public class IncrementalClusterManagerTest { schemaManager.createTable(schema)); } - private static DataFileMeta createFile(long size, long schemaId, int level) { + protected static void writeOnce(FileStoreTable table, GenericRow... 
rows) { + String commitUser = "test_user"; + try (BatchTableWrite write = table.newWrite(commitUser); + BatchTableCommit commit = table.newCommit(commitUser)) { + for (GenericRow row : rows) { + write.write(row); + } + commit.commit(write.prepareCommit()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected static DataFileMeta createFile(long size, long schemaId, int level) { return DataFileMeta.create( "", size, diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java index 1061c50c9e..a00f2442ae 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java @@ -199,7 +199,7 @@ public class IncrementalClusterStrategyTest { SchemaChange.setOption(CoreOptions.CLUSTERING_COLUMNS.key(), "f2,f3")); } - private static DataFileMeta createFile(long size, long schemaId, int level) { + protected static DataFileMeta createFile(long size, long schemaId, int level) { return DataFileMeta.create( "", size, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java index 7629ebf81d..48e0f1fa30 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java @@ -18,10 +18,12 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.BatchTableCommit; @@ -41,8 +43,11 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -361,6 +366,158 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase { } } + @Test + public void testClusterHistoryPartition() throws Exception { + Map<String, String> options = new HashMap<>(); + options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), "3s"); + FileStoreTable table = createTable("pt", 1, options); + + BinaryString randomStr = BinaryString.fromString(randomString(150)); + List<CommitMessage> messages = new ArrayList<>(); + + // first write + List<String> expected1 = new ArrayList<>(); + for (int pt = 0; pt < 4; pt++) { + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + messages.addAll(write(GenericRow.of(i, j, randomStr, pt))); + expected1.add(String.format("+I[%s, %s, %s]", i, j, pt)); + } + } + } + commit(messages); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1, 3}); + List<String> result1 = + getResult( + readBuilder.newRead(), + 
readBuilder.newScan().plan().splits(), + readBuilder.readType()); + assertThat(result1).containsExactlyElementsOf(expected1); + + // first cluster, files in four partitions will be in top level + runAction(Collections.emptyList()); + checkSnapshot(table); + List<Split> splits = readBuilder.newScan().plan().splits(); + assertThat(splits.size()).isEqualTo(4); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + List<String> result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List<String> expected2 = new ArrayList<>(); + for (int pt = 0; pt < 4; pt++) { + expected2.add(String.format("+I[0, 0, %s]", pt)); + expected2.add(String.format("+I[0, 1, %s]", pt)); + expected2.add(String.format("+I[1, 0, %s]", pt)); + expected2.add(String.format("+I[1, 1, %s]", pt)); + expected2.add(String.format("+I[0, 2, %s]", pt)); + expected2.add(String.format("+I[1, 2, %s]", pt)); + expected2.add(String.format("+I[2, 0, %s]", pt)); + expected2.add(String.format("+I[2, 1, %s]", pt)); + expected2.add(String.format("+I[2, 2, %s]", pt)); + } + assertThat(result2).containsExactlyElementsOf(expected2); + + // second write + messages.clear(); + for (int pt = 0; pt < 4; pt++) { + messages.addAll( + write( + GenericRow.of(0, 3, null, pt), + GenericRow.of(1, 3, null, pt), + GenericRow.of(2, 3, null, pt))); + messages.addAll( + write( + GenericRow.of(3, 0, null, pt), + GenericRow.of(3, 1, null, pt), + GenericRow.of(3, 2, null, pt), + GenericRow.of(3, 3, null, pt))); + // pt-0, pt-1 will be history partition + if (pt == 1) { + Thread.sleep(3000); + } + } + commit(messages); + + List<String> result3 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List<String> expected3 = new ArrayList<>(); + for (int pt = 0; pt < 4; pt++) { + expected3.addAll(expected2.subList(9 * pt, 9 * pt + 9)); + expected3.add(String.format("+I[0, 3, %s]", pt)); + expected3.add(String.format("+I[1, 3, %s]", pt)); + expected3.add(String.format("+I[2, 3, %s]", pt)); + expected3.add(String.format("+I[3, 0, %s]", pt)); + expected3.add(String.format("+I[3, 1, %s]", pt)); + expected3.add(String.format("+I[3, 2, %s]", pt)); + expected3.add(String.format("+I[3, 3, %s]", pt)); + } + assertThat(result3).containsExactlyElementsOf(expected3); + + // second cluster + runAction(Lists.newArrayList("--partition", "pt=3")); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List<String> result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List<String> expected4 = new ArrayList<>(); + assertThat(splits.size()).isEqualTo(4); + // for pt-0 and pt-1: history partition, full clustering, all files will be + // picked for clustering, outputLevel is 5. 
+ for (int pt = 0; pt <= 1; pt++) { + expected4.add(String.format("+I[0, 0, %s]", pt)); + expected4.add(String.format("+I[0, 1, %s]", pt)); + expected4.add(String.format("+I[1, 0, %s]", pt)); + expected4.add(String.format("+I[1, 1, %s]", pt)); + expected4.add(String.format("+I[0, 2, %s]", pt)); + expected4.add(String.format("+I[0, 3, %s]", pt)); + expected4.add(String.format("+I[1, 2, %s]", pt)); + expected4.add(String.format("+I[1, 3, %s]", pt)); + expected4.add(String.format("+I[2, 0, %s]", pt)); + expected4.add(String.format("+I[2, 1, %s]", pt)); + expected4.add(String.format("+I[3, 0, %s]", pt)); + expected4.add(String.format("+I[3, 1, %s]", pt)); + expected4.add(String.format("+I[2, 2, %s]", pt)); + expected4.add(String.format("+I[2, 3, %s]", pt)); + expected4.add(String.format("+I[3, 2, %s]", pt)); + expected4.add(String.format("+I[3, 3, %s]", pt)); + // the table has enabled 'scan.plan-sort-partition', so the splits has been sorted by + // partition + assertThat(((DataSplit) splits.get(pt)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(pt)).dataFiles().get(0).level()).isEqualTo(5); + } + // for pt-2, non history partition, nor specified partition, nothing happened + expected4.addAll(expected3.subList(32, 48)); + assertThat(((DataSplit) splits.get(2)).dataFiles().size()).isEqualTo(3); + // for pt-3: minor clustering, only file in level-0 will be picked for clustering, + // outputLevel is 4 + expected4.add("+I[0, 0, 3]"); + expected4.add("+I[0, 1, 3]"); + expected4.add("+I[1, 0, 3]"); + expected4.add("+I[1, 1, 3]"); + expected4.add("+I[0, 2, 3]"); + expected4.add("+I[1, 2, 3]"); + expected4.add("+I[2, 0, 3]"); + expected4.add("+I[2, 1, 3]"); + expected4.add("+I[2, 2, 3]"); + expected4.add("+I[0, 3, 3]"); + expected4.add("+I[1, 3, 3]"); + expected4.add("+I[3, 0, 3]"); + expected4.add("+I[3, 1, 3]"); + expected4.add("+I[2, 3, 3]"); + expected4.add("+I[3, 2, 3]"); + expected4.add("+I[3, 3, 3]"); + assertThat(((DataSplit) splits.get(3)).dataFiles().size()).isEqualTo(2); + assertThat( + ((DataSplit) splits.get(3)) + .dataFiles().stream() + .map(DataFileMeta::level) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder(4, 5); + + assertThat(result4).containsExactlyElementsOf(expected4); + } + @Test public void testClusterOnEmptyData() throws Exception { createTable("pt", 1); @@ -410,8 +567,14 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase { protected FileStoreTable createTable(String partitionKeys, int sinkParallelism) throws Exception { + return createTable(partitionKeys, sinkParallelism, Collections.emptyMap()); + } + + protected FileStoreTable createTable( + String partitionKeys, int sinkParallelism, Map<String, String> options) + throws Exception { catalog.createDatabase(database, true); - catalog.createTable(identifier(), schema(partitionKeys, sinkParallelism), true); + catalog.createTable(identifier(), schema(partitionKeys, sinkParallelism, options), true); return (FileStoreTable) catalog.getTable(identifier()); } @@ -440,6 +603,11 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase { } private static Schema schema(String partitionKeys, int sinkParallelism) { + return schema(partitionKeys, sinkParallelism, Collections.emptyMap()); + } + + private static Schema schema( + String partitionKeys, int sinkParallelism, Map<String, String> options) { Schema.Builder schemaBuilder = Schema.newBuilder(); schemaBuilder.column("a", DataTypes.INT()); schemaBuilder.column("b", DataTypes.INT()); @@ -454,6 +622,9 @@ public 
class IncrementalClusterActionITCase extends ActionITCaseBase { schemaBuilder.option("clustering.incremental", "true"); schemaBuilder.option("scan.parallelism", "1"); schemaBuilder.option("sink.parallelism", String.valueOf(sinkParallelism)); + for (String key : options.keySet()) { + schemaBuilder.option(key, options.get(key)); + } if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) { schemaBuilder.partitionKeys(partitionKeys); }
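For context on how the new options fit together, below is a minimal sketch of a table definition that opts into automatic history-partition clustering. It relies only on the option keys defined in CoreOptions above and on the Schema builder API already used in the tests of this commit; the column names, the partition key, and the one-day idle time are illustrative assumptions, not part of the commit itself.

import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class HistoryPartitionClusteringExample {

    /** Builds a schema for an unaware-bucket append table with history-partition clustering on. */
    public static Schema schemaWithHistoryClustering() {
        return Schema.newBuilder()
                .column("f0", DataTypes.INT())
                .column("f1", DataTypes.INT())
                .column("pt", DataTypes.STRING())
                .partitionKeys("pt")
                // incremental clustering only supports append tables without primary keys and bucket = -1
                .option(CoreOptions.BUCKET.key(), "-1")
                .option(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1")
                .option(CoreOptions.CLUSTERING_INCREMENTAL.key(), "true")
                // a partition with no new files for one day is treated as a history partition
                .option(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), "1 d")
                // fully cluster at most 5 history partitions in one cluster run
                .option(CoreOptions.CLUSTERING_HISTORY_PARTITION_LIMIT.key(), "5")
                .build();
    }
}

With such a table, each run of the incremental cluster action picks files per partition through IncrementalClusterStrategy as before, and HistoryPartitionCluster additionally selects up to the configured limit of idle partitions that still contain low-level files and schedules them for full clustering, which is the behavior exercised by testClusterHistoryPartition above.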
