This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b7f72d33b560 feat: support predicate push down in Hudi flink source v2 
(#18212)
b7f72d33b560 is described below

commit b7f72d33b560b745a3ce1733c08343149750ce63
Author: Peter Huang <[email protected]>
AuthorDate: Sun Feb 22 17:21:45 2026 -0800

    feat: support predicate push down in Hudi flink source v2 (#18212)
---
 .../apache/hudi/common/table/log/InstantRange.java |  34 +++-
 .../reader/function/HoodieSplitReaderFunction.java |  61 +++---
 .../source/split/HoodieContinuousSplitBatch.java   |   3 +-
 .../hudi/source/split/HoodieSourceSplit.java       |   7 +-
 .../source/split/HoodieSourceSplitSerializer.java  |  39 +++-
 .../org/apache/hudi/table/HoodieTableSource.java   |   7 +-
 .../org/apache/hudi/table/format/FormatUtils.java  |   1 +
 .../java/org/apache/hudi/util/FileIndexReader.java |   6 +-
 .../org/apache/hudi/source/TestHoodieSource.java   |   4 +-
 .../assign/TestDefaultHoodieSplitAssigner.java     |   3 +-
 .../assign/TestHoodieSplitBucketAssigner.java      |   3 +-
 .../assign/TestHoodieSplitNumberAssigner.java      |   3 +-
 .../TestHoodieContinuousSplitEnumerator.java       |   3 +-
 .../TestHoodieEnumeratorStateSerializer.java       |   5 +-
 .../TestHoodieStaticSplitEnumerator.java           |   3 +-
 .../source/reader/TestHoodieRecordEmitter.java     |   3 +-
 .../source/reader/TestHoodieSourceSplitReader.java |   3 +-
 .../function/TestHoodieSplitReaderFunction.java    | 141 ++++++++++---
 .../split/TestDefaultHoodieSplitProvider.java      |   9 +-
 .../hudi/source/split/TestHoodieSourceSplit.java   | 174 ++++++++++++++--
 .../split/TestHoodieSourceSplitComparator.java     |   3 +-
 .../split/TestHoodieSourceSplitSerializer.java     | 226 +++++++++++++++++++--
 22 files changed, 628 insertions(+), 113 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
index 6d6d29bb40f6..ff0b29ce1669 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
@@ -65,12 +65,14 @@ public abstract class InstantRange implements Serializable {
 
   public abstract boolean isInRange(String instant);
 
+  public abstract RangeType getRangeType();
+
   @Override
   public String toString() {
     return "InstantRange{"
         + "startInstant='" + (startInstant.isEmpty() ? "-INF" : 
startInstant.get()) + '\''
         + ", endInstant='" + (endInstant.isEmpty() ? "+INF" : 
endInstant.get()) + '\''
-        + ", rangeType='" + this.getClass().getSimpleName() + '\''
+        + ", rangeType='" + this.getRangeType().name() + '\''
         + '}';
   }
 
@@ -107,6 +109,11 @@ public abstract class InstantRange implements Serializable 
{
               .orElse(true);
       return validAgainstStart && validAgainstEnd;
     }
+
+    @Override
+    public RangeType getRangeType() {
+      return RangeType.OPEN_CLOSED;
+    }
   }
 
   private static class OpenClosedRangeNullableBoundary extends InstantRange {
@@ -128,6 +135,11 @@ public abstract class InstantRange implements Serializable 
{
 
       return validAgainstStart && validAgainstEnd;
     }
+
+    @Override
+    public RangeType getRangeType() {
+      return RangeType.OPEN_CLOSED;
+    }
   }
 
   private static class ClosedClosedRange extends InstantRange {
@@ -144,6 +156,11 @@ public abstract class InstantRange implements Serializable 
{
               .orElse(true);
       return validAgainstStart && validAgainstEnd;
     }
+
+    @Override
+    public RangeType getRangeType() {
+      return RangeType.CLOSED_CLOSED;
+    }
   }
 
   private static class ClosedClosedRangeNullableBoundary extends InstantRange {
@@ -164,6 +181,11 @@ public abstract class InstantRange implements Serializable 
{
               .orElse(true);
       return validAgainstStart && validAgainstEnd;
     }
+
+    @Override
+    public RangeType getRangeType() {
+      return RangeType.CLOSED_CLOSED;
+    }
   }
 
   /**
@@ -181,6 +203,11 @@ public abstract class InstantRange implements Serializable 
{
     public boolean isInRange(String instant) {
       return this.instants.contains(instant);
     }
+
+    @Override
+    public RangeType getRangeType() {
+      return RangeType.EXACT_MATCH;
+    }
   }
 
   /**
@@ -203,6 +230,11 @@ public abstract class InstantRange implements Serializable 
{
       }
       return false;
     }
+
+    @Override
+    public RangeType getRangeType() {
+      return RangeType.COMPOSITION;
+    }
   }
 
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
index 0cb66a3b078c..9178a8ecc30c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
@@ -19,8 +19,6 @@
 package org.apache.hudi.source.reader.function;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.hudi.common.config.HoodieReaderConfig;
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -28,20 +26,24 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.source.reader.BatchRecords;
 import org.apache.hudi.source.reader.HoodieRecordWithPosition;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.table.data.RowData;
-import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.util.FlinkWriteClients;
+
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.stream.Collectors;
 
 /**
@@ -51,8 +53,11 @@ public class HoodieSplitReaderFunction implements 
SplitReaderFunction<RowData> {
   private final HoodieTableMetaClient metaClient;
   private final HoodieSchema tableSchema;
   private final HoodieSchema requiredSchema;
-  private final Option<InternalSchema> internalSchemaOption;
-  private final TypedProperties props;
+  private final Configuration configuration;
+  private final HoodieWriteConfig writeConfig;
+  private final String mergeType;
+  private final boolean emitDelete;
+  private final List<ExpressionPredicates.Predicate> predicates;
   private HoodieFileGroupReader<RowData> fileGroupReader;
 
   public HoodieSplitReaderFunction(
@@ -61,16 +66,19 @@ public class HoodieSplitReaderFunction implements 
SplitReaderFunction<RowData> {
       HoodieSchema tableSchema,
       HoodieSchema requiredSchema,
       String mergeType,
-      Option<InternalSchema> internalSchemaOption) {
+      List<ExpressionPredicates.Predicate> predicates,
+      boolean emitDelete) {
 
     ValidationUtils.checkArgument(tableSchema != null, "tableSchema can't be 
null");
     ValidationUtils.checkArgument(requiredSchema != null, "requiredSchema 
can't be null");
     this.metaClient = metaClient;
     this.tableSchema = tableSchema;
     this.requiredSchema = requiredSchema;
-    this.internalSchemaOption = internalSchemaOption;
-    this.props = new TypedProperties();
-    this.props.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
+    this.configuration = configuration;
+    this.writeConfig = FlinkWriteClients.getHoodieClientConfig(configuration);
+    this.predicates = predicates;
+    this.mergeType = mergeType;
+    this.emitDelete = emitDelete;
     this.fileGroupReader = null;
   }
 
@@ -112,23 +120,18 @@ public class HoodieSplitReaderFunction implements 
SplitReaderFunction<RowData> {
         ).orElse(Collections.emptyList())
     );
 
-    FlinkReaderContextFactory readerContextFactory = new 
FlinkReaderContextFactory(metaClient);
-
-    // Build the file group reader
-    HoodieFileGroupReader.Builder<RowData> builder = 
HoodieFileGroupReader.<RowData>newBuilder()
-        .withReaderContext(readerContextFactory.getContext())
-        .withHoodieTableMetaClient(metaClient)
-        .withFileSlice(fileSlice)
-        .withProps(props)
-        .withShouldUseRecordPosition(true)
-        .withDataSchema(tableSchema)
-        .withRequestedSchema(requiredSchema);
-
-
-    if (internalSchemaOption.isPresent()) {
-      builder.withInternalSchema(internalSchemaOption);
-    }
-
-    return builder.build();
+    return FormatUtils.createFileGroupReader(
+      metaClient,
+      writeConfig,
+      InternalSchemaManager.get(metaClient.getStorageConf(), metaClient),
+      fileSlice,
+      tableSchema,
+      requiredSchema,
+      split.getLatestCommit(),
+      mergeType,
+      emitDelete,
+      predicates,
+      split.getInstantRange()
+    );
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
index 244c027782c5..0b07b2e84bfb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
@@ -68,7 +68,8 @@ public class HoodieContinuousSplitBatch {
             split.getLogPaths(), split.getTablePath(),
             resolvePartitionPath(split), split.getMergeType(),
             split.getLatestCommit(),
-            split.getFileId()
+            split.getFileId(),
+            split.getInstantRange()
         )
     ).collect(Collectors.toList());
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
index 41c1b05f03a1..2ba5db8427fc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source.split;
 
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 
 import lombok.EqualsAndHashCode;
@@ -57,6 +58,8 @@ public class HoodieSourceSplit implements SourceSplit, 
Serializable {
   private final String mergeType;
   // latest commit time
   private final String latestCommit;
+  // instant range
+  private final Option<InstantRange> instantRange;
   // file id of file splice
   @Setter
   protected String fileId;
@@ -76,7 +79,8 @@ public class HoodieSourceSplit implements SourceSplit, 
Serializable {
       String partitionPath,
       String mergeType,
       String latestCommit,
-      String fileId) {
+      String fileId,
+      Option<InstantRange> instantRange) {
     this.splitNum = splitNum;
     this.basePath = Option.ofNullable(basePath);
     this.logPaths = logPaths;
@@ -86,6 +90,7 @@ public class HoodieSourceSplit implements SourceSplit, 
Serializable {
     this.latestCommit = latestCommit;
     this.fileId = fileId;
     this.fileOffset = 0;
+    this.instantRange = instantRange;
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
index 36e53f916896..c5ce5d2d8bd7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source.split;
 
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.flink.annotation.Internal;
@@ -82,6 +83,21 @@ public class HoodieSourceSplitSerializer implements 
SimpleVersionedSerializer<Ho
       // Serialize fileOffset
       out.writeInt(obj.getFileOffset());
 
+      // Serialize instant range (Option<InstantRange>)
+      out.writeBoolean(obj.getInstantRange().isPresent());
+      if (obj.getInstantRange().isPresent()) {
+        InstantRange instantRange = obj.getInstantRange().get();
+        out.writeBoolean(instantRange.getStartInstant().isPresent());
+        if (instantRange.getStartInstant().isPresent()) {
+          out.writeUTF(instantRange.getStartInstant().get());
+        }
+        out.writeBoolean(instantRange.getEndInstant().isPresent());
+        if (instantRange.getEndInstant().isPresent()) {
+          out.writeUTF(instantRange.getEndInstant().get());
+        }
+        out.writeUTF(instantRange.getRangeType().name());
+      }
+
       out.flush();
       return baos.toByteArray();
     }
@@ -129,6 +145,26 @@ public class HoodieSourceSplitSerializer implements 
SimpleVersionedSerializer<Ho
       // Deserialize fileOffset
       int fileOffset = in.readInt();
 
+      // Deserialize instantRange (Option<InstantRange>)
+      Option<InstantRange> instantRangeOption;
+      if (in.readBoolean()) {
+        InstantRange.Builder builder = InstantRange.builder();
+        // Deserialize startInstant
+        if (in.readBoolean()) {
+          builder.startInstant(in.readUTF());
+        }
+
+        // Deserialize endInstant
+        if (in.readBoolean()) {
+          builder.endInstant(in.readUTF());
+        }
+
+        builder.rangeType(InstantRange.RangeType.valueOf(in.readUTF()));
+        instantRangeOption = Option.of(builder.build());
+      } else {
+        instantRangeOption = Option.empty();
+      }
+
       // Create HoodieSourceSplit object
       HoodieSourceSplit split = new HoodieSourceSplit(
           splitNum,
@@ -138,7 +174,8 @@ public class HoodieSourceSplitSerializer implements 
SimpleVersionedSerializer<Ho
           partitionPath,
           mergeType,
           latestCommit,
-          fileId);
+          fileId,
+          instantRangeOption);
 
       // Update position to restore consumed and fileOffset
       split.updatePosition(fileOffset, consumed);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 5f898e73ef57..dda548682337 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -299,13 +299,18 @@ public class HoodieTableSource extends FileIndexReader 
implements
     final RowType requiredRowType = (RowType) 
getProducedDataType().notNull().getLogicalType();
 
     HoodieScanContext context = createHoodieScanContext(rowType);
+
+    final HoodieTableType tableType = 
HoodieTableType.valueOf(this.conf.get(FlinkOptions.TABLE_TYPE));
+    boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
     HoodieSplitReaderFunction splitReaderFunction = new 
HoodieSplitReaderFunction(
         metaClient,
         conf,
         tableSchema,
         HoodieSchemaConverter.convertToSchema(requiredRowType),
         conf.get(FlinkOptions.MERGE_TYPE),
-        Option.empty());
+        predicates,
+        emitDelete
+        );
     return new HoodieSource<>(context, splitReaderFunction, new 
HoodieSourceSplitComparator(), metaClient, new HoodieRecordEmitter<>());
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 797591426d6c..d62763ef64af 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -128,6 +128,7 @@ public class FormatUtils {
             predicates,
             metaClient.getTableConfig(),
             instantRangeOption);
+
     final TypedProperties typedProps = 
FlinkClientUtil.getReadProps(metaClient.getTableConfig(), writeConfig);
     typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
index 5efffaa2ab97..e1ae0973486a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
@@ -110,7 +110,8 @@ public abstract class FileIndexReader implements 
Serializable {
                 fileSlice.getPartitionPath(),
                 mergeType,
                 fileSlice.getLatestInstantTime(),
-                fileSlice.getFileId()))
+                fileSlice.getFileId(),
+                Option.empty()))
         .collect(Collectors.toList());
   }
 
@@ -146,7 +147,8 @@ public abstract class FileIndexReader implements 
Serializable {
               fileSlice.getPartitionPath(),
               mergeType,
               result.getRight(),
-              fileSlice.getFileId());
+              fileSlice.getFileId(),
+              Option.empty());
         })
         .collect(Collectors.toList());
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
index e929b616e647..2054a33052e8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.types.logical.RowType;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -414,7 +415,8 @@ public class TestHoodieSource {
             schema, // schema will be resolved from table
             schema, // required schema
         conf.get(FlinkOptions.MERGE_TYPE),
-        org.apache.hudi.common.util.Option.empty());
+        Collections.emptyList(),
+            false);
 
     return new HoodieSource<>(
         scanContext,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestDefaultHoodieSplitAssigner.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestDefaultHoodieSplitAssigner.java
index f606abf1b0b5..f3aa66da80ee 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestDefaultHoodieSplitAssigner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestDefaultHoodieSplitAssigner.java
@@ -236,7 +236,8 @@ public class TestDefaultHoodieSplitAssigner {
         "/table/path/partition1",
         "read_optimized",
         "19700101000000000",
-        fileId
+        fileId,
+        Option.empty()
     );
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitBucketAssigner.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitBucketAssigner.java
index 8626d587ca8f..b5488e158175 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitBucketAssigner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitBucketAssigner.java
@@ -287,7 +287,8 @@ public class TestHoodieSplitBucketAssigner {
         "/table/path/partition1",
         "read_optimized",
         "19700101000000000",
-        fileId
+        fileId,
+        Option.empty()
     );
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitNumberAssigner.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitNumberAssigner.java
index 5c8a8e3f5167..8cead120e91d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitNumberAssigner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitNumberAssigner.java
@@ -185,7 +185,8 @@ public class TestHoodieSplitNumberAssigner {
         "/table/path/partition1",
         "read_optimized",
         "19700101000000000",
-        fileId
+        fileId,
+        Option.empty()
     );
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
index b411d0395e66..6adfa2bfaf66 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -262,7 +262,8 @@ public class TestHoodieContinuousSplitEnumerator {
         "/table/path/partition1",
         "read_optimized",
         "19700101000000000",
-        fileId
+        fileId,
+        Option.empty()
     );
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieEnumeratorStateSerializer.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieEnumeratorStateSerializer.java
index f8320f4a4bbb..7ff61f9f8ef0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieEnumeratorStateSerializer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieEnumeratorStateSerializer.java
@@ -456,7 +456,7 @@ public class TestHoodieEnumeratorStateSerializer {
     splitStates.add(new HoodieSourceSplitState(split1, 
HoodieSourceSplitStatus.ASSIGNED));
 
     HoodieSourceSplit split2 = new HoodieSourceSplit(2, null,
-        Option.of(Arrays.asList("log1", "log2")), "/table", "/p2", 
"payload_combine", "", "file2");
+        Option.of(Arrays.asList("log1", "log2")), "/table", "/p2", 
"payload_combine", "", "file2", Option.empty());
     splitStates.add(new HoodieSourceSplitState(split2, 
HoodieSourceSplitStatus.UNASSIGNED));
 
     HoodieSourceSplit split3 = createTestSplit(3, "file3", "/p3");
@@ -491,7 +491,8 @@ public class TestHoodieEnumeratorStateSerializer {
         partitionPath,
         "read_optimized",
         "19700101000000000",
-        fileId
+        fileId,
+        Option.empty()
     );
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
index 27dd9fb85572..981473bcfab8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
@@ -228,7 +228,8 @@ public class TestHoodieStaticSplitEnumerator {
         "/table/path/partition1",
         "read_optimized",
         "19700101000000000",
-        fileId
+        fileId,
+        Option.empty()
     );
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
index 9ea807e92973..482c169e89f5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
@@ -208,7 +208,8 @@ public class TestHoodieRecordEmitter {
         "/test/partition",
         "read_optimized",
         "19700101000000000",
-        "file-1"
+        "file-1",
+            Option.empty()
     );
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
index 108a1d38e826..09c60e6fca2e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
@@ -314,7 +314,8 @@ public class TestHoodieSourceSplitReader {
         "/test/partition",
         "read_optimized",
         "19700101000000000",
-        fileId
+        fileId,
+        Option.empty()
     );
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
index 6b2ea6a114c5..e399236ba6b2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
@@ -18,15 +18,25 @@
 
 package org.apache.hudi.source.reader.function;
 
+import org.apache.flink.table.types.AtomicDataType;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.hudi.utils.TestConfigurations;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -37,9 +47,13 @@ import static org.mockito.Mockito.when;
  * Test cases for {@link HoodieSplitReaderFunction}.
  */
 public class TestHoodieSplitReaderFunction {
+  @TempDir
+  File tempDir;
+
   private HoodieSchema tableSchema;
   private HoodieSchema requiredSchema;
   private HoodieTableMetaClient mockMetaClient;
+  private Configuration conf;
 
   @BeforeEach
   public void setUp() {
@@ -49,6 +63,7 @@ public class TestHoodieSplitReaderFunction {
     // Create mock schemas
     tableSchema = mock(HoodieSchema.class);
     requiredSchema = mock(HoodieSchema.class);
+    conf = TestConfigurations.getDefaultConf(tempDir.getAbsolutePath());
   }
 
   @Test
@@ -57,10 +72,12 @@ public class TestHoodieSplitReaderFunction {
     assertThrows(IllegalArgumentException.class, () -> {
       new HoodieSplitReaderFunction(
           mockMetaClient,
-          new Configuration(), null,  // null tableSchema should throw
+          conf,
+          null,  // null tableSchema should throw
           requiredSchema,
           "AVRO_PAYLOAD",
-          Option.empty()
+          Collections.emptyList(),
+              false
       );
     });
   }
@@ -71,11 +88,12 @@ public class TestHoodieSplitReaderFunction {
     assertThrows(IllegalArgumentException.class, () -> {
       new HoodieSplitReaderFunction(
           mockMetaClient,
-          new Configuration(),
+          conf,
           tableSchema,
           null,  // null requiredSchema should throw
           "AVRO_PAYLOAD",
-          Option.empty()
+          Collections.emptyList(),
+          false
       );
     });
   }
@@ -86,11 +104,12 @@ public class TestHoodieSplitReaderFunction {
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
             mockMetaClient,
-            new Configuration(),
+            conf,
             tableSchema,
             requiredSchema,
             "AVRO_PAYLOAD",
-            Option.empty()
+            Collections.emptyList(),
+            false
         );
 
     assertNotNull(function);
@@ -103,11 +122,12 @@ public class TestHoodieSplitReaderFunction {
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
             mockMetaClient,
-            new Configuration(),
+            conf,
             tableSchema,
             requiredSchema,
             "AVRO_PAYLOAD",
-            Option.of(internalSchema)
+            Collections.emptyList(),
+            false
         );
 
     assertNotNull(function);
@@ -118,11 +138,13 @@ public class TestHoodieSplitReaderFunction {
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
             mockMetaClient,
-            new Configuration(),
+            conf,
             tableSchema,
             requiredSchema,
             "AVRO_PAYLOAD",
-            Option.empty()
+            Collections.emptyList(),
+            false
+
         );
 
     // Close should not throw exception even when fileGroupReader is null
@@ -142,11 +164,12 @@ public class TestHoodieSplitReaderFunction {
       HoodieSplitReaderFunction function =
           new HoodieSplitReaderFunction(
               mockMetaClient,
-              new Configuration(),
+              conf,
               tableSchema,
               requiredSchema,
               mergeType,
-              Option.empty()
+              Collections.emptyList(),
+              false
           );
 
       assertNotNull(function);
@@ -158,11 +181,12 @@ public class TestHoodieSplitReaderFunction {
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
             mockMetaClient,
-            new Configuration(),
+            conf,
             tableSchema,
             requiredSchema,
             "AVRO_PAYLOAD",
-            Option.empty()
+            Collections.emptyList(),
+            false
         );
 
     // Multiple close calls should not throw exception
@@ -178,11 +202,12 @@ public class TestHoodieSplitReaderFunction {
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
             mockMetaClient,
-            new Configuration(),
+            conf,
             customTableSchema,
             customRequiredSchema,
             "AVRO_PAYLOAD",
-            Option.empty()
+            Collections.emptyList(),
+            false
         );
 
     assertNotNull(function);
@@ -197,11 +222,12 @@ public class TestHoodieSplitReaderFunction {
     HoodieSplitReaderFunction function1 =
         new HoodieSplitReaderFunction(
             mockMetaClient,
-            new Configuration(),
+            conf,
             tableSchema,
             requiredSchema,
             "AVRO_PAYLOAD",
-            Option.of(internalSchema1)
+            Collections.emptyList(),
+            false
         );
     assertNotNull(function1);
 
@@ -209,11 +235,12 @@ public class TestHoodieSplitReaderFunction {
     HoodieSplitReaderFunction function2 =
         new HoodieSplitReaderFunction(
             mockMetaClient,
-            new Configuration(),
+            conf,
             tableSchema,
             requiredSchema,
             "AVRO_PAYLOAD",
-            Option.of(internalSchema2)
+            Collections.emptyList(),
+            false
         );
     assertNotNull(function2);
 
@@ -221,28 +248,29 @@ public class TestHoodieSplitReaderFunction {
     HoodieSplitReaderFunction function3 =
         new HoodieSplitReaderFunction(
             mockMetaClient,
-            new Configuration(),
+            conf,
             tableSchema,
             requiredSchema,
             "AVRO_PAYLOAD",
-            Option.empty()
+            Collections.emptyList(),
+            false
         );
     assertNotNull(function3);
   }
 
   @Test
   public void testConfigurationIsStored() {
-    Configuration config = new Configuration();
-    config.setString("test.key", "test.value");
+    conf.setString("test.key", "test.value");
 
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
             mockMetaClient,
-            config,
+            conf,
             tableSchema,
             requiredSchema,
             "AVRO_PAYLOAD",
-            Option.empty()
+            Collections.emptyList(),
+            false
         );
 
     assertNotNull(function);
@@ -254,11 +282,66 @@ public class TestHoodieSplitReaderFunction {
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
             mockMetaClient,
-            new Configuration(),
+            conf,
+            tableSchema,
+            requiredSchema,
+            "AVRO_PAYLOAD",
+            Collections.emptyList(),
+            false
+        );
+
+    assertNotNull(function);
+  }
+
+  @Test
+  public void testConstructorWithEmitDeleteTrue() {
+    HoodieSplitReaderFunction function =
+        new HoodieSplitReaderFunction(
+            mockMetaClient,
+            conf,
+            tableSchema,
+            requiredSchema,
+            "AVRO_PAYLOAD",
+            Collections.emptyList(),
+            true
+        );
+
+    assertNotNull(function);
+  }
+
+  @Test
+  public void testConstructorWithPredicatesAndEmitDelete() {
+    ExpressionPredicates.Predicate predicate = 
ExpressionPredicates.NotEquals.getInstance()
+            .bindFieldReference(new FieldReferenceExpression("status", new 
AtomicDataType(new VarCharType(true, 10)), 0, 0))
+            .bindValueLiteral(new ValueLiteralExpression("deleted"));
+
+    List<ExpressionPredicates.Predicate> predicates = 
Collections.singletonList(predicate);
+
+    HoodieSplitReaderFunction function =
+            new HoodieSplitReaderFunction(
+                    mockMetaClient,
+                    conf,
+                    tableSchema,
+                    requiredSchema,
+                    "AVRO_PAYLOAD",
+                    predicates,
+                    true
+            );
+
+    assertNotNull(function);
+  }
+
+  @Test
+  public void testConstructorWithEmitDeleteFalse() {
+    HoodieSplitReaderFunction function =
+        new HoodieSplitReaderFunction(
+            mockMetaClient,
+            conf,
             tableSchema,
             requiredSchema,
             "AVRO_PAYLOAD",
-            Option.empty()
+            Collections.emptyList(),
+            false
         );
 
     assertNotNull(function);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
index 560463cbc6dc..516df64976ef 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
@@ -638,7 +638,8 @@ public class TestDefaultHoodieSplitProvider {
         "/table/path/partition1",
         "read_optimized",
         "20260126034717000",
-        fileId
+        fileId,
+        Option.empty()
     );
   }
 
@@ -651,7 +652,8 @@ public class TestDefaultHoodieSplitProvider {
         "/table/path/partition1",
         "read_optimized",
         latestCommit,
-        "file" + splitNum
+        "file" + splitNum,
+        Option.empty()
     );
   }
 
@@ -664,7 +666,8 @@ public class TestDefaultHoodieSplitProvider {
         "/table/path/partition1",
         "read_optimized",
         "2026012603471700" + splitNum,
-        fileId
+        fileId,
+        Option.empty()
     );
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
index 6ca4e6d60691..6355aa508f9a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source.split;
 
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 
 import org.junit.jupiter.api.Test;
@@ -28,6 +29,7 @@ import java.util.Collections;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -95,9 +97,9 @@ public class TestHoodieSourceSplit {
   @Test
   public void testEqualsWithDifferentBasePath() {
     HoodieSourceSplit split1 = new HoodieSourceSplit(
-        1, "base-path-1", Option.empty(), "/table", "/partition1",  
"read_optimized", "", "file1");
+        1, "base-path-1", Option.empty(), "/table", "/partition1",  
"read_optimized", "", "file1", Option.empty());
     HoodieSourceSplit split2 = new HoodieSourceSplit(
-        1, "base-path-2", Option.empty(), "/table", "/partition1", 
"read_optimized", "", "file1");
+        1, "base-path-2", Option.empty(), "/table", "/partition1", 
"read_optimized", "", "file1", Option.empty());
 
     assertNotEquals(split1, split2);
   }
@@ -105,9 +107,9 @@ public class TestHoodieSourceSplit {
   @Test
   public void testEqualsWithDifferentLogPaths() {
     HoodieSourceSplit split1 = new HoodieSourceSplit(
-        1, "base-path", Option.of(Arrays.asList("log1", "log2")), "/table", 
"/partition1", "payload_combine", "", "file1");
+        1, "base-path", Option.of(Arrays.asList("log1", "log2")), "/table", 
"/partition1", "payload_combine", "", "file1", Option.empty());
     HoodieSourceSplit split2 = new HoodieSourceSplit(
-        1, "base-path", Option.of(Arrays.asList("log1", "log3")), "/table", 
"/partition1",  "payload_combine", "", "file1");
+        1, "base-path", Option.of(Arrays.asList("log1", "log3")), "/table", 
"/partition1",  "payload_combine", "", "file1", Option.empty());
 
     assertNotEquals(split1, split2);
   }
@@ -115,9 +117,9 @@ public class TestHoodieSourceSplit {
   @Test
   public void testEqualsWithDifferentTablePath() {
     HoodieSourceSplit split1 = new HoodieSourceSplit(
-        1, "base-path", Option.empty(), "/table1", "/partition1",  
"read_optimized", "", "file1");
+        1, "base-path", Option.empty(), "/table1", "/partition1",  
"read_optimized", "", "file1", Option.empty());
     HoodieSourceSplit split2 = new HoodieSourceSplit(
-        1, "base-path", Option.empty(), "/table2", "/partition1", 
"read_optimized", "","file1");
+        1, "base-path", Option.empty(), "/table2", "/partition1", 
"read_optimized", "","file1", Option.empty());
 
     assertNotEquals(split1, split2);
   }
@@ -125,9 +127,9 @@ public class TestHoodieSourceSplit {
   @Test
   public void testEqualsWithDifferentMergeType() {
     HoodieSourceSplit split1 = new HoodieSourceSplit(
-        1, "base-path", Option.empty(), "/table", "/partition1", 
"read_optimized", "","file1");
+        1, "base-path", Option.empty(), "/table", "/partition1", 
"read_optimized", "","file1", Option.empty());
     HoodieSourceSplit split2 = new HoodieSourceSplit(
-        1, "base-path", Option.empty(), "/table", "/partition1", 
"payload_combine", "", "file1");
+        1, "base-path", Option.empty(), "/table", "/partition1", 
"payload_combine", "", "file1", Option.empty());
 
     assertNotEquals(split1, split2);
   }
@@ -228,7 +230,7 @@ public class TestHoodieSourceSplit {
 
     HoodieSourceSplit split = new HoodieSourceSplit(
         42, basePath, Option.of(Arrays.asList("log1", "log2")),
-        tablePath, partitionPath, mergeType, "", fileId);
+        tablePath, partitionPath, mergeType, "", fileId, Option.empty());
 
     assertTrue(split.getBasePath().isPresent());
     assertEquals(basePath, split.getBasePath().get());
@@ -267,7 +269,7 @@ public class TestHoodieSourceSplit {
   public void testToString() {
     HoodieSourceSplit split = new HoodieSourceSplit(
         1, "base-path", Option.of(Arrays.asList("log1")),
-        "/table", "/partition", "read_optimized", "", "file1");
+        "/table", "/partition", "read_optimized", "", "file1", Option.empty());
 
     String result = split.toString();
 
@@ -303,9 +305,9 @@ public class TestHoodieSourceSplit {
   @Test
   public void testEqualsWithNullBasePath() {
     HoodieSourceSplit split1 = new HoodieSourceSplit(
-        1, null, Option.empty(), "/table", "/partition","read_optimized", "", 
"file1");
+        1, null, Option.empty(), "/table", "/partition","read_optimized", "", 
"file1", Option.empty());
     HoodieSourceSplit split2 = new HoodieSourceSplit(
-        1, null, Option.empty(), "/table", "/partition","read_optimized", "", 
"file1");
+        1, null, Option.empty(), "/table", "/partition","read_optimized", "", 
"file1", Option.empty());
 
     assertEquals(split1, split2);
   }
@@ -313,9 +315,9 @@ public class TestHoodieSourceSplit {
   @Test
   public void testEqualsOneNullBasePathOneNot() {
     HoodieSourceSplit split1 = new HoodieSourceSplit(
-        1, null, Option.empty(), "/table", "/partition", "read_optimized", "", 
"file1");
+        1, null, Option.empty(), "/table", "/partition", "read_optimized", "", 
"file1", Option.empty());
     HoodieSourceSplit split2 = new HoodieSourceSplit(
-        1, "base-path", Option.empty(), "/table", "/partition", 
"read_optimized", "","file1");
+        1, "base-path", Option.empty(), "/table", "/partition", 
"read_optimized", "","file1", Option.empty());
 
     assertNotEquals(split1, split2);
   }
