This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e72b19b622dd feat(flink): Support bucket pruning for source V2 (#18716)
e72b19b622dd is described below
commit e72b19b622dd66921c075cb937f39c6e0ad213a4
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon May 11 13:54:56 2026 +0800
feat(flink): Support bucket pruning for source V2 (#18716)
---
.../org/apache/hudi/source/HoodieScanContext.java | 3 ++
.../java/org/apache/hudi/source/HoodieSource.java | 1 +
.../org/apache/hudi/table/HoodieTableSource.java | 2 ++
.../org/apache/hudi/source/TestHoodieSource.java | 35 ++++++++++++++++++++++
4 files changed, 41 insertions(+)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
index e75c9c8fddc7..f57038ba41c0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
@@ -32,6 +32,7 @@ import org.apache.hudi.storage.StoragePath;
import java.io.Serializable;
import java.time.Duration;
+import java.util.function.Function;
/**
* Hudi source scan context for finding completed commits for streaming and incremental read.
@@ -63,6 +64,8 @@ public class HoodieScanContext implements Serializable {
private final PartitionPruners.PartitionPruner partitionPruner;
// Column stats probe
private final ColumnStatsProbe columnStatsProbe;
+ // Partition bucket id function for bucket pruning
+ private final Function<String, Integer> partitionBucketIdFunc;
private final long limit;
public Duration getScanInterval() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
index eed5809ef374..6b9ef940c412 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
@@ -214,6 +214,7 @@ public class HoodieSource<T> extends FileIndexReader implements Source<T, Hoodie
.metaClient(metaClient)
.columnStatsProbe(scanContext.getColumnStatsProbe())
.partitionPruner(scanContext.getPartitionPruner())
+ .partitionBucketIdFunc(scanContext.getPartitionBucketIdFunc())
.build();
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 6d4a2dd85c30..6d64d98ddeaa 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -354,6 +354,8 @@ public class HoodieTableSource extends FileIndexReader implements
.maxPendingSplits(conf.get(FlinkOptions.READ_SPLITS_LIMIT))
.partitionPruner(partitionPruner)
.columnStatsProbe(columnStatsProbe)
+ .partitionBucketIdFunc(PartitionBucketIdFunc.create(this.dataBucketFunc,
+ this.metaClient, conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)))
.isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
.limit(limit)
.build();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
index 66886bc74134..c9f1a66f6822 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
@@ -26,6 +26,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.source.reader.HoodieRecordEmitter;
@@ -61,6 +63,7 @@ import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -264,6 +267,28 @@ public class TestHoodieSource {
assertTrue(splits3.size() < fullSplits.size());
}
+ @Test
+ public void testCreateBatchHoodieSplitsWithBucketPruner() throws Exception {
+ conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
+ conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ metaClient = StreamerUtil.createMetaClient(conf);
+
+ HoodieSource<RowData> source1 = createHoodieSourceWithPruner(conf, metaClient, null, null);
+ List<HoodieSourceSplit> fullSplits = source1.createBatchHoodieSplits();
+
+ int targetBucketId = 1;
+ String targetBucketIdStr = BucketIdentifier.bucketIdStr(targetBucketId);
+ HoodieSource<RowData> source = createHoodieSourceWithPruner(
+ conf, metaClient, null, null, partitionPath -> targetBucketId);
+ List<HoodieSourceSplit> prunedSplits = source.createBatchHoodieSplits();
+
+ assertTrue(prunedSplits.size() < fullSplits.size());
+ prunedSplits.forEach(split -> assertTrue(
+ split.getFileId().contains(targetBucketIdStr),
+ "Pruned split should belong to bucket " + targetBucketId));
+ }
+
@Test
public void testGetOrBuildFileIndexInternal() throws Exception {
metaClient = HoodieTestUtils.init(tempDir.getAbsolutePath(), HoodieTableType.COPY_ON_WRITE);
@@ -412,6 +437,15 @@ public class TestHoodieSource {
HoodieTableMetaClient metaClient,
PartitionPruners.PartitionPruner partitionPruner,
ColumnStatsProbe columnStatsProbe) {
+ return createHoodieSourceWithPruner(conf, metaClient, partitionPruner, columnStatsProbe, null);
+ }
+
+ private HoodieSource<RowData> createHoodieSourceWithPruner(
+ Configuration conf,
+ HoodieTableMetaClient metaClient,
+ PartitionPruners.PartitionPruner partitionPruner,
+ ColumnStatsProbe columnStatsProbe,
+ Function<String, Integer> partitionBucketIdFunc) {
RowType rowType = TestConfigurations.ROW_TYPE;
HoodieScanContext scanContext = HoodieScanContext.builder()
.conf(conf)
@@ -428,6 +462,7 @@ public class TestHoodieSource {
.isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
.partitionPruner(partitionPruner)
.columnStatsProbe(columnStatsProbe)
+ .partitionBucketIdFunc(partitionBucketIdFunc)
.build();
HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
HadoopStorageConfiguration hadoopConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));