This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 365398affac8 refactor: Hudi Flink source v2 with better context management (#18269)
365398affac8 is described below

commit 365398affac8d8223eebe4ce411fc5e185b7b304
Author: Peter Huang <[email protected]>
AuthorDate: Tue Mar 3 19:13:29 2026 -0800

    refactor: Hudi Flink source v2 with better context management (#18269)
---
 .../java/org/apache/hudi/source/HoodieSource.java  |  2 +-
 .../reader/function/HoodieSplitReaderFunction.java | 19 +++--
 .../source/split/DefaultHoodieSplitDiscover.java   | 31 ++++++-
 .../org/apache/hudi/table/HoodieTableSource.java   |  3 +-
 .../org/apache/hudi/source/TestHoodieSource.java   | 10 ++-
 .../function/TestHoodieSplitReaderFunction.java    | 97 +++++++++++++++-------
 .../split/TestDefaultHoodieSplitDiscover.java      | 77 +++++++++++++++--
 7 files changed, 187 insertions(+), 52 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
index fcfa4770efb5..d6bde3b74a0e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
@@ -153,7 +153,7 @@ public class HoodieSource<T> extends FileIndexReader implements Source<T, Hoodie
 
     if (scanContext.isStreaming()) {
       HoodieContinuousSplitDiscover discover = new DefaultHoodieSplitDiscover(
-          scanContext, metaClient);
+          scanContext);
 
       return new HoodieContinuousSplitEnumerator(
               tableName, enumContext, splitProvider, discover, scanContext,
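
A minimal sketch of the streaming wiring after this change, using only names visible in the hunk above (the discover now derives its meta client from the scan context, so callers no longer pass a HoodieTableMetaClient):

    // Streaming mode: split discovery needs only the scan context.
    HoodieContinuousSplitDiscover discover = new DefaultHoodieSplitDiscover(scanContext);
    // A null last instant asks for splits starting from the earliest instant (see the tests below).
    HoodieContinuousSplitBatch batch = discover.discoverSplits(null);
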
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
index 9178a8ecc30c..36d2f0a6f282 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.source.reader.BatchRecords;
@@ -40,6 +41,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -50,10 +52,11 @@ import java.util.stream.Collectors;
  * Default reader function implementation for both MOR and COW tables.
  */
 public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
-  private final HoodieTableMetaClient metaClient;
   private final HoodieSchema tableSchema;
   private final HoodieSchema requiredSchema;
+  private final InternalSchemaManager internalSchemaManager;
   private final Configuration configuration;
+  private final org.apache.hadoop.conf.Configuration hadoopConf;
   private final HoodieWriteConfig writeConfig;
   private final String mergeType;
   private final boolean emitDelete;
@@ -61,20 +64,22 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
   private HoodieFileGroupReader<RowData> fileGroupReader;
 
   public HoodieSplitReaderFunction(
-      HoodieTableMetaClient metaClient,
       Configuration configuration,
       HoodieSchema tableSchema,
       HoodieSchema requiredSchema,
+      InternalSchemaManager internalSchemaManager,
       String mergeType,
       List<ExpressionPredicates.Predicate> predicates,
       boolean emitDelete) {
 
     ValidationUtils.checkArgument(tableSchema != null, "tableSchema can't be null");
     ValidationUtils.checkArgument(requiredSchema != null, "requiredSchema can't be null");
-    this.metaClient = metaClient;
+    ValidationUtils.checkArgument(internalSchemaManager != null, "internalSchemaManager can't be null");
     this.tableSchema = tableSchema;
     this.requiredSchema = requiredSchema;
+    this.internalSchemaManager = internalSchemaManager;
     this.configuration = configuration;
+    this.hadoopConf = HadoopConfigurations.getHadoopConf(configuration);
     this.writeConfig = FlinkWriteClients.getHoodieClientConfig(configuration);
     this.predicates = predicates;
     this.mergeType = mergeType;
@@ -85,8 +90,10 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
   @Override
   public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> read(HoodieSourceSplit split) {
     final String splitId = split.splitId();
+    HoodieTableMetaClient metaClient = StreamerUtil.metaClientForReader(configuration, hadoopConf);
+
     try {
-      this.fileGroupReader = createFileGroupReader(split);
+      this.fileGroupReader = createFileGroupReader(split, metaClient);
       final ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator();
       BatchRecords<RowData> records = BatchRecords.forRecords(splitId, recordIterator, split.getFileOffset(), split.getConsumed());
       records.seek(split.getConsumed());
@@ -109,7 +116,7 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
    * @param split The source split to read
    * @return A {@link HoodieFileGroupReader} instance
    */
-  private HoodieFileGroupReader<RowData> createFileGroupReader(HoodieSourceSplit split) {
+  private HoodieFileGroupReader<RowData> createFileGroupReader(HoodieSourceSplit split, HoodieTableMetaClient metaClient) {
     // Create FileSlice from split information
     FileSlice fileSlice = new FileSlice(
         new HoodieFileGroupId(split.getPartitionPath(), split.getFileId()),
@@ -123,7 +130,7 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
     return FormatUtils.createFileGroupReader(
       metaClient,
       writeConfig,
-      InternalSchemaManager.get(metaClient.getStorageConf(), metaClient),
+      internalSchemaManager,
       fileSlice,
       tableSchema,
       requiredSchema,
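
Since the function no longer receives a HoodieTableMetaClient, read() resolves one per call via StreamerUtil.metaClientForReader, and the InternalSchemaManager is supplied up front by the caller. A sketch of the reworked constructor call, mirroring the HoodieTableSource call site later in this diff (variable names are illustrative):

    HoodieSplitReaderFunction readerFunction = new HoodieSplitReaderFunction(
        conf,                               // Flink Configuration; the Hadoop conf is derived from it
        tableSchema,                        // full table schema
        requiredSchema,                     // projected schema for the query
        internalSchemaManager,              // resolved once by the caller, not per reader
        conf.get(FlinkOptions.MERGE_TYPE),  // merge strategy for MOR reads
        Collections.emptyList(),            // pushed-down predicates; none in this sketch
        false);                             // emitDelete
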
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
index 5fb5245e9f0d..bee9a6fbd131 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
@@ -19,28 +19,34 @@
 package org.apache.hudi.source.split;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.HoodieScanContext;
 
+import org.apache.hudi.util.StreamerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 /**
  * Default implementation of HoodieContinuousSplitDiscover.
  */
 public class DefaultHoodieSplitDiscover implements HoodieContinuousSplitDiscover {
   private static final Logger LOG = LoggerFactory.getLogger(DefaultHoodieSplitDiscover.class);
 
-  private final HoodieTableMetaClient metaClient;
   private final HoodieScanContext scanContext;
   private final IncrementalInputSplits incrementalInputSplits;
+  private final Configuration hadoopConf;
+  private HoodieTableMetaClient metaClient;
 
   public DefaultHoodieSplitDiscover(
-      HoodieScanContext scanContext,
-      HoodieTableMetaClient metaClient) {
+      HoodieScanContext scanContext) {
     this.scanContext = scanContext;
-    this.metaClient = metaClient;
+    this.hadoopConf = HadoopConfigurations.getHadoopConf(scanContext.getConf());
+    this.metaClient = getOrCreateMetaClient();
     this.incrementalInputSplits = IncrementalInputSplits.builder()
         .conf(scanContext.getConf())
         .path(new Path(scanContext.getPath().toUri()))
@@ -54,6 +60,23 @@ public class DefaultHoodieSplitDiscover implements HoodieContinuousSplitDiscover
 
   @Override
   public HoodieContinuousSplitBatch discoverSplits(String lastInstant) {
+    if (getOrCreateMetaClient() == null) {
+      return HoodieContinuousSplitBatch.EMPTY;
+    }
+
     return incrementalInputSplits.inputHoodieSourceSplits(metaClient, lastInstant, scanContext.isCdcEnabled());
   }
+
+  @Nullable
+  private HoodieTableMetaClient getOrCreateMetaClient() {
+    if (this.metaClient != null) {
+      return this.metaClient;
+    }
+    if (StreamerUtil.tableExists(this.scanContext.getPath().toString(), hadoopConf)) {
+      this.metaClient = StreamerUtil.createMetaClient(this.scanContext.getPath().toString(), hadoopConf);
+      return this.metaClient;
+    }
+    // table is not initialized on storage yet; discoverSplits returns an empty batch in this case
+    return null;
+  }
 }
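
getOrCreateMetaClient() is a get-or-create cache: it returns null until the table exists on storage, then creates and caches the meta client. The same pattern in a standalone sketch with hypothetical names (plain java.util.function, not Hudi API):

    import java.util.function.Supplier;

    // Hypothetical illustration of the get-or-create pattern above; not Hudi code.
    final class LazyHandle<T> {
      private final Supplier<Boolean> exists;  // e.g. "does the table exist yet?"
      private final Supplier<T> factory;       // e.g. "create the meta client"
      private T handle;

      LazyHandle(Supplier<Boolean> exists, Supplier<T> factory) {
        this.exists = exists;
        this.factory = factory;
      }

      // Returns the cached handle, creating it once the resource exists; null until then.
      T getOrCreate() {
        if (handle == null && exists.get()) {
          handle = factory.get();
        }
        return handle;
      }
    }
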
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index dda548682337..d346a12f8be3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -299,14 +299,13 @@ public class HoodieTableSource extends FileIndexReader implements
     final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
 
     HoodieScanContext context = createHoodieScanContext(rowType);
-
     final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.get(FlinkOptions.TABLE_TYPE));
     boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
     HoodieSplitReaderFunction splitReaderFunction = new HoodieSplitReaderFunction(
-        metaClient,
         conf,
         tableSchema,
         HoodieSchemaConverter.convertToSchema(requiredRowType),
+        internalSchemaManager,
         conf.get(FlinkOptions.MERGE_TYPE),
         predicates,
         emitDelete
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
index 2054a33052e8..2a8956e20f7e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.source.reader.HoodieRecordEmitter;
@@ -32,6 +33,8 @@ import org.apache.hudi.source.reader.function.HoodieSplitReaderFunction;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 import org.apache.hudi.source.split.HoodieSourceSplitComparator;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -409,11 +412,12 @@ public class TestHoodieSource {
         .partitionPruner(partitionPruner)
         .build();
     HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
+    HadoopStorageConfiguration hadoopConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
     HoodieSplitReaderFunction splitReaderFunction = new HoodieSplitReaderFunction(
-        metaClient,
         conf,
-            schema, // schema will be resolved from table
-            schema, // required schema
+        schema, // schema will be resolved from table
+        schema, // required schema
+        InternalSchemaManager.get(hadoopConf, this.metaClient),
         conf.get(FlinkOptions.MERGE_TYPE),
         Collections.emptyList(),
             false);
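
The test now resolves the InternalSchemaManager explicitly before constructing the reader function; that resolution step in isolation, using the imports added in this file:

    HadoopStorageConfiguration storageConf =
        new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
    InternalSchemaManager internalSchemaManager = InternalSchemaManager.get(storageConf, metaClient);
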
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
index e399236ba6b2..323b8b6f071f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.utils.TestConfigurations;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -53,11 +54,13 @@ public class TestHoodieSplitReaderFunction {
   private HoodieSchema tableSchema;
   private HoodieSchema requiredSchema;
   private HoodieTableMetaClient mockMetaClient;
+  private InternalSchemaManager mockInternalSchemaManager;
   private Configuration conf;
 
   @BeforeEach
   public void setUp() {
     mockMetaClient = mock(HoodieTableMetaClient.class);
+    mockInternalSchemaManager = mock(InternalSchemaManager.class);
     when(mockMetaClient.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ);
 
     // Create mock schemas
@@ -71,10 +74,10 @@ public class TestHoodieSplitReaderFunction {
     // Test that constructor requires non-null tableSchema
     assertThrows(IllegalArgumentException.class, () -> {
       new HoodieSplitReaderFunction(
-          mockMetaClient,
           conf,
           null,  // null tableSchema should throw
           requiredSchema,
+          mockInternalSchemaManager,
           "AVRO_PAYLOAD",
           Collections.emptyList(),
               false
@@ -87,11 +90,11 @@ public class TestHoodieSplitReaderFunction {
     // Test that constructor requires non-null requiredSchema
     assertThrows(IllegalArgumentException.class, () -> {
       new HoodieSplitReaderFunction(
-          mockMetaClient,
           conf,
           tableSchema,
           null,  // null requiredSchema should throw
-          "AVRO_PAYLOAD",
+          mockInternalSchemaManager,
+              "AVRO_PAYLOAD",
           Collections.emptyList(),
           false
       );
@@ -103,11 +106,11 @@ public class TestHoodieSplitReaderFunction {
     // Should not throw exception with valid parameters
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             tableSchema,
             requiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+                "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
         );
@@ -121,11 +124,11 @@ public class TestHoodieSplitReaderFunction {
 
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             tableSchema,
             requiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+                "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
         );
@@ -137,11 +140,11 @@ public class TestHoodieSplitReaderFunction {
   public void testClosedReaderIsNull() throws Exception {
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             tableSchema,
             requiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+                "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
 
@@ -163,10 +166,11 @@ public class TestHoodieSplitReaderFunction {
     for (String mergeType : mergeTypes) {
       HoodieSplitReaderFunction function =
           new HoodieSplitReaderFunction(
-              mockMetaClient,
               conf,
               tableSchema,
               requiredSchema,
+              mockInternalSchemaManager,
               mergeType,
               Collections.emptyList(),
               false
@@ -180,10 +184,10 @@ public class TestHoodieSplitReaderFunction {
   public void testMultipleCloseCalls() throws Exception {
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             tableSchema,
             requiredSchema,
+            mockInternalSchemaManager,
             "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
@@ -201,11 +205,11 @@ public class TestHoodieSplitReaderFunction {
     HoodieSchema customRequiredSchema = mock(HoodieSchema.class);
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             customTableSchema,
             customRequiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+                "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
         );
@@ -221,11 +225,11 @@ public class TestHoodieSplitReaderFunction {
     // Test with present internal schema
     HoodieSplitReaderFunction function1 =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             tableSchema,
             requiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+                "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
         );
@@ -234,11 +238,11 @@ public class TestHoodieSplitReaderFunction {
     // Test with different internal schema
     HoodieSplitReaderFunction function2 =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             tableSchema,
             requiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+                "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
         );
@@ -247,11 +251,12 @@ public class TestHoodieSplitReaderFunction {
     // Test with empty internal schema
     HoodieSplitReaderFunction function3 =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             tableSchema,
             requiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+            "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
         );
@@ -264,11 +269,11 @@ public class TestHoodieSplitReaderFunction {
 
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
-                conf,
+            conf,
             tableSchema,
             requiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+                "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
         );
@@ -281,11 +286,11 @@ public class TestHoodieSplitReaderFunction {
     // Verify that the read method returns CloseableIterator
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             tableSchema,
             requiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+                "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
         );
@@ -297,11 +302,11 @@ public class TestHoodieSplitReaderFunction {
   public void testConstructorWithEmitDeleteTrue() {
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             tableSchema,
             requiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+                "AVRO_PAYLOAD",
             Collections.emptyList(),
             true
         );
@@ -319,10 +324,10 @@ public class TestHoodieSplitReaderFunction {
 
     HoodieSplitReaderFunction function =
             new HoodieSplitReaderFunction(
-                    mockMetaClient,
                     conf,
                     tableSchema,
                     requiredSchema,
+                    mockInternalSchemaManager,
                     "AVRO_PAYLOAD",
                     predicates,
                     true
@@ -335,15 +340,49 @@ public class TestHoodieSplitReaderFunction {
   public void testConstructorWithEmitDeleteFalse() {
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
-            mockMetaClient,
             conf,
             tableSchema,
             requiredSchema,
-            "AVRO_PAYLOAD",
+            mockInternalSchemaManager,
+                "AVRO_PAYLOAD",
             Collections.emptyList(),
             false
         );
 
     assertNotNull(function);
   }
+
+  @Test
+  public void testConstructorValidatesInternalSchemaManager() {
+    // Test that constructor requires non-null InternalSchemaManager
+    assertThrows(IllegalArgumentException.class, () -> {
+      new HoodieSplitReaderFunction(
+          conf,
+          tableSchema,
+          requiredSchema,
+          null,  // null InternalSchemaManager should throw
+          "AVRO_PAYLOAD",
+          Collections.emptyList(),
+          false
+      );
+    });
+  }
+
+  @Test
+  public void testInternalSchemaManagerIsStored() {
+    InternalSchemaManager customManager = mock(InternalSchemaManager.class);
+
+    HoodieSplitReaderFunction function =
+        new HoodieSplitReaderFunction(
+            conf,
+            tableSchema,
+            requiredSchema,
+            customManager,
+            "AVRO_PAYLOAD",
+            Collections.emptyList(),
+            false
+        );
+
+    assertNotNull(function, "Function should be created with custom 
InternalSchemaManager");
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
index 38245d5e22e7..1304dad26b1c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
@@ -35,6 +35,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
+import java.io.File;
+
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 /**
@@ -63,7 +65,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
 
     HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
-        scanContext, metaClient);
+        scanContext);
 
     // Query with the last instant - should return empty or minimal splits
     HoodieContinuousSplitBatch result = discover.discoverSplits(lastInstant);
@@ -93,7 +95,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
 
     HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
-        scanContext, metaClient);
+        scanContext);
 
     // Discover splits after the first instant
     HoodieContinuousSplitBatch result = discover.discoverSplits(firstInstant.getCompletionTime());
@@ -115,7 +117,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
 
     HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
-        scanContext, metaClient);
+        scanContext);
 
     // Discover splits from null (earliest)
     HoodieContinuousSplitBatch result = discover.discoverSplits(null);
@@ -137,7 +139,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
 
     HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
-        scanContext, metaClient);
+        scanContext);
 
     HoodieContinuousSplitBatch result = discover.discoverSplits(null);
 
@@ -164,7 +166,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
 
     HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
-        scanContext, metaClient);
+        scanContext);
 
     HoodieContinuousSplitBatch result = discover.discoverSplits(firstCompletionTime);
 
@@ -186,7 +188,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
 
     HoodieScanContext scanContext = createScanContextWithSkipOptions(conf, true, true, false);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
-        scanContext, metaClient);
+        scanContext);
 
     HoodieContinuousSplitBatch result = discover.discoverSplits(scanContext.getStartInstant());
 
@@ -201,11 +203,72 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
 
     HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
-        scanContext, metaClient);
+        scanContext);
 
     assertNotNull(discover, "Discover instance should not be null");
   }
 
+  @Test
+  void testDiscoverSplitsWithNonExistentTable() throws Exception {
+    // Use a path that doesn't have a Hudi table initialized
+    String nonExistentPath = basePath + "/non_existent_table";
+    Configuration conf = TestConfigurations.getDefaultConf(nonExistentPath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+    HoodieScanContext scanContext = createScanContext(conf);
+    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+        scanContext);
+
+    // Should return empty batch when table doesn't exist
+    HoodieContinuousSplitBatch result = discover.discoverSplits(null);
+
+    assertNotNull(result, "Result should not be null");
+    assertNotNull(result.getSplits(), "Splits should not be null");
+    // Empty batch should have no splits
+  }
+
+  @Test
+  void testLazyMetaClientInitialization() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+    // Insert test data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    HoodieScanContext scanContext = createScanContext(conf);
+    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+        scanContext);
+
+    // First call should initialize metaClient and discover splits
+    HoodieContinuousSplitBatch result1 = discover.discoverSplits(null);
+    assertNotNull(result1, "First result should not be null");
+
+    // Second call should reuse the same metaClient
+    HoodieContinuousSplitBatch result2 = discover.discoverSplits(null);
+    assertNotNull(result2, "Second result should not be null");
+  }
+
+  @Test
+  void testDiscoverSplitsHandlesNullMetaClientGracefully() throws Exception {
+    // Use a directory that exists but is not a Hudi table
+    String emptyPath = basePath + "/empty_dir";
+    new File(emptyPath).mkdirs();
+
+    Configuration conf = TestConfigurations.getDefaultConf(emptyPath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+    HoodieScanContext scanContext = createScanContext(conf);
+    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+        scanContext);
+
+    // Should handle null metaClient gracefully and return empty batch
+    HoodieContinuousSplitBatch result = discover.discoverSplits("20230101000000");
+
+    assertNotNull(result, "Result should not be null even with null 
metaClient");
+    assertNotNull(result.getSplits(), "Splits should not be null");
+  }
+
   // Helper methods
 
   private HoodieScanContext createScanContext(Configuration conf) throws Exception {
