This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d654f8710 [flink] Minor refactor for CompactorSourceBuilder
8d654f8710 is described below

commit 8d654f8710a077f2847dbc002932b3603e1ffa1b
Author: JingsongLi <[email protected]>
AuthorDate: Mon Feb 24 20:47:57 2025 +0800

    [flink] Minor refactor for CompactorSourceBuilder
---
 .../paimon/flink/source/CompactorSourceBuilder.java   | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 2753ea7779..564f9d529a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -86,31 +86,26 @@ public class CompactorSourceBuilder {
     }
 
     private Source<RowData, ?, ?> buildSource(CompactBucketsTable compactBucketsTable) {
+        ReadBuilder readBuilder =
+                compactBucketsTable.newReadBuilder().withFilter(partitionPredicate);
+        if (compactBucketsTable.coreOptions().manifestDeleteFileDropStats()) {
+            readBuilder = readBuilder.dropStats();
+        }
         if (isContinuous) {
             compactBucketsTable = compactBucketsTable.copy(streamingCompactOptions());
-            return new ContinuousFileStoreSource(
-                    getReadBuilder(compactBucketsTable), compactBucketsTable.options(), null);
+            return new ContinuousFileStoreSource(readBuilder, compactBucketsTable.options(), null);
         } else {
             compactBucketsTable = compactBucketsTable.copy(batchCompactOptions());
             Options options = compactBucketsTable.coreOptions().toConfiguration();
 
             return new StaticFileStoreSource(
-                    getReadBuilder(compactBucketsTable),
+                    readBuilder,
                     null,
                     options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
                     options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE));
         }
     }
 
-    private ReadBuilder getReadBuilder(CompactBucketsTable compactBucketsTable) {
-        ReadBuilder readBuilder =
-                compactBucketsTable.newReadBuilder().withFilter(partitionPredicate);
-        if (CoreOptions.fromMap(table.options()).manifestDeleteFileDropStats()) {
-            readBuilder.dropStats();
-        }
-        return readBuilder;
-    }
-
     public DataStreamSource<RowData> build() {
         if (env == null) {
             throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");

Reply via email to