This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e72b19b622dd feat(flink): Support bucket pruning for source V2 (#18716)
e72b19b622dd is described below

commit e72b19b622dd66921c075cb937f39c6e0ad213a4
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon May 11 13:54:56 2026 +0800

    feat(flink): Support bucket pruning for source V2 (#18716)
---
 .../org/apache/hudi/source/HoodieScanContext.java  |  3 ++
 .../java/org/apache/hudi/source/HoodieSource.java  |  1 +
 .../org/apache/hudi/table/HoodieTableSource.java   |  2 ++
 .../org/apache/hudi/source/TestHoodieSource.java   | 35 ++++++++++++++++++++++
 4 files changed, 41 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
index e75c9c8fddc7..f57038ba41c0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
@@ -32,6 +32,7 @@ import org.apache.hudi.storage.StoragePath;
 
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.function.Function;
 
 /**
  * Hudi source scan context for finding completed commits for streaming and incremental read.
@@ -63,6 +64,8 @@ public class HoodieScanContext implements Serializable {
   private final PartitionPruners.PartitionPruner partitionPruner;
   // Column stats probe
   private final ColumnStatsProbe columnStatsProbe;
+  // Partition bucket id function for bucket pruning
+  private final Function<String, Integer> partitionBucketIdFunc;
   private final long limit;
 
   public Duration getScanInterval() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
index eed5809ef374..6b9ef940c412 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
@@ -214,6 +214,7 @@ public class HoodieSource<T> extends FileIndexReader implements Source<T, Hoodie
         .metaClient(metaClient)
         .columnStatsProbe(scanContext.getColumnStatsProbe())
         .partitionPruner(scanContext.getPartitionPruner())
+        .partitionBucketIdFunc(scanContext.getPartitionBucketIdFunc())
         .build();
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 6d4a2dd85c30..6d64d98ddeaa 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -354,6 +354,8 @@ public class HoodieTableSource extends FileIndexReader implements
         .maxPendingSplits(conf.get(FlinkOptions.READ_SPLITS_LIMIT))
         .partitionPruner(partitionPruner)
         .columnStatsProbe(columnStatsProbe)
+        .partitionBucketIdFunc(PartitionBucketIdFunc.create(this.dataBucketFunc,
+            this.metaClient, conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)))
         .isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
         .limit(limit)
         .build();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
index 66886bc74134..c9f1a66f6822 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
@@ -26,6 +26,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.source.reader.HoodieRecordEmitter;
@@ -61,6 +63,7 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Function;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -264,6 +267,28 @@ public class TestHoodieSource {
     assertTrue(splits3.size() < fullSplits.size());
   }
 
+  @Test
+  public void testCreateBatchHoodieSplitsWithBucketPruner() throws Exception {
+    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
+    conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    metaClient = StreamerUtil.createMetaClient(conf);
+
+    HoodieSource<RowData> source1 = createHoodieSourceWithPruner(conf, metaClient, null, null);
+    List<HoodieSourceSplit> fullSplits = source1.createBatchHoodieSplits();
+
+    int targetBucketId = 1;
+    String targetBucketIdStr = BucketIdentifier.bucketIdStr(targetBucketId);
+    HoodieSource<RowData> source = createHoodieSourceWithPruner(
+        conf, metaClient, null, null, partitionPath -> targetBucketId);
+    List<HoodieSourceSplit> prunedSplits = source.createBatchHoodieSplits();
+
+    assertTrue(prunedSplits.size() < fullSplits.size());
+    prunedSplits.forEach(split -> assertTrue(
+        split.getFileId().contains(targetBucketIdStr),
+        "Pruned split should belong to bucket " + targetBucketId));
+  }
+
   @Test
   public void testGetOrBuildFileIndexInternal() throws Exception {
     metaClient = HoodieTestUtils.init(tempDir.getAbsolutePath(), HoodieTableType.COPY_ON_WRITE);
@@ -412,6 +437,15 @@ public class TestHoodieSource {
       HoodieTableMetaClient metaClient,
       PartitionPruners.PartitionPruner partitionPruner,
       ColumnStatsProbe columnStatsProbe) {
+    return createHoodieSourceWithPruner(conf, metaClient, partitionPruner, columnStatsProbe, null);
+  }
+
+  private HoodieSource<RowData> createHoodieSourceWithPruner(
+      Configuration conf,
+      HoodieTableMetaClient metaClient,
+      PartitionPruners.PartitionPruner partitionPruner,
+      ColumnStatsProbe columnStatsProbe,
+      Function<String, Integer> partitionBucketIdFunc) {
     RowType rowType = TestConfigurations.ROW_TYPE;
     HoodieScanContext scanContext = HoodieScanContext.builder()
         .conf(conf)
@@ -428,6 +462,7 @@ public class TestHoodieSource {
         .isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
         .partitionPruner(partitionPruner)
         .columnStatsProbe(columnStatsProbe)
+        .partitionBucketIdFunc(partitionBucketIdFunc)
         .build();
     HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
     HadoopStorageConfiguration hadoopConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));

Reply via email to