This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7582dacd6f [core] fix that batch CompactAction makes no sense with all level-0 files (#5790)
7582dacd6f is described below

commit 7582dacd6f666d33ac7bf7d0fe5651d17ee3bd64
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Jul 7 13:10:45 2025 +0800

    [core] fix that batch CompactAction makes no sense with all level-0 files (#5790)
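
    Batch reads of primary-key tables skip level-0 files when
    batchScanSkipLevel0() applies (deletion-vectors mode or the first-row
    merge engine, as the test below notes), so a batch CompactAction that
    reuses this scan plans zero splits when all files are still at level 0
    and compaction silently does nothing. The fix introduces an internal
    'batch-scan-mode' option; the compactor source sets it to 'compact' so
    that DataTableBatchScan keeps level-0 files visible.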
---
 .../main/java/org/apache/paimon/CoreOptions.java   | 37 ++++++++++++++++
 .../paimon/table/source/DataTableBatchScan.java    |  6 ++-
 .../flink/source/CompactorSourceBuilder.java       |  3 ++
 .../paimon/flink/action/CompactActionITCase.java   | 50 ++++++++++++++++++++++
 4 files changed, 95 insertions(+), 1 deletion(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 5a8ffb3082..a6e7c81909 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1105,6 +1105,15 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "The mode of streaming read that specifies to read 
the data of table file or log.");
 
+    @ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<BatchScanMode> BATCH_SCAN_MODE =
+            key("batch-scan-mode")
+                    .enumType(BatchScanMode.class)
+                    .defaultValue(BatchScanMode.NONE)
+                    .withDescription(
+                            "Only used to force TableScan to construct suitable 'StartingUpScanner' and 'FollowUpScanner' "
+                                    + "dedicated internal streaming scan.");
+
     public static final ConfigOption<Duration> CONSUMER_EXPIRATION_TIME =
             key("consumer.expiration-time")
                     .durationType()
@@ -3058,6 +3067,34 @@ public class CoreOptions implements Serializable {
         }
     }
 
+    /** Inner batch scan mode for some internal requirements. */
+    public enum BatchScanMode implements DescribedEnum {
+        NONE("none", "No requirement."),
+        COMPACT("compact", "Compaction for batch mode.");
+
+        private final String value;
+        private final String description;
+
+        BatchScanMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+
+        public String getValue() {
+            return value;
+        }
+    }
+
     /** Specifies this scan type for incremental scan . */
     public enum IncrementalBetweenScanMode implements DescribedEnum {
         AUTO(
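
For context, here is a minimal sketch (not part of this commit) of how the new option round-trips between its string form and the typed enum. It assumes org.apache.paimon.options.Options with the fromMap factory and Flink-style typed get, as used elsewhere in Paimon; only CoreOptions.BATCH_SCAN_MODE and BatchScanMode themselves come from the diff above.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    public class BatchScanModeRoundTrip {
        public static void main(String[] args) {
            // String form, as the compactor source passes it via dynamic options.
            Map<String, String> dynamicOptions = new HashMap<>();
            dynamicOptions.put(
                    CoreOptions.BATCH_SCAN_MODE.key(),
                    CoreOptions.BatchScanMode.COMPACT.getValue());

            // Typed form, as DataTableBatchScan reads it back.
            Options options = Options.fromMap(dynamicOptions);
            CoreOptions.BatchScanMode mode = options.get(CoreOptions.BATCH_SCAN_MODE);

            // The guard in DataTableBatchScan only skips level-0 files for NONE.
            System.out.println(mode == CoreOptions.BatchScanMode.NONE); // false
        }
    }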
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 0bf6927e4e..32ac9f5d1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -48,7 +48,11 @@ public class DataTableBatchScan extends AbstractDataTableScan {
         this.hasNext = true;
 
         if (!schema.primaryKeys().isEmpty() && options.batchScanSkipLevel0()) {
-            snapshotReader.withLevelFilter(level -> level > 0).enableValueFilter();
+            if (options.toConfiguration()
+                    .get(CoreOptions.BATCH_SCAN_MODE)
+                    .equals(CoreOptions.BatchScanMode.NONE)) {
+                snapshotReader.withLevelFilter(level -> level > 0).enableValueFilter();
+            }
         }
         if (options.bucket() == BucketMode.POSTPONE_BUCKET) {
             snapshotReader.onlyReadRealBuckets();
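
To make the behavior change concrete, a hedged sketch of the two level filters, with a plain java.util.function.IntPredicate standing in for the lambda in the diff (nothing here is Paimon API):

    import java.util.function.IntPredicate;

    public class LevelFilterSketch {
        public static void main(String[] args) {
            // Installed only when batch-scan-mode == none (the normal batch read).
            IntPredicate skipLevel0 = level -> level > 0;
            // With batch-scan-mode == compact no level filter is installed at all.
            IntPredicate keepAll = level -> true;

            System.out.println(skipLevel0.test(0)); // false: level-0 file filtered out
            System.out.println(keepAll.test(0));    // true: level-0 file stays visible
        }
    }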
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 1f6b97edd2..47e1997fb6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -167,6 +167,9 @@ public class CompactorSourceBuilder {
                 put(CoreOptions.SCAN_TIMESTAMP.key(), null);
                 put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
                put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.LATEST_FULL.toString());
+                put(
+                        CoreOptions.BATCH_SCAN_MODE.key(),
+                        CoreOptions.BatchScanMode.COMPACT.getValue());
             }
         };
     }
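
Dynamic options like the map above are typically layered onto a table with copy(). A hedged sketch, assuming FileStoreTable#copy(Map<String, String>) as exposed by Paimon tables; the helper name and the way 'table' is obtained are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.table.FileStoreTable;

    public class CompactScanOptions {
        // Illustrative helper: 'table' is assumed to be loaded elsewhere, e.g. via a catalog.
        static FileStoreTable forCompactScan(FileStoreTable table) {
            Map<String, String> dynamicOptions = new HashMap<>();
            dynamicOptions.put(
                    CoreOptions.SCAN_MODE.key(),
                    CoreOptions.StartupMode.LATEST_FULL.toString());
            dynamicOptions.put(
                    CoreOptions.BATCH_SCAN_MODE.key(),
                    CoreOptions.BatchScanMode.COMPACT.getValue());
            // copy() applies the dynamic options on top of the table's stored options.
            return table.copy(dynamicOptions);
        }
    }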
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 18beabcb40..419eb6a521 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -91,6 +91,56 @@ public class CompactActionITCase extends CompactActionITCaseBase {
         }
     }
 
+    @Test
+    @Timeout(60)
+    public void testCompactWhenSkipLevel0() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+        // in dv mode or merge-engine = first-row, batch read will skip level-0
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            tableOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+        } else {
+            tableOptions.put(CoreOptions.MERGE_ENGINE.key(), "first-row");
+        }
+        tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup");
+
+        FileStoreTable table =
+                prepareTable(
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("dt", "hh", "k"),
+                        Collections.emptyList(),
+                        tableOptions);
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        writeData(
+                rowData(2, 100, 15, BinaryString.fromString("20221208")),
+                rowData(2, 100, 16, BinaryString.fromString("20221208")),
+                rowData(2, 100, 15, BinaryString.fromString("20221209")));
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+        assertThat(table.newScan().plan().splits().size()).isEqualTo(0);
+
+        runAction(false);
+
+        checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits.size()).isEqualTo(3);
+        for (DataSplit split : splits) {
+            if (split.partition().getInt(1) == 15) {
+                // compacted
+                assertThat(split.dataFiles().size()).isEqualTo(1);
+            } else {
+                // not compacted
+                assertThat(split.dataFiles().size()).isEqualTo(2);
+            }
+        }
+    }
+
     @Test
     public void testStreamingCompact() throws Exception {
         Map<String, String> tableOptions = new HashMap<>();