@@ -332,7 +334,149 @@ public class TestHoodieSourceSplit {
         partitionPath,
         "read_optimized",
         "19700101000000000",
-        fileId
+        fileId,
+        Option.empty()
     );
   }
+
+  @Test
+  public void testInstantRangePresent() {
+    InstantRange instantRange = InstantRange.builder()
+        .startInstant("20230101000000000")
+        .endInstant("20230131235959999")
+        .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+        .build();
+
+    HoodieSourceSplit split = new HoodieSourceSplit(
+        1,
+        "base-path",
+        Option.empty(),
+        "/table/path",
+        "/partition/path",
+        "read_optimized",
+        "19700101000000000",
+        "file-1",
+        Option.of(instantRange)
+    );
+
+    assertTrue(split.getInstantRange().isPresent());
+    assertEquals("20230101000000000", 
split.getInstantRange().get().getStartInstant().get());
+    assertEquals("20230131235959999", 
split.getInstantRange().get().getEndInstant().get());
+  }
+
+  @Test
+  public void testInstantRangeEmpty() {
+    HoodieSourceSplit split = new HoodieSourceSplit(
+        1,
+        "base-path",
+        Option.empty(),
+        "/table/path",
+        "/partition/path",
+        "read_optimized",
+        "19700101000000000",
+        "file-1",
+        Option.empty()
+    );
+
+    assertFalse(split.getInstantRange().isPresent());
+  }
+
+  @Test
+  public void testInstantRangeWithOnlyStart() {
+    InstantRange instantRange = InstantRange.builder()
+        .startInstant("20230101000000000")
+        .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+        .nullableBoundary(true)
+        .build();
+
+    HoodieSourceSplit split = new HoodieSourceSplit(
+        1,
+        "base-path",
+        Option.of(Arrays.asList("log1", "log2")),
+        "/table/path",
+        "/partition/path",
+        "payload_combine",
+        "19700101000000000",
+        "file-1",
+        Option.of(instantRange)
+    );
+
+    assertTrue(split.getInstantRange().isPresent());
+    assertTrue(split.getInstantRange().get().getStartInstant().isPresent());
+    assertFalse(split.getInstantRange().get().getEndInstant().isPresent());
+    assertEquals("20230101000000000", 
split.getInstantRange().get().getStartInstant().get());
+  }
+
+  @Test
+  public void testEqualsWithDifferentInstantRange() {
+    InstantRange range1 = InstantRange.builder()
+        .startInstant("20230101000000000")
+        .endInstant("20230131235959999")
+        .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+        .build();
+
+    InstantRange range2 = InstantRange.builder()
+        .startInstant("20230201000000000")
+        .endInstant("20230228235959999")
+        .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+        .build();
+
+    HoodieSourceSplit split1 = new HoodieSourceSplit(
+        1, "base-path", Option.empty(), "/table", "/partition1", 
"read_optimized", "19700101000000000", "file1", Option.of(range1));
+    HoodieSourceSplit split2 = new HoodieSourceSplit(
+        1, "base-path", Option.empty(), "/table", "/partition1", 
"read_optimized", "19700101000000000", "file1", Option.of(range2));
+
+    assertNotEquals(split1, split2);
+  }
+
+  @Test
+  public void testToStringWithInstantRange() {
+    InstantRange instantRange = InstantRange.builder()
+        .startInstant("20230101000000000")
+        .endInstant("20230131235959999")
+        .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+        .build();
+
+    HoodieSourceSplit split = new HoodieSourceSplit(
+        1,
+        "base-path",
+        Option.of(Arrays.asList("log1")),
+        "/table",
+        "/partition",
+        "read_optimized",
+        "19700101000000000",
+        "file1",
+        Option.of(instantRange)
+    );
+
+    String result = split.toString();
+
+    assertNotNull(result);
+    assertTrue(result.contains("HoodieSourceSplit"));
+  }
+
+  @Test
+  public void testClosedClosedInstantRange() {
+    InstantRange instantRange = InstantRange.builder()
+        .startInstant("20230101000000000")
+        .endInstant("20230131235959999")
+        .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+        .build();
+
+    HoodieSourceSplit split = new HoodieSourceSplit(
+        1,
+        "base-path",
+        Option.of(Arrays.asList("log1", "log2")),
+        "/table/path",
+        "/partition/path",
+        "payload_combine",
+        "19700101000000000",
+        "file-1",
+        Option.of(instantRange)
+    );
+
+    assertTrue(split.getInstantRange().isPresent());
+    assertTrue(split.getInstantRange().get().getStartInstant().isPresent());
+    assertTrue(split.getInstantRange().get().getEndInstant().isPresent());
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitComparator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitComparator.java
index a3c840f936af..41916f4f73e8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitComparator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitComparator.java
@@ -267,7 +267,8 @@ public class TestHoodieSourceSplitComparator {
         "/partition/path",
         "read_optimized",
         latestCommit,
-        "file-1"
+        "file-1",
+        Option.empty()
     );
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
index 61ac23fa323f..253936858236 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source.split;
 
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 
 import org.junit.jupiter.api.Test;
@@ -50,7 +51,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "read_optimized",
         "19700101000000000",
-        "file-123"
+        "file-123",
+        Option.empty()
     );
 
     byte[] serialized = serializer.serialize(original);
