This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8296df0de3a3 fix(flink): enable integration test for Hudi Flink Source 
V2 (#18287)
8296df0de3a3 is described below

commit 8296df0de3a3eb5c9db04b78d222b1045f0ea4c8
Author: Peter Huang <[email protected]>
AuthorDate: Fri Mar 6 22:22:47 2026 -0800

    fix(flink): enable integration test for Hudi Flink Source V2 (#18287)
---
 .../source/enumerator/AbstractHoodieSplitEnumerator.java  |  3 ++-
 .../source/reader/function/HoodieSplitReaderFunction.java | 15 ++++++++++-----
 .../org/apache/hudi/table/ITTestHoodieDataSource.java     | 11 +++++++----
 3 files changed, 19 insertions(+), 10 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
index 9538ce60972f..c6ca3b0ea669 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
@@ -160,7 +160,8 @@ abstract class AbstractHoodieSplitEnumerator
         if (shouldWaitForMoreSplits()) {
           // for continuous split provider callback
           registerCallbackFromSplitProvider();
-          break;
+
+          // don't break here, to make sure every reader gets a chance at 
split assignment
         } else {
           // for static split provider
           log.info("No more splits available for subtask {}", awaitingSubtask);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
index 36d2f0a6f282..48b0cd839fc3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
@@ -56,12 +56,12 @@ public class HoodieSplitReaderFunction implements 
SplitReaderFunction<RowData> {
   private final HoodieSchema requiredSchema;
   private final InternalSchemaManager internalSchemaManager;
   private final Configuration configuration;
-  private final org.apache.hadoop.conf.Configuration hadoopConf;
   private final HoodieWriteConfig writeConfig;
   private final String mergeType;
   private final boolean emitDelete;
   private final List<ExpressionPredicates.Predicate> predicates;
-  private HoodieFileGroupReader<RowData> fileGroupReader;
+  private transient HoodieFileGroupReader<RowData> fileGroupReader;
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
 
   public HoodieSplitReaderFunction(
       Configuration configuration,
@@ -79,18 +79,16 @@ public class HoodieSplitReaderFunction implements 
SplitReaderFunction<RowData> {
     this.requiredSchema = requiredSchema;
     this.internalSchemaManager = internalSchemaManager;
     this.configuration = configuration;
-    this.hadoopConf =  HadoopConfigurations.getHadoopConf(configuration);
     this.writeConfig = FlinkWriteClients.getHoodieClientConfig(configuration);
     this.predicates = predicates;
     this.mergeType = mergeType;
     this.emitDelete = emitDelete;
-    this.fileGroupReader = null;
   }
 
   @Override
   public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> 
read(HoodieSourceSplit split) {
     final String splitId = split.splitId();
-    HoodieTableMetaClient metaClient = 
StreamerUtil.metaClientForReader(configuration, hadoopConf);
+    HoodieTableMetaClient metaClient = 
StreamerUtil.metaClientForReader(configuration, getHadoopConf());
 
     try {
       this.fileGroupReader = createFileGroupReader(split, metaClient);
@@ -141,4 +139,11 @@ public class HoodieSplitReaderFunction implements 
SplitReaderFunction<RowData> {
       split.getInstantRange()
     );
   }
+
+  private org.apache.hadoop.conf.Configuration getHadoopConf() {
+    if (hadoopConf == null) {
+      hadoopConf = HadoopConfigurations.getHadoopConf(configuration);
+    }
+    return hadoopConf;
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 17c0a2d069c2..fb096511f0a8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -142,8 +142,8 @@ public class ITTestHoodieDataSource {
   File tempFile;
 
   @ParameterizedTest
-  @EnumSource(value = HoodieTableType.class)
-  void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType) 
throws Exception {
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
+  void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType, 
boolean useSourceV2) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);
@@ -167,6 +167,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.READ_START_COMMIT, firstCommit)
+        .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * 
from t1", TestData.DATA_SET_SOURCE_INSERT.size());
@@ -223,8 +224,8 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @EnumSource(value = HoodieTableType.class)
-  void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
+  void testStreamWriteAndRead(HoodieTableType tableType, boolean useSourceV2) 
throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);
@@ -236,7 +237,9 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
+        .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
         .end();
+
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);

Reply via email to