This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8d654f8710 [flink] Minor refactor for CompactorSourceBuilder
8d654f8710 is described below
commit 8d654f8710a077f2847dbc002932b3603e1ffa1b
Author: JingsongLi <[email protected]>
AuthorDate: Mon Feb 24 20:47:57 2025 +0800
[flink] Minor refactor for CompactorSourceBuilder
---
.../paimon/flink/source/CompactorSourceBuilder.java | 19 +++++++------------
1 file changed, 7 insertions(+), 12 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 2753ea7779..564f9d529a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -86,31 +86,26 @@ public class CompactorSourceBuilder {
}
private Source<RowData, ?, ?> buildSource(CompactBucketsTable compactBucketsTable) {
+ ReadBuilder readBuilder =
+ compactBucketsTable.newReadBuilder().withFilter(partitionPredicate);
+ if (compactBucketsTable.coreOptions().manifestDeleteFileDropStats()) {
+ readBuilder = readBuilder.dropStats();
+ }
if (isContinuous) {
compactBucketsTable = compactBucketsTable.copy(streamingCompactOptions());
- return new ContinuousFileStoreSource(
- getReadBuilder(compactBucketsTable), compactBucketsTable.options(), null);
+ return new ContinuousFileStoreSource(readBuilder, compactBucketsTable.options(), null);
} else {
compactBucketsTable = compactBucketsTable.copy(batchCompactOptions());
Options options = compactBucketsTable.coreOptions().toConfiguration();
return new StaticFileStoreSource(
- getReadBuilder(compactBucketsTable),
+ readBuilder,
null,
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE));
}
}
- private ReadBuilder getReadBuilder(CompactBucketsTable compactBucketsTable) {
- ReadBuilder readBuilder =
- compactBucketsTable.newReadBuilder().withFilter(partitionPredicate);
- if (CoreOptions.fromMap(table.options()).manifestDeleteFileDropStats()) {
- readBuilder.dropStats();
- }
- return readBuilder;
- }
-
public DataStreamSource<RowData> build() {
if (env == null) {
throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");