@@ -81,7 +83,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "payload_combine",
         "19700101000000000",
-        "file-456"
+        "file-456",
+        Option.empty()
     );
 
     byte[] serialized = serializer.serialize(original);
@@ -103,7 +106,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "payload_combine",
         "19700101000000000",
-        "file-789"
+        "file-789",
+        Option.empty()
     );
 
     byte[] serialized = serializer.serialize(original);
@@ -127,7 +131,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "read_optimized",
         "19700101000000000",
-        "file-000"
+        "file-000",
+        Option.empty()
     );
 
     byte[] serialized = serializer.serialize(original);
@@ -148,7 +153,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "read_optimized",
         "19700101000000000",
-        "file-111"
+        "file-111",
+        Option.empty()
     );
 
     // Update position to simulate consumed state
@@ -173,7 +179,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "read_optimized",
         "19700101000000000",
-        "file-222"
+        "file-222",
+        Option.empty()
     );
 
     // Consume multiple times
@@ -204,7 +211,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/year=2024/month=01/day=22",
         "payload_combine",
         "19700101000000000",
-        "complex-file-id-with-uuid-12345678"
+        "complex-file-id-with-uuid-12345678",
+        Option.empty()
     );
 
     original.updatePosition(10, 5000L);
@@ -239,7 +247,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "read_optimized",
         "19700101000000000",
