This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8296df0de3a3 fix(flink): enable integration test for Hudi Flink Source
V2 (#18287)
8296df0de3a3 is described below
commit 8296df0de3a3eb5c9db04b78d222b1045f0ea4c8
Author: Peter Huang <[email protected]>
AuthorDate: Fri Mar 6 22:22:47 2026 -0800
fix(flink): enable integration test for Hudi Flink Source V2 (#18287)
---
.../source/enumerator/AbstractHoodieSplitEnumerator.java | 3 ++-
.../source/reader/function/HoodieSplitReaderFunction.java | 15 ++++++++++-----
.../org/apache/hudi/table/ITTestHoodieDataSource.java | 11 +++++++----
3 files changed, 19 insertions(+), 10 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
index 9538ce60972f..c6ca3b0ea669 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
@@ -160,7 +160,8 @@ abstract class AbstractHoodieSplitEnumerator
if (shouldWaitForMoreSplits()) {
// for continuous split provider callback
registerCallbackFromSplitProvider();
- break;
+
+ // don't break here to make sure every reader gets a chance at
split assignment
} else {
// for static split provider
log.info("No more splits available for subtask {}", awaitingSubtask);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
index 36d2f0a6f282..48b0cd839fc3 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
@@ -56,12 +56,12 @@ public class HoodieSplitReaderFunction implements
SplitReaderFunction<RowData> {
private final HoodieSchema requiredSchema;
private final InternalSchemaManager internalSchemaManager;
private final Configuration configuration;
- private final org.apache.hadoop.conf.Configuration hadoopConf;
private final HoodieWriteConfig writeConfig;
private final String mergeType;
private final boolean emitDelete;
private final List<ExpressionPredicates.Predicate> predicates;
- private HoodieFileGroupReader<RowData> fileGroupReader;
+ private transient HoodieFileGroupReader<RowData> fileGroupReader;
+ private transient org.apache.hadoop.conf.Configuration hadoopConf;
public HoodieSplitReaderFunction(
Configuration configuration,
@@ -79,18 +79,16 @@ public class HoodieSplitReaderFunction implements
SplitReaderFunction<RowData> {
this.requiredSchema = requiredSchema;
this.internalSchemaManager = internalSchemaManager;
this.configuration = configuration;
- this.hadoopConf = HadoopConfigurations.getHadoopConf(configuration);
this.writeConfig = FlinkWriteClients.getHoodieClientConfig(configuration);
this.predicates = predicates;
this.mergeType = mergeType;
this.emitDelete = emitDelete;
- this.fileGroupReader = null;
}
@Override
public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>
read(HoodieSourceSplit split) {
final String splitId = split.splitId();
- HoodieTableMetaClient metaClient =
StreamerUtil.metaClientForReader(configuration, hadoopConf);
+ HoodieTableMetaClient metaClient =
StreamerUtil.metaClientForReader(configuration, getHadoopConf());
try {
this.fileGroupReader = createFileGroupReader(split, metaClient);
@@ -141,4 +139,11 @@ public class HoodieSplitReaderFunction implements
SplitReaderFunction<RowData> {
split.getInstantRange()
);
}
+
+ private org.apache.hadoop.conf.Configuration getHadoopConf() {
+ if (hadoopConf == null) {
+ hadoopConf = HadoopConfigurations.getHadoopConf(configuration);
+ }
+ return hadoopConf;
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 17c0a2d069c2..fb096511f0a8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -142,8 +142,8 @@ public class ITTestHoodieDataSource {
File tempFile;
@ParameterizedTest
- @EnumSource(value = HoodieTableType.class)
- void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType)
throws Exception {
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
+ void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType,
boolean useSourceV2) throws Exception {
// create filesystem table named source
String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource);
@@ -167,6 +167,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.READ_START_COMMIT, firstCommit)
+ .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select *
from t1", TestData.DATA_SET_SOURCE_INSERT.size());
@@ -223,8 +224,8 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @EnumSource(value = HoodieTableType.class)
- void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
+ void testStreamWriteAndRead(HoodieTableType tableType, boolean useSourceV2)
throws Exception {
// create filesystem table named source
String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource);
@@ -236,7 +237,9 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
+ .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
.end();
+
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);