This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 7582dacd6f [core] fix that batch CompactAction makes no sense with all level-0 files (#5790)
7582dacd6f is described below
commit 7582dacd6f666d33ac7bf7d0fe5651d17ee3bd64
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Jul 7 13:10:45 2025 +0800
    [core] fix that batch CompactAction makes no sense with all level-0 files (#5790)
---
.../main/java/org/apache/paimon/CoreOptions.java | 37 ++++++++++++++++
.../paimon/table/source/DataTableBatchScan.java | 6 ++-
.../flink/source/CompactorSourceBuilder.java | 3 ++
.../paimon/flink/action/CompactActionITCase.java | 50 ++++++++++++++++++++++
4 files changed, 95 insertions(+), 1 deletion(-)
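Why the fix works: for primary-key tables whose batch reads skip level-0 files (deletion vectors enabled or merge-engine = first-row), DataTableBatchScan used to install a 'level > 0' filter unconditionally. When every data file was still at level 0, for example on a write-only table that was never compacted, the batch compactor source planned zero splits, so the batch CompactAction silently did nothing. The patch adds an internal option 'batch-scan-mode' (default 'none'); CompactorSourceBuilder sets it to 'compact', which tells DataTableBatchScan to keep level-0 files visible. A minimal sketch of the resulting behaviour, assuming the usual imports and an existing primary-key FileStoreTable named 'table' created with 'write-only' = 'true' and 'deletion-vectors.enabled' = 'true' (or merge-engine = first-row) that holds freshly written, uncompacted data; the variable names are illustrative and not part of the patch:

    // Default batch scan: the 'level > 0' filter drops every level-0 file,
    // so nothing is planned even though data was written.
    assertThat(table.newScan().plan().splits()).isEmpty();

    // Force the internal batch scan mode the same way CompactorSourceBuilder does.
    Map<String, String> dynamicOptions = new HashMap<>();
    dynamicOptions.put(
            CoreOptions.BATCH_SCAN_MODE.key(),
            CoreOptions.BatchScanMode.COMPACT.getValue());
    FileStoreTable compactScanTable = table.copy(dynamicOptions);

    // Now the level filter is skipped, the level-0 files show up in the plan,
    // and the compactor has something to compact.
    assertThat(compactScanTable.newScan().plan().splits()).isNotEmpty();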
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 5a8ffb3082..a6e7c81909 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1105,6 +1105,15 @@ public class CoreOptions implements Serializable {
.withDescription(
"The mode of streaming read that specifies to read
the data of table file or log.");
+ @ExcludeFromDocumentation("Internal use only")
+ public static final ConfigOption<BatchScanMode> BATCH_SCAN_MODE =
+ key("batch-scan-mode")
+ .enumType(BatchScanMode.class)
+ .defaultValue(BatchScanMode.NONE)
+ .withDescription(
+                            "Only used to force TableScan to construct suitable 'StartingUpScanner' and 'FollowUpScanner' "
+ + "dedicated internal streaming scan.");
+
public static final ConfigOption<Duration> CONSUMER_EXPIRATION_TIME =
key("consumer.expiration-time")
.durationType()
@@ -3058,6 +3067,34 @@ public class CoreOptions implements Serializable {
}
}
+ /** Inner batch scan mode for some internal requirements. */
+ public enum BatchScanMode implements DescribedEnum {
+ NONE("none", "No requirement."),
+ COMPACT("compact", "Compaction for batch mode.");
+
+ private final String value;
+ private final String description;
+
+ BatchScanMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+
+ public String getValue() {
+ return value;
+ }
+ }
+
/** Specifies this scan type for incremental scan . */
public enum IncrementalBetweenScanMode implements DescribedEnum {
AUTO(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 0bf6927e4e..32ac9f5d1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -48,7 +48,11 @@ public class DataTableBatchScan extends AbstractDataTableScan {
this.hasNext = true;
if (!schema.primaryKeys().isEmpty() && options.batchScanSkipLevel0()) {
-            snapshotReader.withLevelFilter(level -> level > 0).enableValueFilter();
+ if (options.toConfiguration()
+ .get(CoreOptions.BATCH_SCAN_MODE)
+ .equals(CoreOptions.BatchScanMode.NONE)) {
+                snapshotReader.withLevelFilter(level -> level > 0).enableValueFilter();
+ }
}
if (options.bucket() == BucketMode.POSTPONE_BUCKET) {
snapshotReader.onlyReadRealBuckets();
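Note that the default path is unchanged: the 'level > 0' filter is still applied whenever 'batch-scan-mode' is 'none', so ordinary batch reads of deletion-vector or first-row tables keep skipping level-0 files; only a scan whose options carry the 'compact' mode (as set by CompactorSourceBuilder below) keeps them. The check relies on the string value written into the dynamic options being parsed back into the enum, roughly like this (illustrative sketch, not code from the patch):

    // CompactorSourceBuilder stores the string form ("compact") in the table's
    // dynamic options; DataTableBatchScan reads it back as the enum and only
    // installs the level filter when the mode is NONE.
    Options conf =
            Options.fromMap(
                    Collections.singletonMap(
                            CoreOptions.BATCH_SCAN_MODE.key(),
                            CoreOptions.BatchScanMode.COMPACT.getValue()));
    boolean applyLevelFilter =
            conf.get(CoreOptions.BATCH_SCAN_MODE) == CoreOptions.BatchScanMode.NONE;
    // applyLevelFilter is false here, so level-0 files stay visible to the scan.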
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 1f6b97edd2..47e1997fb6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -167,6 +167,9 @@ public class CompactorSourceBuilder {
put(CoreOptions.SCAN_TIMESTAMP.key(), null);
put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
            put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.LATEST_FULL.toString());
+ put(
+ CoreOptions.BATCH_SCAN_MODE.key(),
+ CoreOptions.BatchScanMode.COMPACT.getValue());
}
};
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 18beabcb40..419eb6a521 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -91,6 +91,56 @@ public class CompactActionITCase extends CompactActionITCaseBase {
}
}
+ @Test
+ @Timeout(60)
+ public void testCompactWhenSkipLevel0() throws Exception {
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+ // in dv mode or merge-engine = first-row, batch read will skip level-0
+ if (ThreadLocalRandom.current().nextBoolean()) {
+            tableOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+ } else {
+ tableOptions.put(CoreOptions.MERGE_ENGINE.key(), "first-row");
+ }
+ tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup");
+
+ FileStoreTable table =
+ prepareTable(
+ Arrays.asList("dt", "hh"),
+ Arrays.asList("dt", "hh", "k"),
+ Collections.emptyList(),
+ tableOptions);
+
+ writeData(
+ rowData(1, 100, 15, BinaryString.fromString("20221208")),
+ rowData(1, 100, 16, BinaryString.fromString("20221208")),
+ rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+ writeData(
+ rowData(2, 100, 15, BinaryString.fromString("20221208")),
+ rowData(2, 100, 16, BinaryString.fromString("20221208")),
+ rowData(2, 100, 15, BinaryString.fromString("20221209")));
+
+ checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+ assertThat(table.newScan().plan().splits().size()).isEqualTo(0);
+
+ runAction(false);
+
+ checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
+
+ List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+ assertThat(splits.size()).isEqualTo(3);
+ for (DataSplit split : splits) {
+ if (split.partition().getInt(1) == 15) {
+ // compacted
+ assertThat(split.dataFiles().size()).isEqualTo(1);
+ } else {
+ // not compacted
+ assertThat(split.dataFiles().size()).isEqualTo(2);
+ }
+ }
+ }
+
@Test
public void testStreamingCompact() throws Exception {
Map<String, String> tableOptions = new HashMap<>();