-        "file-333"
+        "file-333",
+        Option.empty()
     );
 
     byte[] serialized1 = serializer.serialize(original);
@@ -259,7 +268,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "payload_combine",
         "19700101000000000",
-        "file-444"
+        "file-444",
+        Option.empty()
     );
 
     original.updatePosition(5, 200L);
@@ -273,9 +283,9 @@ public class TestHoodieSourceSplitSerializer {
 
   @Test
   public void testSerializeMultipleSplitsWithDifferentStates() throws 
IOException {
-    HoodieSourceSplit split1 = new HoodieSourceSplit(1, "base1", 
Option.empty(), "/t1", "/p1", "read_optimized", "19700101000000000","f1");
-    HoodieSourceSplit split2 = new HoodieSourceSplit(2, "base2", 
Option.of(Arrays.asList("log1")), "/t2", "/p2", "payload_combine", 
"19700101000000000","f2");
-    HoodieSourceSplit split3 = new HoodieSourceSplit(3, null, 
Option.of(Arrays.asList("log1", "log2", "log3")), "/t3", "/p3", 
"read_optimized", "19700101000000000","f3");
+    HoodieSourceSplit split1 = new HoodieSourceSplit(1, "base1", 
Option.empty(), "/t1", "/p1", "read_optimized", "19700101000000000","f1", 
Option.empty());
+    HoodieSourceSplit split2 = new HoodieSourceSplit(2, "base2", 
Option.of(Arrays.asList("log1")), "/t2", "/p2", "payload_combine", 
"19700101000000000","f2", Option.empty());
+    HoodieSourceSplit split3 = new HoodieSourceSplit(3, null, 
Option.of(Arrays.asList("log1", "log2", "log3")), "/t3", "/p3", 
"read_optimized", "19700101000000000","f3", Option.empty());
 
     split1.updatePosition(1, 10L);
     split2.consume();
@@ -304,7 +314,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "read_optimized",
         "19700101000000000",
-        "file-large"
+        "file-large",
+        Option.empty()
     );
 
     original.updatePosition(Integer.MAX_VALUE, Long.MAX_VALUE);
@@ -326,7 +337,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "read_optimized",
         "19700101000000000",
-        "file-zero"
+        "file-zero",
+        Option.empty()
     );
 
     original.updatePosition(0, 0L);
