This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 365398affac8 refactor: Hudi Flink source v2 with better context management (#18269)
365398affac8 is described below
commit 365398affac8d8223eebe4ce411fc5e185b7b304
Author: Peter Huang <[email protected]>
AuthorDate: Tue Mar 3 19:13:29 2026 -0800
refactor: Hudi Flink source v2 with better context management (#18269)
---
.../java/org/apache/hudi/source/HoodieSource.java | 2 +-
.../reader/function/HoodieSplitReaderFunction.java | 19 +++--
.../source/split/DefaultHoodieSplitDiscover.java | 31 ++++++-
.../org/apache/hudi/table/HoodieTableSource.java | 3 +-
.../org/apache/hudi/source/TestHoodieSource.java | 10 ++-
.../function/TestHoodieSplitReaderFunction.java | 97 +++++++++++++++-------
.../split/TestDefaultHoodieSplitDiscover.java | 77 +++++++++++++++--
7 files changed, 187 insertions(+), 52 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
index fcfa4770efb5..d6bde3b74a0e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
@@ -153,7 +153,7 @@ public class HoodieSource<T> extends FileIndexReader implements Source<T, Hoodie
if (scanContext.isStreaming()) {
HoodieContinuousSplitDiscover discover = new DefaultHoodieSplitDiscover(
- scanContext, metaClient);
+ scanContext);
return new HoodieContinuousSplitEnumerator(
tableName, enumContext, splitProvider, discover, scanContext,
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
index 9178a8ecc30c..36d2f0a6f282 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.reader.BatchRecords;
@@ -40,6 +41,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
import java.io.IOException;
import java.util.Collections;
@@ -50,10 +52,11 @@ import java.util.stream.Collectors;
* Default reader function implementation for both MOR and COW tables.
*/
public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
- private final HoodieTableMetaClient metaClient;
private final HoodieSchema tableSchema;
private final HoodieSchema requiredSchema;
+ private final InternalSchemaManager internalSchemaManager;
private final Configuration configuration;
+ private final org.apache.hadoop.conf.Configuration hadoopConf;
private final HoodieWriteConfig writeConfig;
private final String mergeType;
private final boolean emitDelete;
@@ -61,20 +64,22 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
private HoodieFileGroupReader<RowData> fileGroupReader;
public HoodieSplitReaderFunction(
- HoodieTableMetaClient metaClient,
Configuration configuration,
HoodieSchema tableSchema,
HoodieSchema requiredSchema,
+ InternalSchemaManager internalSchemaManager,
String mergeType,
List<ExpressionPredicates.Predicate> predicates,
boolean emitDelete) {
ValidationUtils.checkArgument(tableSchema != null, "tableSchema can't be null");
ValidationUtils.checkArgument(requiredSchema != null, "requiredSchema can't be null");
- this.metaClient = metaClient;
+ ValidationUtils.checkArgument(internalSchemaManager != null, "internalSchemaManager can't be null");
this.tableSchema = tableSchema;
this.requiredSchema = requiredSchema;
+ this.internalSchemaManager = internalSchemaManager;
this.configuration = configuration;
+ this.hadoopConf = HadoopConfigurations.getHadoopConf(configuration);
this.writeConfig = FlinkWriteClients.getHoodieClientConfig(configuration);
this.predicates = predicates;
this.mergeType = mergeType;
@@ -85,8 +90,10 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
@Override
public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> read(HoodieSourceSplit split) {
final String splitId = split.splitId();
+ HoodieTableMetaClient metaClient = StreamerUtil.metaClientForReader(configuration, hadoopConf);
+
try {
- this.fileGroupReader = createFileGroupReader(split);
+ this.fileGroupReader = createFileGroupReader(split, metaClient);
final ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator();
BatchRecords<RowData> records = BatchRecords.forRecords(splitId, recordIterator, split.getFileOffset(), split.getConsumed());
records.seek(split.getConsumed());
@@ -109,7 +116,7 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
* @param split The source split to read
* @return A {@link HoodieFileGroupReader} instance
*/
- private HoodieFileGroupReader<RowData> createFileGroupReader(HoodieSourceSplit split) {
+ private HoodieFileGroupReader<RowData> createFileGroupReader(HoodieSourceSplit split, HoodieTableMetaClient metaClient) {
// Create FileSlice from split information
FileSlice fileSlice = new FileSlice(
new HoodieFileGroupId(split.getPartitionPath(), split.getFileId()),
@@ -123,7 +130,7 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
return FormatUtils.createFileGroupReader(
metaClient,
writeConfig,
- InternalSchemaManager.get(metaClient.getStorageConf(), metaClient),
+ internalSchemaManager,
fileSlice,
tableSchema,
requiredSchema,
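
For illustration, a minimal sketch of the call pattern the new signature enables, using only names from the diff above (split, conf, the schemas, and the schema manager are assumed to be in scope):

    // Sketch only: the function is now built without a HoodieTableMetaClient.
    HoodieSplitReaderFunction readerFunction = new HoodieSplitReaderFunction(
        conf,                      // Flink Configuration carrying the table path
        tableSchema,               // full table schema
        requiredSchema,            // projected read schema
        internalSchemaManager,     // resolved once by the caller and injected
        conf.get(FlinkOptions.MERGE_TYPE),
        Collections.emptyList(),   // no predicate push-down in this sketch
        false);                    // emitDelete
    // read(...) builds a fresh meta client per split via
    // StreamerUtil.metaClientForReader(configuration, hadoopConf).
    RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> records = readerFunction.read(split);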
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
index 5fb5245e9f0d..bee9a6fbd131 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
@@ -19,28 +19,34 @@
package org.apache.hudi.source.split;
import org.apache.flink.core.fs.Path;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.HoodieScanContext;
+import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
/**
* Default implementation of HoodieContinuousSplitDiscover.
*/
public class DefaultHoodieSplitDiscover implements HoodieContinuousSplitDiscover {
private static final Logger LOG = LoggerFactory.getLogger(DefaultHoodieSplitDiscover.class);
- private final HoodieTableMetaClient metaClient;
private final HoodieScanContext scanContext;
private final IncrementalInputSplits incrementalInputSplits;
+ private final Configuration hadoopConf;
+ private HoodieTableMetaClient metaClient;
public DefaultHoodieSplitDiscover(
- HoodieScanContext scanContext,
- HoodieTableMetaClient metaClient) {
+ HoodieScanContext scanContext) {
this.scanContext = scanContext;
- this.metaClient = metaClient;
+ this.hadoopConf = HadoopConfigurations.getHadoopConf(scanContext.getConf());
+ this.metaClient = getOrCreateMetaClient();
this.incrementalInputSplits = IncrementalInputSplits.builder()
.conf(scanContext.getConf())
.path(new Path(scanContext.getPath().toUri()))
@@ -54,6 +60,23 @@ public class DefaultHoodieSplitDiscover implements HoodieContinuousSplitDiscover
@Override
public HoodieContinuousSplitBatch discoverSplits(String lastInstant) {
+ if (metaClient == null) {
+ return HoodieContinuousSplitBatch.EMPTY;
+ }
+
return incrementalInputSplits.inputHoodieSourceSplits(metaClient, lastInstant, scanContext.isCdcEnabled());
}
+
+ @Nullable
+ private HoodieTableMetaClient getOrCreateMetaClient() {
+ if (this.metaClient != null) {
+ return this.metaClient;
+ }
+ if (StreamerUtil.tableExists(this.scanContext.getPath().toString(), hadoopConf)) {
+ this.metaClient = StreamerUtil.createMetaClient(this.scanContext.getPath().toString(), hadoopConf);
+ return this.metaClient;
+ }
+ // fallback
+ return null;
+ }
}
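
A short usage sketch of the reworked discover, assuming a scanContext whose path points at the table (as the tests below construct one): the meta client is now resolved internally from that path, and discovery degrades gracefully when no table exists yet.

    // Sketch only: construction no longer takes a HoodieTableMetaClient.
    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(scanContext);

    // If the path held no initialized Hudi table at construction time,
    // discoverSplits(...) returns HoodieContinuousSplitBatch.EMPTY instead of throwing.
    HoodieContinuousSplitBatch batch = discover.discoverSplits(lastInstant);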
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index dda548682337..d346a12f8be3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -299,14 +299,13 @@ public class HoodieTableSource extends FileIndexReader implements
final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
HoodieScanContext context = createHoodieScanContext(rowType);
-
final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.get(FlinkOptions.TABLE_TYPE));
boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
HoodieSplitReaderFunction splitReaderFunction = new HoodieSplitReaderFunction(
- metaClient,
conf,
tableSchema,
HoodieSchemaConverter.convertToSchema(requiredRowType),
+ internalSchemaManager,
conf.get(FlinkOptions.MERGE_TYPE),
predicates,
emitDelete
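
The internalSchemaManager passed above is assumed to be resolved once on the planning side; the TestHoodieSource change below shows the same pattern. A hedged sketch of that one-time resolution, assuming a metaClient is available where the table source is built:

    // Hypothetical one-time resolution (mirrors the test change below).
    HadoopStorageConfiguration storageConf =
        new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
    InternalSchemaManager internalSchemaManager = InternalSchemaManager.get(storageConf, metaClient);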
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
index 2054a33052e8..2a8956e20f7e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.source.reader.HoodieRecordEmitter;
@@ -32,6 +33,8 @@ import org.apache.hudi.source.reader.function.HoodieSplitReaderFunction;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.hudi.source.split.HoodieSourceSplitComparator;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -409,11 +412,12 @@ public class TestHoodieSource {
.partitionPruner(partitionPruner)
.build();
HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
+ HadoopStorageConfiguration hadoopConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
HoodieSplitReaderFunction splitReaderFunction = new HoodieSplitReaderFunction(
- metaClient,
conf,
- schema, // schema will be resolved from table
- schema, // required schema
+ schema, // schema will be resolved from table
+ schema, // required schema
+ InternalSchemaManager.get(hadoopConf, this.metaClient),
conf.get(FlinkOptions.MERGE_TYPE),
Collections.emptyList(),
false);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
index e399236ba6b2..323b8b6f071f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.utils.TestConfigurations;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -53,11 +54,13 @@ public class TestHoodieSplitReaderFunction {
private HoodieSchema tableSchema;
private HoodieSchema requiredSchema;
private HoodieTableMetaClient mockMetaClient;
+ private InternalSchemaManager mockInternalSchemaManager;
private Configuration conf;
@BeforeEach
public void setUp() {
mockMetaClient = mock(HoodieTableMetaClient.class);
+ mockInternalSchemaManager = mock(InternalSchemaManager.class);
when(mockMetaClient.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ);
// Create mock schemas
@@ -71,10 +74,10 @@ public class TestHoodieSplitReaderFunction {
// Test that constructor requires non-null tableSchema
assertThrows(IllegalArgumentException.class, () -> {
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
null, // null tableSchema should throw
requiredSchema,
+ mockInternalSchemaManager,
"AVRO_PAYLOAD",
Collections.emptyList(),
false
@@ -87,11 +90,11 @@ public class TestHoodieSplitReaderFunction {
// Test that constructor requires non-null requiredSchema
assertThrows(IllegalArgumentException.class, () -> {
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
null, // null requiredSchema should throw
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
);
@@ -103,11 +106,11 @@ public class TestHoodieSplitReaderFunction {
// Should not throw exception with valid parameters
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
);
@@ -121,11 +124,11 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
);
@@ -137,11 +140,11 @@ public class TestHoodieSplitReaderFunction {
public void testClosedReaderIsNull() throws Exception {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
@@ -163,10 +166,11 @@ public class TestHoodieSplitReaderFunction {
for (String mergeType : mergeTypes) {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
+ mockInternalSchemaManager,
+
mergeType,
Collections.emptyList(),
false
@@ -180,10 +184,10 @@ public class TestHoodieSplitReaderFunction {
public void testMultipleCloseCalls() throws Exception {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
+ mockInternalSchemaManager,
"AVRO_PAYLOAD",
Collections.emptyList(),
false
@@ -201,11 +205,11 @@ public class TestHoodieSplitReaderFunction {
HoodieSchema customRequiredSchema = mock(HoodieSchema.class);
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
customTableSchema,
customRequiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
);
@@ -221,11 +225,11 @@ public class TestHoodieSplitReaderFunction {
// Test with present internal schema
HoodieSplitReaderFunction function1 =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
);
@@ -234,11 +238,11 @@ public class TestHoodieSplitReaderFunction {
// Test with different internal schema
HoodieSplitReaderFunction function2 =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
);
@@ -247,11 +251,12 @@ public class TestHoodieSplitReaderFunction {
// Test with empty internal schema
HoodieSplitReaderFunction function3 =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
);
@@ -264,11 +269,11 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
- conf,
+ conf,
tableSchema,
requiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
);
@@ -281,11 +286,11 @@ public class TestHoodieSplitReaderFunction {
// Verify that the read method returns CloseableIterator
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
);
@@ -297,11 +302,11 @@ public class TestHoodieSplitReaderFunction {
public void testConstructorWithEmitDeleteTrue() {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
true
);
@@ -319,10 +324,10 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
+ mockInternalSchemaManager,
"AVRO_PAYLOAD",
predicates,
true
@@ -335,15 +340,49 @@ public class TestHoodieSplitReaderFunction {
public void testConstructorWithEmitDeleteFalse() {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
- mockMetaClient,
conf,
tableSchema,
requiredSchema,
- "AVRO_PAYLOAD",
+ mockInternalSchemaManager,
+ "AVRO_PAYLOAD",
Collections.emptyList(),
false
);
assertNotNull(function);
}
+
+ @Test
+ public void testConstructorValidatesInternalSchemaManager() {
+ // Test that constructor requires non-null InternalSchemaManager
+ assertThrows(IllegalArgumentException.class, () -> {
+ new HoodieSplitReaderFunction(
+ conf,
+ tableSchema,
+ requiredSchema,
+ null, // null InternalSchemaManager should throw
+ "AVRO_PAYLOAD",
+ Collections.emptyList(),
+ false
+ );
+ });
+ }
+
+ @Test
+ public void testInternalSchemaManagerIsStored() {
+ InternalSchemaManager customManager = mock(InternalSchemaManager.class);
+
+ HoodieSplitReaderFunction function =
+ new HoodieSplitReaderFunction(
+ conf,
+ tableSchema,
+ requiredSchema,
+ customManager,
+ "AVRO_PAYLOAD",
+ Collections.emptyList(),
+ false
+ );
+
+ assertNotNull(function, "Function should be created with custom InternalSchemaManager");
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
index 38245d5e22e7..1304dad26b1c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
@@ -35,6 +35,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import java.io.File;
+
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
@@ -63,7 +65,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
- scanContext, metaClient);
+ scanContext);
// Query with the last instant - should return empty or minimal splits
HoodieContinuousSplitBatch result = discover.discoverSplits(lastInstant);
@@ -93,7 +95,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
- scanContext, metaClient);
+ scanContext);
// Discover splits after the first instant
HoodieContinuousSplitBatch result = discover.discoverSplits(firstInstant.getCompletionTime());
@@ -115,7 +117,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
- scanContext, metaClient);
+ scanContext);
// Discover splits from null (earliest)
HoodieContinuousSplitBatch result = discover.discoverSplits(null);
@@ -137,7 +139,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
- scanContext, metaClient);
+ scanContext);
HoodieContinuousSplitBatch result = discover.discoverSplits(null);
@@ -164,7 +166,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
- scanContext, metaClient);
+ scanContext);
HoodieContinuousSplitBatch result = discover.discoverSplits(firstCompletionTime);
@@ -186,7 +188,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
HoodieScanContext scanContext = createScanContextWithSkipOptions(conf, true, true, false);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
- scanContext, metaClient);
+ scanContext);
HoodieContinuousSplitBatch result = discover.discoverSplits(scanContext.getStartInstant());
@@ -201,11 +203,72 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
- scanContext, metaClient);
+ scanContext);
assertNotNull(discover, "Discover instance should not be null");
}
+ @Test
+ void testDiscoverSplitsWithNonExistentTable() throws Exception {
+ // Use a path that doesn't have a Hudi table initialized
+ String nonExistentPath = basePath + "/non_existent_table";
+ Configuration conf = TestConfigurations.getDefaultConf(nonExistentPath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+ HoodieScanContext scanContext = createScanContext(conf);
+ DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+ scanContext);
+
+ // Should return empty batch when table doesn't exist
+ HoodieContinuousSplitBatch result = discover.discoverSplits(null);
+
+ assertNotNull(result, "Result should not be null");
+ assertNotNull(result.getSplits(), "Splits should not be null");
+ // Empty batch should have no splits
+ }
+
+ @Test
+ void testLazyMetaClientInitialization() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+ // Insert test data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ HoodieScanContext scanContext = createScanContext(conf);
+ DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+ scanContext);
+
+ // First call should initialize metaClient and discover splits
+ HoodieContinuousSplitBatch result1 = discover.discoverSplits(null);
+ assertNotNull(result1, "First result should not be null");
+
+ // Second call should reuse the same metaClient
+ HoodieContinuousSplitBatch result2 = discover.discoverSplits(null);
+ assertNotNull(result2, "Second result should not be null");
+ }
+
+ @Test
+ void testDiscoverSplitsHandlesNullMetaClientGracefully() throws Exception {
+ // Use a directory that exists but is not a Hudi table
+ String emptyPath = basePath + "/empty_dir";
+ new File(emptyPath).mkdirs();
+
+ Configuration conf = TestConfigurations.getDefaultConf(emptyPath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+ HoodieScanContext scanContext = createScanContext(conf);
+ DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+ scanContext);
+
+ // Should handle null metaClient gracefully and return empty batch
+ HoodieContinuousSplitBatch result = discover.discoverSplits("20230101000000");
+
+ assertNotNull(result, "Result should not be null even with null metaClient");
+ assertNotNull(result.getSplits(), "Splits should not be null");
+ }
+
// Helper methods
private HoodieScanContext createScanContext(Configuration conf) throws Exception {