This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fa040c8813 [core] Refactor HistoryPartitionCluster to load less history partitions
fa040c8813 is described below
commit fa040c8813ab7dd59df77bed0a78ceb6910440f5
Author: JingsongLi <[email protected]>
AuthorDate: Tue Oct 28 21:48:59 2025 +0800
[core] Refactor HistoryPartitionCluster to load less history partitions
---
.../append/cluster/HistoryPartitionCluster.java | 139 ++++++++++-----------
.../append/cluster/IncrementalClusterManager.java | 91 +++++---------
.../paimon/operation/AbstractFileStoreScan.java | 4 +-
.../org/apache/paimon/operation/FileStoreScan.java | 2 +-
.../apache/paimon/operation/ManifestsReader.java | 8 +-
.../table/source/snapshot/SnapshotReader.java | 2 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 5 +-
.../apache/paimon/table/system/AuditLogTable.java | 5 +-
.../cluster/HistoryPartitionClusterTest.java | 57 +--------
.../cluster/IncrementalClusterManagerTest.java | 2 +-
10 files changed, 110 insertions(+), 205 deletions(-)
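
At a glance, the refactor moves partition pruning into the manifest scan: instead of loading every history partition and filtering afterwards, the scan itself skips manifests whose files are already fully clustered. Below is a minimal sketch of that selection flow, not part of the commit, assuming a FileStoreTable `table` is in scope and using only APIs that appear in this diff:

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.manifest.PartitionEntry;
    import org.apache.paimon.table.FileStoreTable;

    import java.time.Duration;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    /** Hypothetical sketch of the pruning flow introduced by this commit. */
    public class HistoryPartitionPruningSketch {

        /** Idle partitions that still hold files below the max level, oldest first. */
        static List<BinaryRow> oldestUnclusteredPartitions(FileStoreTable table) {
            int maxLevel = table.coreOptions().numLevels() - 1;
            Duration idleTime = table.coreOptions().clusteringHistoryPartitionIdleTime();
            long cutoff =
                    LocalDateTime.now()
                            .minus(idleTime)
                            .atZone(ZoneId.systemDefault())
                            .toInstant()
                            .toEpochMilli();
            return table.newSnapshotReader()
                    // skip whole manifests whose files all sit at the max level
                    .withLevelMinMaxFilter((min, max) -> min < maxLevel)
                    // within surviving manifests, ignore files already at the max level
                    .withLevelFilter(level -> level < maxLevel)
                    .partitionEntries().stream()
                    .filter(entry -> entry.lastFileCreationTime() < cutoff)
                    .sorted(Comparator.comparingLong(PartitionEntry::lastFileCreationTime))
                    .map(PartitionEntry::partition)
                    .collect(Collectors.toList());
        }
    }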
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
index cccecdd2be..83f92aa9c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
@@ -27,7 +27,6 @@ import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.slf4j.Logger;
@@ -39,14 +38,11 @@ import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.append.cluster.IncrementalClusterManager.constructPartitionLevels;
@@ -56,70 +52,86 @@ import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForP
public class HistoryPartitionCluster {
    private static final Logger LOG = LoggerFactory.getLogger(HistoryPartitionCluster.class);
- private final InternalRowPartitionComputer partitionComputer;
private final FileStoreTable table;
private final IncrementalClusterStrategy incrementalClusterStrategy;
- private final int maxLevel;
-
+ private final InternalRowPartitionComputer partitionComputer;
+ private final PartitionPredicate specifiedPartitions;
+ private final Duration historyPartitionIdleTime;
private final int historyPartitionLimit;
- @Nullable private final PartitionPredicate specifiedPartitions;
- @Nullable private final Duration historyPartitionIdleTime;
- @Nullable private final BiFilter<Integer, Integer> partitionLevelFilter;
+ private final int maxLevel;
public HistoryPartitionCluster(
FileStoreTable table,
IncrementalClusterStrategy incrementalClusterStrategy,
InternalRowPartitionComputer partitionComputer,
- int maxLevel,
- int historyPartitionLimit,
- @Nullable PartitionPredicate specifiedPartitions,
- @Nullable Duration historyPartitionIdleTime) {
+ PartitionPredicate specifiedPartitions,
+ Duration historyPartitionIdleTime,
+ int historyPartitionLimit) {
this.table = table;
this.incrementalClusterStrategy = incrementalClusterStrategy;
this.partitionComputer = partitionComputer;
- this.maxLevel = maxLevel;
- this.historyPartitionLimit = historyPartitionLimit;
this.specifiedPartitions = specifiedPartitions;
this.historyPartitionIdleTime = historyPartitionIdleTime;
-        // (maxLevel + 1) / 2 is used to calculate the ceiling of maxLevel divided by 2
-        this.partitionLevelFilter =
-                (partitionMinLevel, partitionMaxLevel) -> partitionMinLevel < (maxLevel + 1) / 2;
+ this.historyPartitionLimit = historyPartitionLimit;
+ this.maxLevel = table.coreOptions().numLevels() - 1;
+ }
+
+ @Nullable
+ public static HistoryPartitionCluster create(
+ FileStoreTable table,
+ IncrementalClusterStrategy incrementalClusterStrategy,
+ InternalRowPartitionComputer partitionComputer,
+ @Nullable PartitionPredicate specifiedPartitions) {
+ if (table.schema().partitionKeys().isEmpty()) {
+ return null;
+ }
+ if (specifiedPartitions == null) {
+ return null;
+ }
+
+        Duration idleTime = table.coreOptions().clusteringHistoryPartitionIdleTime();
+ if (idleTime == null) {
+ return null;
+ }
+
+ int limit = table.coreOptions().clusteringHistoryPartitionLimit();
+ return new HistoryPartitionCluster(
+ table,
+ incrementalClusterStrategy,
+ partitionComputer,
+ specifiedPartitions,
+ idleTime,
+ limit);
}
- public Map<BinaryRow, Optional<CompactUnit>> pickForHistoryPartitions() {
+ public Map<BinaryRow, CompactUnit> pickForHistoryPartitions() {
        Map<BinaryRow, List<LevelSortedRun>> partitionLevels =
                constructLevelsForHistoryPartitions();
logForPartitionLevel(partitionLevels, partitionComputer);
- return partitionLevels.entrySet().stream()
- .collect(
- Collectors.toMap(
- Map.Entry::getKey,
- entry ->
- incrementalClusterStrategy.pick(
-                                                maxLevel, entry.getValue(), true)));
+ Map<BinaryRow, CompactUnit> units = new HashMap<>();
+ partitionLevels.forEach(
+ (k, v) -> {
+ Optional<CompactUnit> pick =
+                            incrementalClusterStrategy.pick(maxLevel + 1, v, true);
+ pick.ifPresent(compactUnit -> units.put(k, compactUnit));
+ });
+ return units;
}
@VisibleForTesting
    public Map<BinaryRow, List<LevelSortedRun>> constructLevelsForHistoryPartitions() {
- if (specifiedPartitions == null
- || historyPartitionIdleTime == null
- || historyPartitionLimit <= 0) {
- return Collections.emptyMap();
- }
-
long historyMilli =
LocalDateTime.now()
.minus(historyPartitionIdleTime)
.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli();
-        // read partitionEntries filter by partitionLevelFilter historyPartitionIdleTime
-        // sort partitionEntries by lastFileCreation time, and we will pick the oldest N partitions
+
List<BinaryRow> historyPartitions =
-                table.newSnapshotReader().withManifestLevelFilter(partitionLevelFilter)
-                        .partitionEntries().stream()
+                table.newSnapshotReader().withLevelMinMaxFilter((min, max) -> min < maxLevel)
+                        .withLevelFilter(level -> level < maxLevel).partitionEntries().stream()
                        .filter(entry -> entry.lastFileCreationTime() < historyMilli)
.sorted(Comparator.comparingLong(PartitionEntry::lastFileCreationTime))
.map(PartitionEntry::partition)
@@ -131,6 +143,7 @@ public class HistoryPartitionCluster {
.withPartitionFilter(historyPartitions)
.read()
.dataSplits();
+
        Map<BinaryRow, List<DataFileMeta>> historyPartitionFiles = new HashMap<>();
for (DataSplit dataSplit : historyDataSplits) {
historyPartitionFiles
@@ -138,52 +151,34 @@ public class HistoryPartitionCluster {
.addAll(dataSplit.dataFiles());
}
- // find history partitions which have low-level files
- Set<BinaryRow> selectedHistoryPartitions =
-                findLowLevelPartitions(historyPartitions, historyPartitionFiles);
-        historyPartitionFiles =
-                historyPartitionFiles.entrySet().stream()
-                        .filter(entry -> selectedHistoryPartitions.contains(entry.getKey()))
-                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
- return historyPartitionFiles.entrySet().stream()
+ return filterPartitions(historyPartitionFiles).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
                                entry -> constructPartitionLevels(entry.getValue())));
}
- @VisibleForTesting
- protected Set<BinaryRow> findLowLevelPartitions(
-            List<BinaryRow> historyPartitions, Map<BinaryRow, List<DataFileMeta>> partitionFiles) {
- Set<BinaryRow> partitions = new HashSet<>();
- // 1. the partition is not specified in specifiedPartitions
-        // 2. the min file level in partition should be less than Math.ceil(maxLevel/2)
- for (BinaryRow historyPartition : historyPartitions) {
-            if (specifiedPartitions != null && !specifiedPartitions.test(historyPartition)) {
- List<DataFileMeta> files =
-                        partitionFiles.getOrDefault(historyPartition, Collections.emptyList());
- if (!files.isEmpty()) {
- int partitionMinLevel = maxLevel + 1;
- for (DataFileMeta file : files) {
-                        partitionMinLevel = Math.min(partitionMinLevel, file.level());
+ private Map<BinaryRow, List<DataFileMeta>> filterPartitions(
+ Map<BinaryRow, List<DataFileMeta>> partitionFiles) {
+ Map<BinaryRow, List<DataFileMeta>> result = new HashMap<>();
+ partitionFiles.forEach(
+ (part, files) -> {
+ if (specifiedPartitions.test(part)) {
+ // already contain in specified partitions
+ return;
}
- if (partitionLevelFilter != null
-                            && partitionLevelFilter.test(partitionMinLevel, maxLevel)) {
- partitions.add(historyPartition);
- if (partitions.size() >= historyPartitionLimit) {
- break;
- }
+
+ if (result.size() < historyPartitionLimit) {
+ // in limit, can be picked
+ result.put(part, files);
}
- }
- }
- }
+ });
LOG.info(
"Find {} history partitions for full clustering, the history
partitions are {}",
- partitions.size(),
- partitions.stream()
+ result.size(),
+ result.keySet().stream()
.map(partitionComputer::generatePartValues)
.collect(Collectors.toSet()));
- return partitions;
+ return result;
}
}
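
A hedged usage sketch of the new nullable factory: create(...) returns null when the table is unpartitioned, no partitions were specified, or no history-partition idle time is configured, so the caller guards the call site. The variable names below are placeholders, not identifiers from the commit:

    // `table`, `strategy`, `computer`, `specifiedPartitions`, and `units` are placeholders
    HistoryPartitionCluster cluster =
            HistoryPartitionCluster.create(table, strategy, computer, specifiedPartitions);
    if (cluster != null) {
        // a null cluster means history-partition clustering is disabled
        units.putAll(cluster.pickForHistoryPartitions());
    }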
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 8142f9dbf0..6865b4b82e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -19,6 +19,7 @@
package org.apache.paimon.append.cluster;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
@@ -52,17 +53,14 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
public class IncrementalClusterManager {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterManager.class);
- private final InternalRowPartitionComputer partitionComputer;
+ private final InternalRowPartitionComputer partitionComputer;
private final SnapshotReader snapshotReader;
-
private final IncrementalClusterStrategy incrementalClusterStrategy;
private final CoreOptions.OrderType clusterCurve;
private final List<String> clusterKeys;
-
- private final HistoryPartitionCluster historyPartitionCluster;
-
- private int maxLevel;
+ private final int numLevels;
+ private final @Nullable HistoryPartitionCluster historyPartitionCluster;
public IncrementalClusterManager(FileStoreTable table) {
this(table, null);
@@ -78,7 +76,7 @@ public class IncrementalClusterManager {
options.clusteringIncrementalEnabled(),
"Only support incremental clustering when '%s' is true.",
CLUSTERING_INCREMENTAL.key());
- this.maxLevel = options.numLevels();
+ this.numLevels = options.numLevels();
this.partitionComputer =
new InternalRowPartitionComputer(
table.coreOptions().partitionDefaultName(),
@@ -96,16 +94,9 @@ public class IncrementalClusterManager {
options.numSortedRunCompactionTrigger());
        this.clusterCurve = options.clusteringStrategy(options.clusteringColumns().size());
this.clusterKeys = options.clusteringColumns();
-
this.historyPartitionCluster =
- new HistoryPartitionCluster(
- table,
- incrementalClusterStrategy,
- partitionComputer,
- maxLevel,
- options.clusteringHistoryPartitionLimit(),
- specifiedPartitions,
- options.clusteringHistoryPartitionIdleTime());
+ HistoryPartitionCluster.create(
+                        table, incrementalClusterStrategy, partitionComputer, specifiedPartitions);
}
    public Map<BinaryRow, CompactUnit> prepareForCluster(boolean fullCompaction) {
@@ -114,30 +105,20 @@ public class IncrementalClusterManager {
logForPartitionLevel(partitionLevels, partitionComputer);
// 2. pick files to be clustered for each partition
- Map<BinaryRow, Optional<CompactUnit>> units =
- partitionLevels.entrySet().stream()
- .collect(
- Collectors.toMap(
- Map.Entry::getKey,
- entry ->
-                                                incrementalClusterStrategy.pick(
- maxLevel,
- entry.getValue(),
- fullCompaction)));
-
- Map<BinaryRow, Optional<CompactUnit>> historyUnits =
- historyPartitionCluster.pickForHistoryPartitions();
- units.putAll(historyUnits);
+ Map<BinaryRow, CompactUnit> units = new HashMap<>();
+ partitionLevels.forEach(
+ (k, v) -> {
+ Optional<CompactUnit> pick =
+                            incrementalClusterStrategy.pick(numLevels, v, fullCompaction);
+ pick.ifPresent(compactUnit -> units.put(k, compactUnit));
+ });
+
+ if (historyPartitionCluster != null) {
+ units.putAll(historyPartitionCluster.pickForHistoryPartitions());
+ }
- // 3. filter out empty units
- Map<BinaryRow, CompactUnit> filteredUnits =
- units.entrySet().stream()
- .filter(entry -> entry.getValue().isPresent())
- .collect(
- Collectors.toMap(
-                                        Map.Entry::getKey, entry -> entry.getValue().get()));
if (LOG.isDebugEnabled()) {
- filteredUnits.forEach(
+ units.forEach(
(partition, compactUnit) -> {
                        String filesInfo =
                                compactUnit.files().stream()
@@ -157,24 +138,17 @@ public class IncrementalClusterManager {
filesInfo);
});
}
- return filteredUnits;
+ return units;
}
public Map<BinaryRow, List<LevelSortedRun>> constructLevels() {
List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
-
- maxLevel =
- Math.max(
- maxLevel,
- dataSplits.stream()
-                                .flatMap(split -> split.dataFiles().stream())
- .mapToInt(DataFileMeta::level)
- .max()
- .orElse(-1)
- + 1);
- checkArgument(maxLevel > 1, "Number of levels must be at least 2.");
-
-        Map<BinaryRow, List<DataFileMeta>> partitionFiles = getPartitionFiles(dataSplits);
+ Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
+ for (DataSplit dataSplit : dataSplits) {
+ partitionFiles
+                    .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>())
+ .addAll(dataSplit.dataFiles());
+ }
return partitionFiles.entrySet().stream()
.collect(
Collectors.toMap(
@@ -208,16 +182,6 @@ public class IncrementalClusterManager {
return partitionLevels;
}
-    private Map<BinaryRow, List<DataFileMeta>> getPartitionFiles(List<DataSplit> dataSplits) {
- Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
- for (DataSplit dataSplit : dataSplits) {
- partitionFiles
-                    .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>())
- .addAll(dataSplit.dataFiles());
- }
- return partitionFiles;
- }
-
    public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) {
List<DataSplit> splits = new ArrayList<>();
@@ -283,7 +247,8 @@ public class IncrementalClusterManager {
return clusterKeys;
}
- public HistoryPartitionCluster historyPartitionCluster() {
+ @VisibleForTesting
+ HistoryPartitionCluster historyPartitionCluster() {
return historyPartitionCluster;
}
}
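
The pick loop above also drops the Optional-valued intermediate map; restated compactly (a sketch with the same shape as the new code, using numLevels and fullCompaction as in the diff):

    Map<BinaryRow, CompactUnit> units = new HashMap<>();
    partitionLevels.forEach(
            (partition, runs) ->
                    incrementalClusterStrategy
                            .pick(numLevels, runs, fullCompaction)
                            // empty Optionals never enter the map, so the old
                            // "filter out empty units" pass is no longer needed
                            .ifPresent(unit -> units.put(partition, unit)));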
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 736bc41d7b..2c69b2ce80 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -197,8 +197,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
}
@Override
-    public FileStoreScan withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter) {
-        manifestsReader.withManifestLevelFilter(manifestLevelFilter);
+    public FileStoreScan withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter) {
+        manifestsReader.withLevelMinMaxFilter(minMaxFilter);
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 354c43e019..047b1c3f5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -75,7 +75,7 @@ public interface FileStoreScan {
FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
-    FileStoreScan withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter);
+    FileStoreScan withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter);
FileStoreScan enableValueFilter();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index dbf5140583..b3b89e72aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -53,7 +53,7 @@ public class ManifestsReader {
@Nullable private Integer specifiedBucket = null;
@Nullable private Integer specifiedLevel = null;
@Nullable private PartitionPredicate partitionFilter = null;
- @Nullable private BiFilter<Integer, Integer> manifestLevelFilter = null;
+ @Nullable private BiFilter<Integer, Integer> levelMinMaxFilter = null;
public ManifestsReader(
RowType partitionType,
@@ -81,8 +81,8 @@ public class ManifestsReader {
return this;
}
-    public ManifestsReader withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter) {
-        this.manifestLevelFilter = manifestLevelFilter;
+    public ManifestsReader withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter) {
+        this.levelMinMaxFilter = minMaxFilter;
return this;
}
@@ -167,7 +167,7 @@ public class ManifestsReader {
                    && (specifiedLevel < minLevel || specifiedLevel > maxLevel)) {
return false;
}
-            if (manifestLevelFilter != null && !manifestLevelFilter.test(minLevel, maxLevel)) {
+            if (levelMinMaxFilter != null && !levelMinMaxFilter.test(minLevel, maxLevel)) {
return false;
}
}
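
The renamed hook reads as a simple contract: the BiFilter receives the minimum and maximum data-file level recorded for a manifest and can veto reading that manifest entirely. An illustrative snippet follows (the threshold 4 is an arbitrary example, not from the commit):

    import org.apache.paimon.utils.BiFilter;

    public class LevelMinMaxFilterDemo {
        public static void main(String[] args) {
            // veto manifests whose lowest file level is already at the top
            BiFilter<Integer, Integer> keepLowLevels = (minLevel, maxLevel) -> minLevel < 4;
            System.out.println(keepLowLevels.test(4, 4)); // false: fully clustered, skipped
            System.out.println(keepLowLevels.test(0, 4)); // true: low-level files remain, read
        }
    }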
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 91505ffcf9..ed80f3f92a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -88,7 +88,7 @@ public interface SnapshotReader {
SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
-    SnapshotReader withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter);
+    SnapshotReader withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter);
SnapshotReader enableValueFilter();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 22cd5cad6f..038f4fec18 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -262,8 +262,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
-    public SnapshotReader withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter) {
-        scan.withManifestLevelFilter(manifestLevelFilter);
+    @Override
+    public SnapshotReader withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter) {
+        scan.withLevelMinMaxFilter(minMaxFilter);
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index f71eba944d..3fd6a8d8af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -361,9 +361,8 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
}
@Override
- public SnapshotReader withManifestLevelFilter(
- BiFilter<Integer, Integer> manifestLevelFilter) {
- wrapped.withManifestLevelFilter(manifestLevelFilter);
+        public SnapshotReader withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter) {
+ wrapped.withLevelMinMaxFilter(minMaxFilter);
return this;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
index 4692b50fbb..f1f93ca989 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
@@ -24,8 +24,6 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.schema.Schema;
@@ -39,17 +37,12 @@ import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import static org.apache.paimon.append.cluster.IncrementalClusterManagerTest.writeOnce;
-import static org.apache.paimon.append.cluster.IncrementalClusterStrategyTest.createFile;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link HistoryPartitionCluster}. */
@@ -57,53 +50,6 @@ public class HistoryPartitionClusterTest {
@TempDir java.nio.file.Path tempDir;
- @Test
- public void testFindLowLevelPartitions() throws Exception {
-        FileStoreTable table = createTable(Collections.emptyMap(), Collections.emptyList());
-        long now = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
- Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
-
- // specified partition, has low-level files
- BinaryRow partition1 = BinaryRow.singleColumn(1);
-        PartitionEntry partitionEntry1 = new PartitionEntry(partition1, 0, 0, 0, now);
-        partitionFiles.put(
-                partition1, Lists.newArrayList(createFile(100, 1, 3), createFile(100, 1, 5)));
- // has no low-level files
- BinaryRow partition2 = BinaryRow.singleColumn(2);
-        PartitionEntry partitionEntry2 = new PartitionEntry(partition2, 0, 0, 0, now);
-        partitionFiles.put(partition2, Lists.newArrayList(createFile(100, 1, 0)));
- // has low-level files
- BinaryRow partition3 = BinaryRow.singleColumn(3);
-        PartitionEntry partitionEntry3 = new PartitionEntry(partition3, 0, 0, 0, now);
-        partitionFiles.put(
-                partition3, Lists.newArrayList(createFile(100, 1, 0), createFile(100, 1, 2)));
- // has no low-level files
- BinaryRow partition4 = BinaryRow.singleColumn(4);
-        PartitionEntry partitionEntry4 = new PartitionEntry(partition3, 0, 0, 0, now);
-        partitionFiles.put(partition4, Lists.newArrayList(createFile(100, 1, 0)));
-
- IncrementalClusterManager incrementalClusterManager =
- new IncrementalClusterManager(
- table,
- PartitionPredicate.fromMultiple(
-                                RowType.of(DataTypes.INT()), Lists.newArrayList(partition1)));
- HistoryPartitionCluster historyPartitionCluster =
- incrementalClusterManager.historyPartitionCluster();
- Set<BinaryRow> selectedPartitions =
- historyPartitionCluster.findLowLevelPartitions(
- Lists.newArrayList(
- partitionEntry1,
- partitionEntry2,
- partitionEntry3,
- partitionEntry4)
- .stream()
- .map(PartitionEntry::partition)
- .collect(Collectors.toList()),
- partitionFiles);
-
- assertThat(selectedPartitions).contains(partition2);
- }
-
@Test
public void testHistoryPartitionAutoClustering() throws Exception {
        FileStoreTable table = createTable(Collections.emptyMap(), Collections.singletonList("f2"));
@@ -153,8 +99,7 @@ public class HistoryPartitionClusterTest {
        // test not specify partition and disable history partition auto clustering
        historyPartitionCluster = new IncrementalClusterManager(table).historyPartitionCluster();
-        partitionLevels = historyPartitionCluster.constructLevelsForHistoryPartitions();
- assertThat(partitionLevels.isEmpty()).isTrue();
+ assertThat(historyPartitionCluster).isNull();
}
protected FileStoreTable createTable(
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index 37cc006673..4b62caab88 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -58,7 +58,7 @@ public class IncrementalClusterManagerTest {
@TempDir java.nio.file.Path tempDir;
@Test
- public void testNonUnAwareBucketTable() throws Exception {
+ public void testNonUnAwareBucketTable() {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.BUCKET.key(), "1");
options.put(CoreOptions.BUCKET_KEY.key(), "f0");