@@ -349,7 +361,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "read_optimized",
         "19700101000000000",
-        "file-negative"
+        "file-negative",
+        Option.empty()
     );
 
     byte[] serialized = serializer.serialize(original);
@@ -373,7 +386,8 @@ public class TestHoodieSourceSplitSerializer {
         longString.toString(),
         "read_optimized",
         "19700101000000000",
-        longString.toString()
+        longString.toString(),
+        Option.empty()
     );
 
     byte[] serialized = serializer.serialize(original);
@@ -395,7 +409,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/with/\r\n/carriage/return",
         "read_optimized",
         "19700101000000000",
-        "file-id-with-unicode-字符-émojis-🎉"
+        "file-id-with-unicode-字符-émojis-🎉",
+        Option.empty()
     );
 
     byte[] serialized = serializer.serialize(original);
@@ -423,7 +438,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "payload_combine",
         "19700101000000000",
-        "file-many-logs"
+        "file-many-logs",
+        Option.empty()
     );
 
     byte[] serialized = serializer.serialize(original);
@@ -443,7 +459,8 @@ public class TestHoodieSourceSplitSerializer {
         "/partition/path",
         "read_optimized",
         "19700101000000000",
-        "file-roundtrip"
+        "file-roundtrip",
+        Option.empty()
     );
 
     original.updatePosition(5, 100L);
