This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1055e4b99b [core] Refactor scan and plan in DataEvolutionCompactCoordinator
1055e4b99b is described below
commit 1055e4b99b3a293211fb1a8e264b26c3eb59d005
Author: JingsongLi <[email protected]>
AuthorDate: Fri Dec 19 11:48:48 2025 +0800
[core] Refactor scan and plan in DataEvolutionCompactCoordinator
---
.../DataEvolutionCompactCoordinator.java | 40 +++++++---------------
.../DataEvolutionCompactCoordinatorTest.java | 26 +++++++-------
2 files changed, 24 insertions(+), 42 deletions(-)
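
In short: the refactor replaces the Supplier-based handoff between CompactScanner and
CompactPlanner with an explicit batch of manifest entries. A minimal sketch of the
resulting plan() flow, assembled from the hunks below (class context and imports elided):

    // scan() now returns up to FILES_BATCH (100_000) entries per call instead of
    // buffering them behind a Supplier; compactPlan(...) receives the batch directly.
    public List<DataEvolutionCompactTask> plan() {
        List<ManifestEntry> entries = scanner.scan();
        if (!entries.isEmpty()) {
            return planner.compactPlan(entries);
        }
        return Collections.emptyList();
    }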
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index 892b88952a..e6a59da2b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -36,14 +36,15 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** Compact coordinator to cmpact data evolution table. */
+/** Compact coordinator to compact data evolution table. */
public class DataEvolutionCompactCoordinator {
+ private static final int FILES_BATCH = 100_000;
+
private final CompactScanner scanner;
private final CompactPlanner planner;
@@ -55,19 +56,15 @@ public class DataEvolutionCompactCoordinator {
this.scanner = new CompactScanner(table.newSnapshotReader());
this.planner =
- new CompactPlanner(
- scanner::fetchResult,
- compactBlob,
- targetFileSize,
- openFileCost,
- compactMinFileNum);
+ new CompactPlanner(compactBlob, targetFileSize, openFileCost, compactMinFileNum);
}
public List<DataEvolutionCompactTask> plan() {
// scan files in snapshot
- if (scanner.scan()) {
+ List<ManifestEntry> entries = scanner.scan();
+ if (!entries.isEmpty()) {
// do plan compact tasks
- return planner.compactPlan();
+ return planner.compactPlan(entries);
}
return Collections.emptyList();
@@ -79,8 +76,6 @@ public class DataEvolutionCompactCoordinator {
private final SnapshotReader snapshotReader;
private final Queue<List<ManifestFileMeta>> metas;
- private List<ManifestEntry> result;
-
private CompactScanner(SnapshotReader snapshotReader) {
this.snapshotReader = snapshotReader;
Snapshot snapshot = snapshotReader.snapshotManager().latestSnapshot();
@@ -90,13 +85,11 @@ public class DataEvolutionCompactCoordinator {
RangeHelper<ManifestFileMeta> rangeHelper =
new RangeHelper<>(ManifestFileMeta::minRowId, ManifestFileMeta::maxRowId);
this.metas = new ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFileMetas));
- this.result = new ArrayList<>();
}
- boolean scan() {
- boolean scanResult = false;
- while (metas.peek() != null && result.size() < 1000) {
- scanResult = true;
+ List<ManifestEntry> scan() {
+ List<ManifestEntry> result = new ArrayList<>();
+ while (metas.peek() != null && result.size() < FILES_BATCH) {
List<ManifestFileMeta> currentMetas = metas.poll();
List<ManifestEntry> targetEntries =
currentMetas.stream()
@@ -110,12 +103,6 @@ public class DataEvolutionCompactCoordinator {
result.addAll(targetEntries);
}
- return scanResult;
- }
-
- List<ManifestEntry> fetchResult() {
- List<ManifestEntry> result = new ArrayList<>(this.result);
- this.result = new ArrayList<>();
return result;
}
}
@@ -123,7 +110,6 @@ public class DataEvolutionCompactCoordinator {
/** Generate compaction tasks. */
static class CompactPlanner {
- private final Supplier<List<ManifestEntry>> supplier;
private final boolean compactBlob;
private final long targetFileSize;
private final long openFileCost;
@@ -138,20 +124,18 @@ public class DataEvolutionCompactCoordinator {
private List<DataFileMeta> blobFiles = new ArrayList<>();
CompactPlanner(
- Supplier<List<ManifestEntry>> supplier,
boolean compactBlob,
long targetFileSize,
long openFileCost,
long compactMinFileNum) {
- this.supplier = supplier;
this.compactBlob = compactBlob;
this.targetFileSize = targetFileSize;
this.openFileCost = openFileCost;
this.compactMinFileNum = compactMinFileNum;
}
- List<DataEvolutionCompactTask> compactPlan() {
- for (ManifestEntry entry : supplier.get()) {
+ List<DataEvolutionCompactTask> compactPlan(Iterable<ManifestEntry> entries) {
+ for (ManifestEntry entry : entries) {
long rowId = entry.file().nonNullFirstRowId();
if (rowId < lastRowIdStart) {
throw new IllegalStateException(
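
Note the invariant at the end of this hunk: compactPlan(...) throws IllegalStateException
when an entry's first row id moves backwards, so each batch must arrive ordered by
nonNullFirstRowId (the scanner appears to prepare this by merging overlapping row-id
ranges with RangeHelper). A hedged sketch of a defensive handoff; the sort is
illustrative and not part of this commit:

    // Illustrative only: ensure non-decreasing first row ids before planning,
    // using the class's existing java.util.Comparator import.
    entries.sort(Comparator.comparingLong((ManifestEntry e) -> e.file().nonNullFirstRowId()));
    List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);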
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index f7b3ab7839..1c66baa554 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -45,9 +45,9 @@ public class DataEvolutionCompactCoordinatorTest {
DataEvolutionCompactCoordinator.CompactPlanner planner =
new DataEvolutionCompactCoordinator.CompactPlanner(
- () -> entries, false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
+ false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
- List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
assertThat(tasks).isEmpty();
}
@@ -62,17 +62,16 @@ public class DataEvolutionCompactCoordinatorTest {
// Use small target file size to trigger compaction
DataEvolutionCompactCoordinator.CompactPlanner planner =
- new DataEvolutionCompactCoordinator.CompactPlanner(() -> entries, false, 199, 1, 2);
+ new DataEvolutionCompactCoordinator.CompactPlanner(false, 199, 1, 2);
- List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
assertThat(tasks).isNotEmpty();
assertThat(tasks.get(0).compactBefore())
.containsExactly(entries.get(0).file(), entries.get(1).file());
- planner =
- new DataEvolutionCompactCoordinator.CompactPlanner(() -> entries, false, 200, 1, 2);
- tasks = planner.compactPlan();
+ planner = new DataEvolutionCompactCoordinator.CompactPlanner(false, 200, 1, 2);
+ tasks = planner.compactPlan(entries);
assertThat(tasks).isNotEmpty();
assertThat(tasks.get(0).compactBefore())
.containsExactly(
@@ -92,9 +91,9 @@ public class DataEvolutionCompactCoordinatorTest {
// Use large target file size so compaction is triggered by gap, not size
DataEvolutionCompactCoordinator.CompactPlanner planner =
new DataEvolutionCompactCoordinator.CompactPlanner(
- () -> entries, false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
+ false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
- List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
// Gap should trigger compaction of the first group (file1 + file2)
assertThat(tasks).hasSize(2);
@@ -122,9 +121,9 @@ public class DataEvolutionCompactCoordinatorTest {
DataEvolutionCompactCoordinator.CompactPlanner planner =
new DataEvolutionCompactCoordinator.CompactPlanner(
- () -> entries, false, 100 * 1024 * 1024, 4 * 1024 * 1024, 2);
+ false, 100 * 1024 * 1024, 4 * 1024 * 1024, 2);
- List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
assertThat(tasks.size()).isEqualTo(4);
assertThat(tasks.get(0).compactBefore())
@@ -150,10 +149,9 @@ public class DataEvolutionCompactCoordinatorTest {
// Use small target to trigger compaction, with blob compaction enabled
DataEvolutionCompactCoordinator.CompactPlanner planner =
- new DataEvolutionCompactCoordinator.CompactPlanner(
- () -> entries, true, 1024, 1024, 2);
+ new DataEvolutionCompactCoordinator.CompactPlanner(true, 1024, 1024, 2);
- List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
// Should have compaction tasks for both data files and blob files
assertThat(tasks.size()).isEqualTo(2);
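
A note on the design choice visible throughout this test diff: with the Supplier gone,
planning no longer pulls its input from scanner state, so a test constructs the planner
from the four sizing parameters alone and passes entries explicitly. A sketch mirroring
the first test above (parameter values copied from it):

    DataEvolutionCompactCoordinator.CompactPlanner planner =
            new DataEvolutionCompactCoordinator.CompactPlanner(
                    false,              // compactBlob
                    128 * 1024 * 1024,  // targetFileSize
                    4 * 1024 * 1024,    // openFileCost
                    2);                 // compactMinFileNum
    List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);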