@@ -457,5 +474,176 @@ public class TestHoodieSourceSplitSerializer {
 
     assertEquals(original, current);
   }
+
+  @Test
+  public void testSerializeWithInstantRangeStartAndEnd() throws IOException {
+    InstantRange instantRange =
+        InstantRange.builder()
+            .startInstant("20230101000000000")
+            .endInstant("20230131235959999")
+            .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+            .build();
+
+    HoodieSourceSplit original = new HoodieSourceSplit(
+        1,
+        "base-path",
+        Option.empty(),
+        "/table/path",
+        "/partition/path",
+        "read_optimized",
+        "19700101000000000",
+        "file-123",
+        Option.of(instantRange)
+    );
+
+    byte[] serialized = serializer.serialize(original);
+    HoodieSourceSplit deserialized = 
serializer.deserialize(serializer.getVersion(), serialized);
+
+    assertNotNull(deserialized);
+    assertTrue(deserialized.getInstantRange().isPresent());
+    
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
+    
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
+    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstant().get());
+    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstant().get());
+  }
+
+  @Test
+  public void testSerializeWithInstantRangeOnlyStart() throws IOException {
+    InstantRange instantRange =
+        InstantRange.builder()
+            .startInstant("20230101000000000")
+            .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+            .nullableBoundary(true)
+            .build();
+
+    HoodieSourceSplit original = new HoodieSourceSplit(
+        2,
+        "base-path",
+        Option.of(Arrays.asList("log1")),
+        "/table/path",
+        "/partition/path",
+        "payload_combine",
+        "19700101000000000",
+        "file-456",
+        Option.of(instantRange)
+    );
+
+    byte[] serialized = serializer.serialize(original);
+    HoodieSourceSplit deserialized = 
serializer.deserialize(serializer.getVersion(), serialized);
+
+    assertNotNull(deserialized);
+    assertTrue(deserialized.getInstantRange().isPresent());
+    
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
+    
assertFalse(deserialized.getInstantRange().get().getEndInstant().isPresent());
+    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstant().get());
+  }
+
+  @Test
+  public void testSerializeWithClosedClosedInstantRange() throws IOException {
+    InstantRange instantRange =
+        InstantRange.builder()
+            .startInstant("20230101000000000")
+            .endInstant("20230131235959999")
+            .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+            .build();
+
+    HoodieSourceSplit original = new HoodieSourceSplit(
+        4,
+        "base-path",
+        Option.of(Arrays.asList("log1", "log2", "log3")),
+        "/table/path",
+        "/partition/path",
+        "payload_combine",
+        "19700101000000000",
+        "file-range",
+        Option.of(instantRange)
+    );
+
+    byte[] serialized = serializer.serialize(original);
+    HoodieSourceSplit deserialized = 
serializer.deserialize(serializer.getVersion(), serialized);
+
+    assertNotNull(deserialized);
+    assertTrue(deserialized.getInstantRange().isPresent());
+    
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
+    
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
+    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstant().get());
+    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstant().get());
+  }
+
+  @Test
+  public void testSerializeWithInstantRangeAndConsumedState() throws 
IOException {
+    InstantRange instantRange =
+        InstantRange.builder()
+            .startInstant("20230101000000000")
+            .endInstant("20230131235959999")
+            .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+            .build();
+
+    HoodieSourceSplit original = new HoodieSourceSplit(
+        5,
+        "base-path",
+        Option.empty(),
+        "/table/path",
+        "/partition/path",
+        "read_optimized",
+        "19700101000000000",
+        "file-consumed",
+        Option.of(instantRange)
+    );
+
+    original.updatePosition(10, 500L);
+
+    byte[] serialized = serializer.serialize(original);
+    HoodieSourceSplit deserialized = 
serializer.deserialize(serializer.getVersion(), serialized);
+
+    assertNotNull(deserialized);
+    assertTrue(deserialized.getInstantRange().isPresent());
+    assertEquals(10, deserialized.getFileOffset());
+    assertEquals(500L, deserialized.getConsumed());
+    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstant().get());
+    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstant().get());
+  }
+
+  @Test
+  public void testSerializeMultipleSplitsWithInstantRange() throws IOException 
{
+    InstantRange range1 =
+        InstantRange.builder()
+            .startInstant("20230101000000000")
+            .endInstant("20230131235959999")
+            .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+            .build();
+
+    InstantRange range2 =
+        InstantRange.builder()
+            .startInstant("20230201000000000")
+            .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+            .nullableBoundary(true)
+            .build();
+
+    HoodieSourceSplit split1 = new HoodieSourceSplit(1, "base1", 
Option.empty(), "/t1", "/p1", "read_optimized", "19700101000000000", "f1", 
Option.of(range1));
+    HoodieSourceSplit split2 = new HoodieSourceSplit(2, "base2", 
Option.of(Arrays.asList("log1")), "/t2", "/p2", "payload_combine", 
"19700101000000000", "f2", Option.of(range2));
+    HoodieSourceSplit split3 = new HoodieSourceSplit(3, null, 
Option.of(Arrays.asList("log1", "log2")), "/t3", "/p3", "read_optimized", 
"19700101000000000", "f3", Option.empty());
+
+    byte[] serialized1 = serializer.serialize(split1);
+    byte[] serialized2 = serializer.serialize(split2);
+    byte[] serialized3 = serializer.serialize(split3);
+
+    HoodieSourceSplit deserialized1 = 
serializer.deserialize(serializer.getVersion(), serialized1);
+    HoodieSourceSplit deserialized2 = 
serializer.deserialize(serializer.getVersion(), serialized2);
+    HoodieSourceSplit deserialized3 = 
serializer.deserialize(serializer.getVersion(), serialized3);
+
+    // Verify split1
+    assertTrue(deserialized1.getInstantRange().isPresent());
+    assertEquals("20230101000000000", 
deserialized1.getInstantRange().get().getStartInstant().get());
+    assertEquals("20230131235959999", 
deserialized1.getInstantRange().get().getEndInstant().get());
+
+    // Verify split2
+    assertTrue(deserialized2.getInstantRange().isPresent());
+    assertEquals("20230201000000000", 
deserialized2.getInstantRange().get().getStartInstant().get());
+    
assertFalse(deserialized2.getInstantRange().get().getEndInstant().isPresent());
+
+    // Verify split3
+    assertFalse(deserialized3.getInstantRange().isPresent());
+  }
 }
 

Reply via email to