This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f15e1d060f96 feat: add Flink source reader function for cdc splits 
(#18361)
f15e1d060f96 is described below

commit f15e1d060f96b7d825803d3d4fa2b1e0aa9f46a3
Author: Peter Huang <[email protected]>
AuthorDate: Thu Mar 26 19:23:06 2026 -0700

    feat: add Flink source reader function for cdc splits (#18361)
---
 .../function/HoodieCdcSplitReaderFunction.java     | 1065 ++++++++++++++++++++
 .../hudi/source/split/HoodieCdcSourceSplit.java    |   58 ++
 .../source/split/HoodieContinuousSplitBatch.java   |   39 +-
 .../source/split/HoodieSourceSplitSerializer.java  |   43 +
 .../org/apache/hudi/table/HoodieTableSource.java   |   42 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |    2 +-
 .../table/format/mor/MergeOnReadTableState.java    |    6 +-
 .../apache/hudi/source/TestStreamReadOperator.java |    2 +-
 .../function/TestHoodieCdcSplitReaderFunction.java |  195 ++++
 .../source/split/TestHoodieCdcSourceSplit.java     |  182 ++++
 .../split/TestHoodieContinuousSplitBatch.java      |  362 +++++++
 .../split/TestHoodieSourceSplitSerializer.java     |  147 ++-
 .../apache/hudi/table/ITTestHoodieDataSource.java  |   17 +-
 13 files changed, 2131 insertions(+), 29 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
new file mode 100644
index 000000000000..dee8a95c124a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
@@ -0,0 +1,1065 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.source.ExpressionPredicates;
+import org.apache.hudi.source.reader.BatchRecords;
+import org.apache.hudi.source.reader.HoodieRecordWithPosition;
+import org.apache.hudi.source.split.HoodieCdcSourceSplit;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.table.format.cdc.CdcInputFormat;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.table.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.RowDataProjection;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.fs.Path;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * CDC reader function for source V2. Reads CDC splits ({@link 
HoodieCdcSourceSplit}) and
+ * emits change-log {@link RowData} records tagged with the appropriate {@link 
RowKind}.
+ *
+ * <p>The implementation mirrors the logic in {@link CdcInputFormat}, adapted 
for the
+ * {@link SplitReaderFunction} contract.
+ */
+@Slf4j
+public class HoodieCdcSplitReaderFunction implements 
SplitReaderFunction<RowData> {
+
+  private final org.apache.flink.configuration.Configuration conf;
+  private final InternalSchemaManager internalSchemaManager;
+  private final List<DataType> fieldTypes;
+  private final MergeOnReadTableState tableState;
+  private final boolean emitDelete;
+  private final List<ExpressionPredicates.Predicate> predicates;
+  private transient HoodieTableMetaClient metaClient;
+  private transient HoodieWriteConfig writeConfig;
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+  private transient ClosableIterator<RowData> currentIterator;
+  // Fallback reader for non-CDC splits (e.g. snapshot reads when 
read.start-commit='earliest')
+  private transient HoodieSplitReaderFunction fallbackReaderFunction;
+
+  /**
+   * Creates a CDC split reader function.
+   *
+   * @param conf                  Flink configuration
+   * @param tableState            Merge on Read table state
+   * @param internalSchemaManager Schema-evolution manager
+   * @param fieldTypes            DataType list for all table fields (used for 
parquet reading)
+   * @param predicates            Predicates for push down
+   * @param emitDelete            Whether to emit delete
+   */
+  public HoodieCdcSplitReaderFunction(
+      org.apache.flink.configuration.Configuration conf,
+      MergeOnReadTableState tableState,
+      InternalSchemaManager internalSchemaManager,
+      List<DataType> fieldTypes,
+      List<ExpressionPredicates.Predicate> predicates,
+      boolean emitDelete) {
+    this.conf = conf;
+    this.tableState = tableState;
+    this.internalSchemaManager = internalSchemaManager;
+    this.fieldTypes = fieldTypes;
+    this.predicates = predicates;
+    this.emitDelete = emitDelete;
+  }
+
  /**
   * Reads one source split and returns its records.
   *
   * <p>CDC splits are expanded into an iterator over their per-file-group
   * {@link HoodieCDCFileSplit} changes; any non-CDC split is delegated to the standard
   * MOR reader.
   *
   * @param split the split to read; either a {@link HoodieCdcSourceSplit} or a plain
   *              snapshot split (fallback path)
   * @return a batch of records positioned after the split's already-consumed offset
   */
  @Override
  public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> read(HoodieSourceSplit split) {
    if (!(split instanceof HoodieCdcSourceSplit)) {
      // Non-CDC splits arrive when reading from 'earliest' with no prior CDC history
      // (i.e. instantRange is empty → snapshot path). Fall back to the standard MOR reader
      // which emits all records as INSERT rows, matching the expected snapshot behaviour.
      return getFallbackReaderFunction().read(split);
    }
    HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split;

    HoodieCDCSupplementalLoggingMode mode = OptionsResolver.getCDCSupplementalLoggingMode(conf);
    HoodieTableMetaClient client = getMetaClient();
    HoodieWriteConfig wConfig = getWriteConfig();

    // Shared across all file splits of this source split; used by the image-based
    // iterators to load before-images of file slices (see getOrLoadImages usage below).
    ImageManager imageManager = new ImageManager(tableState.getRowType(), wConfig, this::getFileSliceIterator);

    Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc =
        cdcFileSplit -> createRecordIteratorSafe(
            cdcSplit.getTablePath(),
            cdcSplit.getMaxCompactionMemoryInBytes(),
            cdcFileSplit,
            mode,
            imageManager,
            client);

    currentIterator = new CdcFileSplitsIterator(cdcSplit.getChanges(), imageManager, recordIteratorFunc);
    // Skip records already consumed from this split — presumably checkpoint/restart
    // recovery so emission resumes where the previous attempt stopped; confirm against
    // the split-state handling in the source reader.
    BatchRecords<RowData> records = BatchRecords.forRecords(
        split.splitId(), currentIterator, split.getFileOffset(), split.getConsumed());
    records.seek(split.getConsumed());
    return records;
  }
+
+  @Override
+  public void close() throws Exception {
+    if (currentIterator != null) {
+      currentIterator.close();
+    }
+    if (fallbackReaderFunction != null) {
+      fallbackReaderFunction.close();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Internal helpers
+  // -------------------------------------------------------------------------
+
+  private HoodieSplitReaderFunction getFallbackReaderFunction() {
+    if (fallbackReaderFunction == null) {
+      fallbackReaderFunction = new HoodieSplitReaderFunction(
+          conf,
+          HoodieSchema.parse(tableState.getTableSchema()),
+          HoodieSchema.parse(tableState.getRequiredSchema()),
+          internalSchemaManager,
+          conf.get(FlinkOptions.MERGE_TYPE),
+          predicates,
+          emitDelete);
+    }
+    return fallbackReaderFunction;
+  }
+
  /**
   * Exception-safe wrapper around {@link #createRecordIterator}: rethrows the checked
   * {@link IOException} as an unchecked {@link HoodieException} so the method can be used
   * from a {@link Function} lambda (see {@code read}).
   *
   * @param tablePath                  base path of the Hudi table
   * @param maxCompactionMemoryInBytes memory budget for spillable structures
   * @param fileSplit                  the CDC file split to read
   * @param mode                       CDC supplemental logging mode of the table
   * @param imageManager               shared before-image manager
   * @param client                     table meta client
   * @return the record iterator for the given file split
   */
  private ClosableIterator<RowData> createRecordIteratorSafe(
      String tablePath,
      long maxCompactionMemoryInBytes,
      HoodieCDCFileSplit fileSplit,
      HoodieCDCSupplementalLoggingMode mode,
      ImageManager imageManager,
      HoodieTableMetaClient client) {
    try {
      return createRecordIterator(tablePath, maxCompactionMemoryInBytes, fileSplit, mode, imageManager, client);
    } catch (IOException e) {
      throw new HoodieException("Failed to create CDC record iterator for split: " + fileSplit, e);
    }
  }
+
  /**
   * Creates the change-log record iterator for a single {@link HoodieCDCFileSplit},
   * dispatching on the split's CDC inference case.
   *
   * @param tablePath                  base path of the Hudi table
   * @param maxCompactionMemoryInBytes memory budget for spillable maps / log merging
   * @param fileSplit                  the CDC file split describing the change files
   * @param mode                       supplemental logging mode the CDC files were written with
   * @param imageManager               shared cache of file-slice before-images
   * @param client                     table meta client
   * @return an iterator of {@link RowData} tagged with the proper {@link RowKind}
   * @throws IOException if the underlying file readers cannot be opened
   */
  private ClosableIterator<RowData> createRecordIterator(
      String tablePath,
      long maxCompactionMemoryInBytes,
      HoodieCDCFileSplit fileSplit,
      HoodieCDCSupplementalLoggingMode mode,
      ImageManager imageManager,
      HoodieTableMetaClient client) throws IOException {

    final HoodieSchema tableSchema = HoodieSchema.parse(tableState.getTableSchema());
    final HoodieSchema requiredSchema = HoodieSchema.parse(tableState.getRequiredSchema());
    switch (fileSplit.getCdcInferCase()) {
      case BASE_FILE_INSERT: {
        // New base file: every record in it is emitted as INSERT (see AddBaseFileIterator).
        ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1,
            "CDC file path should exist and be singleton for BASE_FILE_INSERT");
        String path = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString();
        return new AddBaseFileIterator(getBaseFileIterator(path));
      }
      case BASE_FILE_DELETE: {
        // Vanished file slice: every record of the previous slice is emitted as DELETE.
        ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
            "Before file slice should exist for BASE_FILE_DELETE");
        FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
        MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes);
        return new RemoveBaseFileIterator(tableState.getRequiredRowType(), tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
      }
      case AS_IS: {
        // Dedicated CDC log files exist; the iterator choice depends on how much image
        // data the supplemental logging mode persisted.
        HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(tableSchema);
        HoodieSchema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
        switch (mode) {
          case DATA_BEFORE_AFTER:
            // Both before and after images are in the CDC log; no imageManager needed.
            return new BeforeAfterImageIterator(
                getHadoopConf(), tablePath, tableSchema, requiredSchema, tableState.getRequiredRowType(), cdcSchema, fileSplit);
          case DATA_BEFORE:
            // Only the before image is logged; the rest is resolved via imageManager.
            return new BeforeImageIterator(
                conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, tableState.getRequiredRowType(),
                maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
          case OP_KEY_ONLY:
            // Only operation + record key are logged; images are looked up via imageManager.
            return new RecordKeyImageIterator(
                conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, tableState.getRequiredRowType(),
                maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
          default:
            throw new AssertionError("Unexpected CDC supplemental logging mode: " + mode);
        }
      }
      case LOG_FILE: {
        // Ordinary data log file: diff its records against the before-image snapshot.
        ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1,
            "CDC file path should exist and be singleton for LOG_FILE");
        String logFilePath = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString();
        MergeOnReadInputSplit split = CdcInputFormat.singleLogFile2Split(tablePath, logFilePath, maxCompactionMemoryInBytes);
        ClosableIterator<HoodieRecord<RowData>> recordIterator = getFileSliceHoodieRecordIterator(split);
        return new DataLogFileIterator(
            maxCompactionMemoryInBytes, imageManager, fileSplit, tableSchema, tableState.getRequiredRowType(), tableState.getRequiredPositions(),
            recordIterator, client, getWriteConfig());
      }
      case REPLACE_COMMIT: {
        return new ReplaceCommitIterator(
            conf, tablePath, tableState.getRequiredRowType(), tableState.getRequiredPositions(), maxCompactionMemoryInBytes,
            fileSplit, this::getFileSliceIterator);
      }
      default:
        throw new AssertionError("Unexpected CDC file split infer case: " + fileSplit.getCdcInferCase());
    }
  }
+
  /**
   * Reads the full-schema before/after image for a file slice (emitDelete=false).
   *
   * @param split MOR input split describing the base + log files of the slice
   * @return iterator over the merged rows of the slice
   */
  private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit split) {
    FileSlice fileSlice = buildFileSlice(split);
    final HoodieSchema tableSchema = HoodieSchemaCache.intern(HoodieSchema.parse(tableState.getTableSchema()));
    try {
      // Read and required schemas are both the full table schema: CDC imaging needs
      // every column, not just the query projection.
      HoodieFileGroupReader<RowData> reader = FormatUtils.createFileGroupReader(
          getMetaClient(), getWriteConfig(), internalSchemaManager, fileSlice,
          tableSchema, tableSchema, split.getLatestCommit(),
          FlinkOptions.REALTIME_PAYLOAD_COMBINE, false,
          predicates, split.getInstantRange());
      return reader.getClosableIterator();
    } catch (IOException e) {
      throw new HoodieIOException("Failed to create file slice iterator for split: " + split, e);
    }
  }
+
  /**
   * Reads a single log file and returns a typed {@link HoodieRecord} iterator
   * (for LOG_FILE CDC inference).
   *
   * @param split MOR input split wrapping exactly one log file
   * @return iterator over the typed records of the log file
   */
  private ClosableIterator<HoodieRecord<RowData>> getFileSliceHoodieRecordIterator(MergeOnReadInputSplit split) {
    FileSlice fileSlice = buildFileSlice(split);
    final HoodieSchema tableSchema = HoodieSchemaCache.intern(HoodieSchema.parse(tableState.getTableSchema()));
    try {
      // Differs from getFileSliceIterator only in the boolean flag (true here) and the
      // typed-record return — presumably emitDelete=true so delete records surface for
      // CDC diffing; confirm against FormatUtils.createFileGroupReader's signature.
      HoodieFileGroupReader<RowData> reader = FormatUtils.createFileGroupReader(
          getMetaClient(), getWriteConfig(), internalSchemaManager, fileSlice,
          tableSchema, tableSchema, split.getLatestCommit(),
          FlinkOptions.REALTIME_PAYLOAD_COMBINE, true,
          predicates, split.getInstantRange());
      return reader.getClosableHoodieRecordIterator();
    } catch (IOException e) {
      throw new HoodieIOException("Failed to create Hoodie record iterator for split: " + split, e);
    }
  }
+
  /**
   * Reads a parquet CDC base file returning required-schema records.
   *
   * @param path absolute path of the parquet file
   * @return iterator over the required-schema rows of the file
   * @throws IOException if the parquet reader cannot be opened
   */
  private ClosableIterator<RowData> getBaseFileIterator(String path) throws IOException {
    String[] fieldNames = tableState.getRowType().getFieldNames().toArray(new String[0]);
    DataType[] fieldTypesArray = fieldTypes.toArray(new DataType[0]);
    // Presumably reconstructs partition-column values from the file path (default or
    // hive-style layout) since they may not be materialized in the parquet file —
    // confirm against FilePathUtils.generatePartitionSpecs.
    LinkedHashMap<String, Object> partObjects = FilePathUtils.generatePartitionSpecs(
            path,
            tableState.getRowType().getFieldNames(),
            fieldTypes,
            conf.get(FlinkOptions.PARTITION_DEFAULT_NAME),
            conf.get(FlinkOptions.PARTITION_PATH_FIELD),
            conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING)
    );

    return RecordIterators.getParquetRecordIterator(
        internalSchemaManager,
        conf.get(FlinkOptions.READ_UTC_TIMEZONE),
        true,
        HadoopConfigurations.getParquetConf(conf, getHadoopConf()),
        fieldNames,
        fieldTypesArray,
        partObjects,
        tableState.getRequiredPositions(),
        2048, // batch size — TODO confirm this matches the default used by the other format readers
        new org.apache.flink.core.fs.Path(path),
        0,
        Long.MAX_VALUE, // [0, MAX_VALUE) → read the entire file
        predicates);
  }
+
+  private static FileSlice buildFileSlice(MergeOnReadInputSplit split) {
+    return new FileSlice(
+        new HoodieFileGroupId("", split.getFileId()),
+        "",
+        split.getBasePath().map(HoodieBaseFile::new).orElse(null),
+        split.getLogPaths()
+            .map(lp -> 
lp.stream().map(HoodieLogFile::new).collect(Collectors.toList()))
+            .orElse(Collections.emptyList()));
+  }
+
+  private static int[] computeRequiredPositions(RowType rowType, RowType 
requiredRowType) {
+    List<String> allNames = rowType.getFieldNames();
+    return requiredRowType.getFieldNames().stream()
+        .map(allNames::indexOf)
+        .mapToInt(i -> i)
+        .toArray();
+  }
+
  /** Lazily initializes the table meta client (field is transient, so it is rebuilt after deserialization). */
  private HoodieTableMetaClient getMetaClient() {
    if (metaClient == null) {
      metaClient = StreamerUtil.metaClientForReader(conf, getHadoopConf());
    }
    return metaClient;
  }

  /** Lazily initializes the Hoodie write config derived from the Flink options. */
  private HoodieWriteConfig getWriteConfig() {
    if (writeConfig == null) {
      writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
    }
    return writeConfig;
  }

  /** Lazily initializes the Hadoop configuration derived from the Flink options. */
  private org.apache.hadoop.conf.Configuration getHadoopConf() {
    if (hadoopConf == null) {
      hadoopConf = HadoopConfigurations.getHadoopConf(conf);
    }
    return hadoopConf;
  }
+
+  // -------------------------------------------------------------------------
+  //  Inner iterators (adapted from CdcInputFormat inner classes)
+  // -------------------------------------------------------------------------
+
+  /** Iterates over an ordered list of {@link HoodieCDCFileSplit}s, delegating 
record reading to a factory. */
+  private static class CdcFileSplitsIterator implements 
ClosableIterator<RowData> {
+    private ImageManager imageManager;
+    private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
+    private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc;
+    private ClosableIterator<RowData> recordIterator;
+
+    CdcFileSplitsIterator(
+        HoodieCDCFileSplit[] changes,
+        ImageManager imageManager,
+        Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc) {
+      this.fileSplitIterator = Arrays.asList(changes).iterator();
+      this.imageManager = imageManager;
+      this.recordIteratorFunc = recordIteratorFunc;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (recordIterator != null) {
+        if (recordIterator.hasNext()) {
+          return true;
+        } else {
+          recordIterator.close();
+          recordIterator = null;
+        }
+      }
+      if (fileSplitIterator.hasNext()) {
+        recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
+        return recordIterator.hasNext();
+      }
+      return false;
+    }
+
+    @Override
+    public RowData next() {
+      return recordIterator.next();
+    }
+
+    @Override
+    public void close() {
+      if (recordIterator != null) {
+        recordIterator.close();
+      }
+      if (imageManager != null) {
+        imageManager.close();
+        imageManager = null;
+      }
+    }
+  }
+
+  /** Wraps a base-file parquet iterator and marks every record as {@link 
RowKind#INSERT}. */
+  private static class AddBaseFileIterator implements 
ClosableIterator<RowData> {
+    private ClosableIterator<RowData> nested;
+    private RowData currentRecord;
+
+    AddBaseFileIterator(ClosableIterator<RowData> nested) {
+      this.nested = nested;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (nested.hasNext()) {
+        currentRecord = nested.next();
+        currentRecord.setRowKind(RowKind.INSERT);
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public RowData next() {
+      return currentRecord;
+    }
+
+    @Override
+    public void close() {
+      if (nested != null) {
+        nested.close();
+        nested = null;
+      }
+    }
+  }
+
+  /** Wraps a file-slice iterator and marks every record as {@link 
RowKind#DELETE}, with projection. */
+  private static class RemoveBaseFileIterator implements 
ClosableIterator<RowData> {
+    private ClosableIterator<RowData> nested;
+    private final RowDataProjection projection;
+
+    RemoveBaseFileIterator(RowType requiredRowType, int[] requiredPositions, 
ClosableIterator<RowData> iterator) {
+      this.nested = iterator;
+      this.projection = RowDataProjection.instance(requiredRowType, 
requiredPositions);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nested.hasNext();
+    }
+
+    @Override
+    public RowData next() {
+      RowData row = nested.next();
+      row.setRowKind(RowKind.DELETE);
+      return projection.project(row);
+    }
+
+    @Override
+    public void close() {
+      if (nested != null) {
+        nested.close();
+        nested = null;
+      }
+    }
+  }
+
  /**
   * Handles the {@code LOG_FILE} CDC inference case: compares records from the log file
   * against before-image snapshots to emit INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE events.
   */
  private static class DataLogFileIterator implements ClosableIterator<RowData> {
    private final HoodieSchema tableSchema;
    private final long maxCompactionMemoryInBytes;
    private final ImageManager imageManager;
    // Projection down to the required schema; null when the required row type already
    // equals the full table row type (no projection needed).
    private final RowDataProjection projection;
    private final BufferedRecordMerger recordMerger;
    private final ClosableIterator<HoodieRecord<RowData>> logRecordIterator;
    private final DeleteContext deleteContext;
    private final HoodieReaderContext<RowData> readerContext;
    private final String[] orderingFields;
    private final TypedProperties props;

    // record key -> serialized before-image of the file slice prior to this log file
    private ExternalSpillableMap<String, byte[]> beforeImages;
    // Row returned by the next call to next().
    private RowData currentImage;
    // Pending UPDATE_AFTER row, emitted immediately after its UPDATE_BEFORE counterpart.
    private RowData sideImage;

    DataLogFileIterator(
        long maxCompactionMemoryInBytes,
        ImageManager imageManager,
        HoodieCDCFileSplit cdcFileSplit,
        HoodieSchema tableSchema,
        RowType requiredRowType,
        int[] requiredPositions,
        ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
        HoodieTableMetaClient metaClient,
        HoodieWriteConfig writeConfig) throws IOException {
      this.tableSchema = tableSchema;
      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
      this.imageManager = imageManager;
      this.projection = HoodieSchemaConverter.convertToRowType(tableSchema).equals(requiredRowType)
              ? null : RowDataProjection.instance(requiredRowType, requiredPositions);
      this.props = writeConfig.getProps();
      this.readerContext = new FlinkReaderContextFactory(metaClient).getContext();
      readerContext.initRecordMerger(props);
      this.orderingFields = ConfigUtils.getOrderingFields(props);
      // Merger mirrors the table's configured merge semantics (merge mode, payload
      // classes, partial-update mode) so history + log records combine the same way
      // a regular read would.
      this.recordMerger = BufferedRecordMergerFactory.create(
          readerContext,
          readerContext.getMergeMode(),
          false,
          Option.of(writeConfig.getRecordMerger()),
          tableSchema,
          Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(), writeConfig.getPayloadClass())),
          props,
          metaClient.getTableConfig().getPartialUpdateMode());
      this.logRecordIterator = logRecordIterator;
      this.deleteContext = new DeleteContext(props, tableSchema).withReaderSchema(tableSchema);
      initImages(cdcFileSplit, writeConfig);
    }

    /**
     * Loads the before-image map for the prior file slice, or starts with an empty
     * spillable map when there is no (non-empty) previous slice.
     */
    private void initImages(HoodieCDCFileSplit fileSplit, HoodieWriteConfig writeConfig) throws IOException {
      if (fileSplit.getBeforeFileSlice().isPresent() && !fileSplit.getBeforeFileSlice().get().isEmpty()) {
        this.beforeImages = this.imageManager.getOrLoadImages(
            maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
      } else {
        this.beforeImages = FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, getClass().getSimpleName());
      }
    }

    @Override
    public boolean hasNext() {
      // Emit the buffered UPDATE_AFTER produced together with the previous UPDATE_BEFORE.
      if (sideImage != null) {
        currentImage = sideImage;
        sideImage = null;
        return true;
      }
      while (logRecordIterator.hasNext()) {
        HoodieRecord<RowData> record = logRecordIterator.next();
        RowData existed = imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
        if (isDelete(record)) {
          if (existed != null) {
            // Delete of a known key: emit the before image as DELETE.
            // A delete for an unknown key is skipped silently.
            existed.setRowKind(RowKind.DELETE);
            currentImage = existed;
            return true;
          }
        } else {
          if (existed == null) {
            // Upsert with no prior image: plain INSERT.
            RowData newRow = record.getData();
            newRow.setRowKind(RowKind.INSERT);
            currentImage = newRow;
            return true;
          } else {
            // Upsert over an existing image: merge, then emit UPDATE_BEFORE now and
            // queue the merged row as UPDATE_AFTER for the next call.
            HoodieOperation operation = HoodieOperation.fromValue(existed.getRowKind().toByteValue());
            HoodieRecord<RowData> historyRecord = new HoodieFlinkRecord(record.getKey(), operation, existed);
            HoodieRecord<RowData> merged = mergeRowWithLog(historyRecord, record).get();
            if (merged.getData() != existed) {
              existed.setRowKind(RowKind.UPDATE_BEFORE);
              currentImage = existed;
              RowData mergedRow = merged.getData();
              mergedRow.setRowKind(RowKind.UPDATE_AFTER);
              imageManager.updateImageRecord(record.getRecordKey(), beforeImages, mergedRow);
              sideImage = mergedRow;
              return true;
            }
            // NOTE(review): when the merge keeps the history row (merged.getData() ==
            // existed, identity compare) nothing is emitted AND the entry removed by
            // removeImageRecord above is not re-inserted — confirm removeImageRecord /
            // the identity check make this intentional.
          }
        }
      }
      return false;
    }

    @Override
    public RowData next() {
      return projection != null ? projection.project(currentImage) : currentImage;
    }

    @Override
    public void close() {
      logRecordIterator.close();
      // NOTE(review): imageManager is shared and also closed by CdcFileSplitsIterator —
      // confirm ImageManager.close() is safe to call more than once.
      imageManager.close();
    }

    /**
     * Merges the history (before-image) record with the new log record using the
     * configured record merger.
     *
     * @return the merged record, or empty when the merger produced none
     */
    @SuppressWarnings("unchecked")
    private Option<HoodieRecord<RowData>> mergeRowWithLog(
        HoodieRecord<RowData> historyRecord, HoodieRecord<RowData> newRecord) {
      try {
        BufferedRecord<RowData> histBuf = BufferedRecords.fromHoodieRecord(
            historyRecord, tableSchema, readerContext.getRecordContext(), props, orderingFields, deleteContext);
        BufferedRecord<RowData> newBuf = BufferedRecords.fromHoodieRecord(
            newRecord, tableSchema, readerContext.getRecordContext(), props, orderingFields, deleteContext);
        BufferedRecord<RowData> merged = recordMerger.finalMerge(histBuf, newBuf);
        return Option.ofNullable(readerContext.getRecordContext()
            .constructHoodieRecord(merged, historyRecord.getPartitionPath()));
      } catch (IOException e) {
        throw new HoodieIOException("Merge base and delta payloads exception", e);
      }
    }

    /** Whether the log record represents a delete under the configured delete semantics. */
    private boolean isDelete(HoodieRecord<RowData> record) {
      return record.isDelete(deleteContext, CollectionUtils.emptyProps());
    }
  }
+
+  /**
+   * Base iterator for CDC log files stored with supplemental logging (AS_IS 
inference case).
+   * Reads {@link HoodieCDCLogRecordIterator} and resolves before/after images 
using
+   * subclass-specific logic.
+   */
+  private abstract static class BaseImageIterator implements 
ClosableIterator<RowData> {
+    private final HoodieSchema requiredSchema;
+    private final int[] requiredPos;
+    private final GenericRecordBuilder recordBuilder;
+    private final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
+    private HoodieCDCLogRecordIterator cdcItr;
+
+    private GenericRecord cdcRecord;
+    private RowData sideImage;
+    private RowData currentImage;
+
+    BaseImageIterator(
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        HoodieSchema tableSchema,
+        HoodieSchema requiredSchema,
+        RowType requiredRowType,
+        HoodieSchema cdcSchema,
+        HoodieCDCFileSplit fileSplit) {
+      this.requiredSchema = requiredSchema;
+      this.requiredPos = computeRequiredPos(tableSchema, requiredSchema);
+      this.recordBuilder = new 
GenericRecordBuilder(requiredSchema.getAvroSchema());
+      this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(requiredRowType);
+
+      StoragePath hadoopTablePath = new StoragePath(tablePath);
+      HoodieStorage storage = HoodieStorageUtils.getStorage(
+          tablePath, HadoopFSUtils.getStorageConf(hadoopConf));
+      HoodieLogFile[] cdcLogFiles = fileSplit.getCdcFiles().stream()
+          .map(cdcFile -> {
+            try {
+              return new HoodieLogFile(storage.getPathInfo(new 
StoragePath(hadoopTablePath, cdcFile)));
+            } catch (IOException e) {
+              throw new HoodieIOException("Failed to get file status for CDC 
log: " + cdcFile, e);
+            }
+          })
+          .toArray(HoodieLogFile[]::new);
+      this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, 
cdcSchema);
+    }
+
+    private static int[] computeRequiredPos(HoodieSchema tableSchema, 
HoodieSchema requiredSchema) {
+      HoodieSchema dataSchema = 
HoodieSchemaUtils.removeMetadataFields(tableSchema);
+      List<String> fields = dataSchema.getFields().stream()
+          .map(HoodieSchemaField::name)
+          .collect(Collectors.toList());
+      return requiredSchema.getFields().stream()
+          .map(f -> fields.indexOf(f.name()))
+          .mapToInt(i -> i)
+          .toArray();
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (sideImage != null) {
+        currentImage = sideImage;
+        sideImage = null;
+        return true;
+      } else if (cdcItr.hasNext()) {
+        cdcRecord = (GenericRecord) cdcItr.next();
+        String op = String.valueOf(cdcRecord.get(0));
+        resolveImage(op);
+        return true;
+      }
+      return false;
+    }
+
+    protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord 
cdcRecord);
+
+    protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord 
cdcRecord);
+
+    @Override
+    public RowData next() {
+      return currentImage;
+    }
+
+    @Override
+    public void close() {
+      if (cdcItr != null) {
+        cdcItr.close();
+        cdcItr = null;
+      }
+    }
+
+    private void resolveImage(String op) {
+      switch (op) {
+        case "i":
+          currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
+          break;
+        case "u":
+          currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
+          sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
+          break;
+        case "d":
+          currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
+          break;
+        default:
+          throw new AssertionError("Unexpected CDC operation: " + op);
+      }
+    }
+
+    protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
+      GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+          avroRecord, requiredSchema, requiredPos, recordBuilder);
+      RowData resolved = (RowData) 
avroToRowDataConverter.convert(requiredAvroRecord);
+      resolved.setRowKind(rowKind);
+      return resolved;
+    }
+  }
+
+  /** Reads CDC log files that contain both before and after images ({@code 
DATA_BEFORE_AFTER} mode). */
+  private static class BeforeAfterImageIterator extends BaseImageIterator {
+    BeforeAfterImageIterator(
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        HoodieSchema tableSchema,
+        HoodieSchema requiredSchema,
+        RowType requiredRowType,
+        HoodieSchema cdcSchema,
+        HoodieCDCFileSplit fileSplit) {
+      super(hadoopConf, tablePath, tableSchema, requiredSchema, 
requiredRowType, cdcSchema, fileSplit);
+    }
+
+    @Override
+    protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
+    }
+
+    @Override
+    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
+      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+    }
+  }
+
  /**
   * Reads CDC log files containing op + key + before_image ({@code DATA_BEFORE} mode).
   * The after-image is loaded from the after file-slice via the {@link ImageManager}.
   */
  private static class BeforeImageIterator extends BaseImageIterator {
    // Record key -> serialized full-schema row snapshot of the after file slice.
    protected ExternalSpillableMap<String, byte[]> afterImages;
    protected final long maxCompactionMemoryInBytes;
    // Projects full-schema image rows down to the required read schema.
    protected final RowDataProjection projection;
    protected final ImageManager imageManager;

    // NOTE(review): flinkConf is not used in this class; presumably kept for signature
    // symmetry with sibling iterators — confirm before removing.
    BeforeImageIterator(
        org.apache.flink.configuration.Configuration flinkConf,
        org.apache.hadoop.conf.Configuration hadoopConf,
        String tablePath,
        HoodieSchema tableSchema,
        HoodieSchema requiredSchema,
        RowType requiredRowType,
        long maxCompactionMemoryInBytes,
        HoodieSchema cdcSchema,
        HoodieCDCFileSplit fileSplit,
        ImageManager imageManager) throws IOException {
      super(hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType, cdcSchema, fileSplit);
      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
      this.projection = RowDataProjection.instance(requiredRowType,
          computePositions(tableSchema, requiredRowType));
      this.imageManager = imageManager;
      // Overridable call from the constructor: subclasses extend this to load extra
      // snapshots (see RecordKeyImageIterator); any fields they touch must be assigned
      // inside their initImages override, not in their constructors.
      initImages(fileSplit);
    }

    /** Loads (or reuses from the manager's cache) the after-image snapshot for this file split. */
    protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
      ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
          "Current file slice does not exist for instant: " + fileSplit.getInstant());
      this.afterImages = imageManager.getOrLoadImages(
          maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
    }

    @Override
    protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
      // Field 1 of the CDC record is the record key; look the image up in the snapshot.
      String recordKey = cdcRecord.get(1).toString();
      RowData row = imageManager.getImageRecord(recordKey, afterImages, rowKind);
      row.setRowKind(rowKind);
      return projection.project(row);
    }

    @Override
    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
      // Field 2 of the CDC record holds the persisted before-image payload.
      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
    }

    /** Positions of the required fields within the full table schema (metadata fields included). */
    private static int[] computePositions(HoodieSchema tableSchema, RowType requiredRowType) {
      List<String> allFields = tableSchema.getFields().stream()
          .map(HoodieSchemaField::name)
          .collect(Collectors.toList());
      return requiredRowType.getFieldNames().stream()
          .map(allFields::indexOf)
          .mapToInt(i -> i)
          .toArray();
    }
  }
+
+  /**
+   * Reads CDC log files containing only op + key ({@code OP_KEY_ONLY} mode).
+   * Both before and after images are loaded from file-slice snapshots via 
{@link ImageManager}.
+   */
+  private static class RecordKeyImageIterator extends BeforeImageIterator {
+    protected ExternalSpillableMap<String, byte[]> beforeImages;
+
+    RecordKeyImageIterator(
+        org.apache.flink.configuration.Configuration flinkConf,
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        HoodieSchema tableSchema,
+        HoodieSchema requiredSchema,
+        RowType requiredRowType,
+        long maxCompactionMemoryInBytes,
+        HoodieSchema cdcSchema,
+        HoodieCDCFileSplit fileSplit,
+        ImageManager imageManager) throws IOException {
+      super(flinkConf, hadoopConf, tablePath, tableSchema, requiredSchema, 
requiredRowType,
+          maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
+    }
+
+    @Override
+    protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException 
{
+      super.initImages(fileSplit);
+      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+          "Before file slice does not exist for instant: " + 
fileSplit.getInstant());
+      this.beforeImages = imageManager.getOrLoadImages(
+          maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+    }
+
+    @Override
+    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
+      String recordKey = cdcRecord.get(1).toString();
+      RowData row = imageManager.getImageRecord(recordKey, beforeImages, 
rowKind);
+      row.setRowKind(rowKind);
+      return projection.project(row);
+    }
+  }
+
+  /** Handles the {@code REPLACE_COMMIT} CDC inference case: emits all records 
from before-slice as DELETE. */
+  private static class ReplaceCommitIterator implements 
ClosableIterator<RowData> {
+    private final ClosableIterator<RowData> itr;
+    private final RowDataProjection projection;
+
+    ReplaceCommitIterator(
+        org.apache.flink.configuration.Configuration flinkConf,
+        String tablePath,
+        RowType requiredRowType,
+        int[] requiredPositions,
+        long maxCompactionMemoryInBytes,
+        HoodieCDCFileSplit fileSplit,
+        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
+      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+          "Before file slice does not exist for instant: " + 
fileSplit.getInstant());
+      MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
+          tablePath, fileSplit.getBeforeFileSlice().get(), 
maxCompactionMemoryInBytes);
+      this.itr = splitIteratorFunc.apply(inputSplit);
+      this.projection = RowDataProjection.instance(requiredRowType, 
requiredPositions);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return itr.hasNext();
+    }
+
+    @Override
+    public RowData next() {
+      RowData row = itr.next();
+      row.setRowKind(RowKind.DELETE);
+      return projection.project(row);
+    }
+
+    @Override
+    public void close() {
+      itr.close();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  ImageManager - caches full-schema row images keyed by record key
+  // -------------------------------------------------------------------------
+
+  /**
+   * Manages serialized before/after image snapshots for a file group, cached 
by instant time.
+   * At most two versions (before and after) are kept in memory; older entries 
are spilled to disk.
+   */
+  private static class ImageManager implements AutoCloseable {
+    private final HoodieWriteConfig writeConfig;
+    private final RowDataSerializer serializer;
+    private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc;
+    private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
+
+    ImageManager(
+        RowType rowType,
+        HoodieWriteConfig writeConfig,
+        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
+      this.serializer = new RowDataSerializer(rowType);
+      this.writeConfig = writeConfig;
+      this.splitIteratorFunc = splitIteratorFunc;
+      this.cache = new TreeMap<>();
+    }
+
+    ExternalSpillableMap<String, byte[]> getOrLoadImages(
+        long maxCompactionMemoryInBytes, FileSlice fileSlice) throws 
IOException {
+      final String instant = fileSlice.getBaseInstantTime();
+      if (cache.containsKey(instant)) {
+        return cache.get(instant);
+      }
+      if (cache.size() > 1) {
+        String oldest = cache.keySet().iterator().next();
+        cache.remove(oldest).close();
+      }
+      ExternalSpillableMap<String, byte[]> images = 
loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
+      cache.put(instant, images);
+      return images;
+    }
+
+    private ExternalSpillableMap<String, byte[]> loadImageRecords(
+        long maxCompactionMemoryInBytes, FileSlice fileSlice) throws 
IOException {
+      MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
+          writeConfig.getBasePath(), fileSlice, maxCompactionMemoryInBytes);
+      ExternalSpillableMap<String, byte[]> imageRecordsMap =
+          FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, 
getClass().getSimpleName());
+      try (ClosableIterator<RowData> itr = 
splitIteratorFunc.apply(inputSplit)) {
+        while (itr.hasNext()) {
+          RowData row = itr.next();
+          String recordKey = 
row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
+          ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+          serializer.serialize(row, new BytesArrayOutputView(baos));
+          imageRecordsMap.put(recordKey, baos.toByteArray());
+        }
+      }
+      return imageRecordsMap;
+    }
+
+    RowData getImageRecord(
+        String recordKey, ExternalSpillableMap<String, byte[]> imageCache, 
RowKind rowKind) {
+      byte[] bytes = imageCache.get(recordKey);
+      ValidationUtils.checkState(bytes != null,
+          "Key " + recordKey + " does not exist in current file group image");
+      try {
+        RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
+        row.setRowKind(rowKind);
+        return row;
+      } catch (IOException e) {
+        throw new HoodieException("Failed to deserialize image record for key: 
" + recordKey, e);
+      }
+    }
+
+    void updateImageRecord(
+        String recordKey, ExternalSpillableMap<String, byte[]> imageCache, 
RowData row) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+      try {
+        serializer.serialize(row, new BytesArrayOutputView(baos));
+      } catch (IOException e) {
+        throw new HoodieException("Failed to serialize image record for key: " 
+ recordKey, e);
+      }
+      imageCache.put(recordKey, baos.toByteArray());
+    }
+
+    RowData removeImageRecord(
+        String recordKey, ExternalSpillableMap<String, byte[]> imageCache) {
+      byte[] bytes = imageCache.remove(recordKey);
+      if (bytes == null) {
+        return null;
+      }
+      try {
+        return serializer.deserialize(new BytesArrayInputView(bytes));
+      } catch (IOException e) {
+        throw new HoodieException("Failed to deserialize image record for key: 
" + recordKey, e);
+      }
+    }
+
+    @Override
+    public void close() {
+      cache.values().forEach(ExternalSpillableMap::close);
+      cache.clear();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  I/O view adapters for RowDataSerializer
+  // -------------------------------------------------------------------------
+
+  private static final class BytesArrayInputView extends DataInputStream
+      implements org.apache.flink.core.memory.DataInputView {
+    BytesArrayInputView(byte[] data) {
+      super(new ByteArrayInputStream(data));
+    }
+
+    @Override
+    public void skipBytesToRead(int numBytes) throws IOException {
+      while (numBytes > 0) {
+        int skipped = skipBytes(numBytes);
+        numBytes -= skipped;
+      }
+    }
+  }
+
+  private static final class BytesArrayOutputView extends DataOutputStream
+      implements org.apache.flink.core.memory.DataOutputView {
+    BytesArrayOutputView(ByteArrayOutputStream baos) {
+      super(baos);
+    }
+
+    @Override
+    public void skipBytesToWrite(int numBytes) throws IOException {
+      for (int i = 0; i < numBytes; i++) {
+        write(0);
+      }
+    }
+
+    @Override
+    public void write(org.apache.flink.core.memory.DataInputView source, int 
numBytes) throws IOException {
+      byte[] buffer = new byte[numBytes];
+      source.readFully(buffer);
+      write(buffer);
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
new file mode 100644
index 000000000000..356e2f7a4ebe
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import lombok.ToString;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.util.Option;
+
+import lombok.Getter;
+
/**
 * A {@link HoodieSourceSplit} for CDC (Change Data Capture) reads.
 *
 * <p>Extends the base split with an ordered list of {@link HoodieCDCFileSplit} entries
 * that describe the CDC changes for a single file group, and the memory budget used
 * for spillable image maps.
 */
@ToString
public class HoodieCdcSourceSplit extends HoodieSourceSplit {
  private static final long serialVersionUID = 1L;

  /** Ordered CDC file splits for one file group, sorted by instant time. */
  @Getter
  private final HoodieCDCFileSplit[] changes;

  /** Maximum memory in bytes available for compaction/merge operations. */
  @Getter
  private final long maxCompactionMemoryInBytes;

  /**
   * Creates a CDC source split.
   *
   * @param splitNum                   sequence number of this split
   * @param tablePath                  base path of the Hudi table
   * @param maxCompactionMemoryInBytes memory budget for spillable image maps
   * @param fileId                     file group id the changes belong to
   * @param changes                    ordered CDC file splits, sorted by instant time
   * @param mergeType                  merge type used when reading the split
   * @param lastCommit                 latest commit instant covered by this split
   */
  public HoodieCdcSourceSplit(
      int splitNum,
      String tablePath,
      long maxCompactionMemoryInBytes,
      String fileId,
      HoodieCDCFileSplit[] changes,
      String mergeType,
      String lastCommit) {
    // CDC splits carry no base/log files of their own: the base-split file fields are
    // passed empty and all read information comes from the CDC file splits below.
    super(splitNum, null, Option.empty(), tablePath, "", mergeType, lastCommit, fileId, Option.empty());
    this.changes = changes;
    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
  }
}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
index 0b07b2e84bfb..a33ba88bcb05 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
@@ -19,9 +19,11 @@
 package org.apache.hudi.source.split;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.format.cdc.CdcInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 
 import lombok.Getter;
@@ -61,17 +63,34 @@ public class HoodieContinuousSplitBatch {
   }
 
  /**
   * Converts an incremental-read result into a batch of source splits, mapping CDC input
   * splits onto {@link HoodieCdcSourceSplit} and all other splits onto plain
   * {@link HoodieSourceSplit}.
   */
  public static HoodieContinuousSplitBatch fromResult(IncrementalInputSplits.Result result) {
    List<HoodieSourceSplit> splits = result.getInputSplits().stream().map(split -> {
      if (split instanceof CdcInputSplit) {
        CdcInputSplit cdcSplit = (CdcInputSplit) split;
        HoodieCDCFileSplit[] changes = cdcSplit.getChanges();
        // CdcInputSplit does not carry a latestCommit; derive it from the last (largest instant)
        // CDC file split, falling back to the batch end instant when the array is empty.
        String latestCommit = changes.length > 0
            ? changes[changes.length - 1].getInstant()
            : result.getEndInstant();
        // Cast to the common supertype so both lambda branches unify to HoodieSourceSplit.
        return (HoodieSourceSplit) new HoodieCdcSourceSplit(
            HoodieSourceSplit.SPLIT_ID_GEN.incrementAndGet(),
            cdcSplit.getTablePath(),
            cdcSplit.getMaxCompactionMemoryInBytes(),
            cdcSplit.getFileId(),
            changes,
            split.getMergeType(),
            latestCommit);
      }
      // Non-CDC splits map onto the plain source split.
      return new HoodieSourceSplit(
          HoodieSourceSplit.SPLIT_ID_GEN.incrementAndGet(),
          split.getBasePath().orElse(null),
          split.getLogPaths(), split.getTablePath(),
          resolvePartitionPath(split), split.getMergeType(),
          split.getLatestCommit(),
          split.getFileId(),
          split.getInstantRange()
      );
    }).collect(Collectors.toList());

    return new HoodieContinuousSplitBatch(splits, result.getEndInstant(), result.getOffset());
  }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
index 98980e5697db..62b96a846627 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source.split;
 
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 
@@ -29,6 +30,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -118,6 +121,25 @@ public class HoodieSourceSplitSerializer implements 
SimpleVersionedSerializer<Ho
         }
       }
 
+      // Serialize CDC-specific fields if this is a HoodieCdcSourceSplit
+      boolean isCdcSplit = obj instanceof HoodieCdcSourceSplit;
+      out.writeBoolean(isCdcSplit);
+      if (isCdcSplit) {
+        HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) obj;
+        out.writeLong(cdcSplit.getMaxCompactionMemoryInBytes());
+        HoodieCDCFileSplit[] changes = cdcSplit.getChanges();
+        out.writeInt(changes.length);
+        for (HoodieCDCFileSplit change : changes) {
+          ByteArrayOutputStream changeBaos = new ByteArrayOutputStream();
+          try (ObjectOutputStream oos = new ObjectOutputStream(changeBaos)) {
+            oos.writeObject(change);
+          }
+          byte[] changeBytes = changeBaos.toByteArray();
+          out.writeInt(changeBytes.length);
+          out.write(changeBytes);
+        }
+      }
+
       out.flush();
       return baos.toByteArray();
     }
@@ -204,6 +226,27 @@ public class HoodieSourceSplitSerializer implements 
SimpleVersionedSerializer<Ho
       }
 
       // Create HoodieSourceSplit object
+      boolean isCdcSplit = in.readBoolean();
+      if (isCdcSplit) {
+        long maxCompactionMemoryInBytes = in.readLong();
+        int changesCount = in.readInt();
+        HoodieCDCFileSplit[] changes = new HoodieCDCFileSplit[changesCount];
+        for (int i = 0; i < changesCount; i++) {
+          int changeLen = in.readInt();
+          byte[] changeBytes = new byte[changeLen];
+          in.readFully(changeBytes);
+          try (ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(changeBytes))) {
+            changes[i] = (HoodieCDCFileSplit) ois.readObject();
+          } catch (ClassNotFoundException e) {
+            throw new IOException("Failed to deserialize HoodieCDCFileSplit", 
e);
+          }
+        }
+        HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(
+            splitNum, tablePath, maxCompactionMemoryInBytes, fileId, changes, 
mergeType, latestCommit);
+        cdcSplit.updatePosition(fileOffset, consumed);
+        return cdcSplit;
+      }
+
       HoodieSourceSplit split = new HoodieSourceSplit(
               splitNum,
               basePath,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 62852bce8875..5a406da5fbba 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -53,7 +53,10 @@ import org.apache.hudi.source.prune.PartitionBucketIdFunc;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.source.prune.PrimaryKeyPruners;
 import org.apache.hudi.source.reader.HoodieRecordEmitter;
+import org.apache.hudi.source.reader.function.HoodieCdcSplitReaderFunction;
 import org.apache.hudi.source.reader.function.HoodieSplitReaderFunction;
+import org.apache.hudi.source.reader.function.SplitReaderFunction;
+import org.apache.hudi.source.split.HoodieSourceSplit;
 import org.apache.hudi.source.split.HoodieSourceSplitComparator;
 import 
org.apache.hudi.source.rebalance.partitioner.StreamReadAppendPartitioner;
 import 
org.apache.hudi.source.rebalance.partitioner.StreamReadBucketIndexPartitioner;
@@ -300,16 +303,33 @@ public class HoodieTableSource extends FileIndexReader 
implements
 
     HoodieScanContext context = createHoodieScanContext(rowType);
     final HoodieTableType tableType = 
HoodieTableType.valueOf(this.conf.get(FlinkOptions.TABLE_TYPE));
+    final SplitReaderFunction<RowData> splitReaderFunction;
+    final MergeOnReadTableState<HoodieSourceSplit> hoodieTableState = new 
MergeOnReadTableState(
+            rowType,
+            requiredRowType,
+            tableSchema.toString(),
+            HoodieSchemaConverter.convertToSchema(requiredRowType).toString(),
+            new ArrayList());
     boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
-    HoodieSplitReaderFunction splitReaderFunction = new 
HoodieSplitReaderFunction(
-        conf,
-        tableSchema,
-        HoodieSchemaConverter.convertToSchema(requiredRowType),
-        internalSchemaManager,
-        conf.get(FlinkOptions.MERGE_TYPE),
-        predicates,
-        emitDelete
-        );
+    if (conf.get(FlinkOptions.CDC_ENABLED)) {
+      List<DataType> fieldTypes = rowDataType.getChildren();
+      splitReaderFunction = new HoodieCdcSplitReaderFunction(
+          conf,
+          hoodieTableState,
+          internalSchemaManager,
+          fieldTypes,
+          predicates,
+          emitDelete);
+    } else {
+      splitReaderFunction = new HoodieSplitReaderFunction(
+          conf,
+          tableSchema,
+          HoodieSchemaConverter.convertToSchema(requiredRowType),
+          internalSchemaManager,
+          conf.get(FlinkOptions.MERGE_TYPE),
+          predicates,
+          emitDelete);
+    }
     return new HoodieSource<>(context, splitReaderFunction, new 
HoodieSourceSplitComparator(), metaClient, new HoodieRecordEmitter<>());
   }
 
@@ -585,7 +605,7 @@ public class HoodieTableSource extends FileIndexReader 
implements
       HoodieSchema tableSchema,
       DataType rowDataType,
       List<MergeOnReadInputSplit> inputSplits) {
-    final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+    final MergeOnReadTableState<MergeOnReadInputSplit> hoodieTableState = new 
MergeOnReadTableState(
         rowType,
         requiredRowType,
         tableSchema.toString(),
@@ -610,7 +630,7 @@ public class HoodieTableSource extends FileIndexReader 
implements
       DataType rowDataType,
       List<MergeOnReadInputSplit> inputSplits,
       boolean emitDelete) {
-    final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+    final MergeOnReadTableState<MergeOnReadInputSplit> hoodieTableState = new 
MergeOnReadTableState(
         rowType,
         requiredRowType,
         tableAvroSchema.toString(),
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index f53998c078a7..dcf2160a26e5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -71,7 +71,7 @@ public class MergeOnReadInputFormat
 
   protected transient org.apache.hadoop.conf.Configuration hadoopConf;
 
-  protected final MergeOnReadTableState tableState;
+  protected final MergeOnReadTableState<MergeOnReadInputSplit> tableState;
 
   /**
    * Uniform iterator view for the underneath records.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 868a7f814aba..bc7afa7efad1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -30,7 +30,7 @@ import java.util.List;
  * Statistics for merge on read table source.
  */
 @Getter
-public class MergeOnReadTableState implements Serializable {
+public class MergeOnReadTableState<T> implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -38,7 +38,7 @@ public class MergeOnReadTableState implements Serializable {
   private final RowType requiredRowType;
   private final String tableSchema;
   private final String requiredSchema;
-  private final List<MergeOnReadInputSplit> inputSplits;
+  private final List<T> inputSplits;
   private final int operationPos;
 
   public MergeOnReadTableState(
@@ -46,7 +46,7 @@ public class MergeOnReadTableState implements Serializable {
       RowType requiredRowType,
       String tableSchema,
       String requiredSchema,
-      List<MergeOnReadInputSplit> inputSplits) {
+      List<T> inputSplits) {
     this.rowType = rowType;
     this.requiredRowType = requiredRowType;
     this.tableSchema = tableSchema;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index e540cd55b006..64f48374c1d4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -255,7 +255,7 @@ public class TestStreamReadOperator {
     }
     final DataType rowDataType = 
HoodieSchemaConverter.convertToDataType(tableSchema);
     final RowType rowType = (RowType) rowDataType.getLogicalType();
-    final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+    final MergeOnReadTableState<MergeOnReadInputSplit> hoodieTableState = new 
MergeOnReadTableState(
         rowType,
         TestConfigurations.ROW_TYPE,
         tableSchema.toString(),
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
new file mode 100644
index 000000000000..d29a637664ae
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.source.split.HoodieCdcSourceSplit;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.table.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.apache.hudi.utils.TestConfigurations.ROW_DATA_TYPE;
+import static org.apache.hudi.utils.TestConfigurations.ROW_TYPE;
+import static org.apache.hudi.utils.TestConfigurations.TABLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test cases for {@link HoodieCdcSplitReaderFunction}.
+ */
+public class TestHoodieCdcSplitReaderFunction {
+
+  @TempDir
+  File tempDir;
+
+  private Configuration conf;
+  private HoodieSchema tableSchema;
+  private HoodieSchema requiredSchema;
+  private InternalSchemaManager internalSchemaManager;
+  private MergeOnReadTableState tableState;
+
+  @BeforeEach
+  public void setUp() {
+    conf = TestConfigurations.getDefaultConf(tempDir.getAbsolutePath());
+    tableSchema = mock(HoodieSchema.class);
+    requiredSchema = mock(HoodieSchema.class);
+    internalSchemaManager = mock(InternalSchemaManager.class);
+    tableState = new MergeOnReadTableState(ROW_TYPE, ROW_TYPE, TABLE_SCHEMA.toString(), TABLE_SCHEMA.toString(), new ArrayList<>());
+  }
+
+  private HoodieCdcSplitReaderFunction createFunction() {
+    return new HoodieCdcSplitReaderFunction(
+        conf,
+        tableState,
+        internalSchemaManager,
+        ROW_DATA_TYPE.getChildren(),
+        Collections.emptyList(),
+            false);
+  }
+
+  // -------------------------------------------------------------------------
+  //  Constructor tests
+  // -------------------------------------------------------------------------
+
+  @Test
+  public void testConstructorWithValidParameters() {
+    HoodieCdcSplitReaderFunction function = createFunction();
+    assertNotNull(function);
+  }
+
+  @Test
+  public void testConstructorWithProjectedRequiredRowType() {
+    // requiredRowType is a subset of rowType (first 3 of 5 fields)
+    RowType projectedRowType = new RowType(
+        ROW_TYPE.getFields().subList(0, 3).stream()
+            .map(f -> new RowType.RowField(f.getName(), f.getType()))
+            .collect(java.util.stream.Collectors.toList()));
+
+    tableState = new MergeOnReadTableState(ROW_TYPE, projectedRowType, TABLE_SCHEMA.toString(), HoodieSchemaConverter.convertToSchema(projectedRowType.copy()).toString(), new ArrayList<>());
+    HoodieCdcSplitReaderFunction function = new HoodieCdcSplitReaderFunction(
+        conf,
+        tableState,
+        internalSchemaManager,
+        ROW_DATA_TYPE.getChildren(),
+        Collections.emptyList(),
+            false);
+
+    assertNotNull(function);
+  }
+
+  @Test
+  public void testConstructorWithEmptyFieldTypes() {
+    HoodieCdcSplitReaderFunction function = new HoodieCdcSplitReaderFunction(
+        conf,
+        tableState,
+        internalSchemaManager,
+        Collections.emptyList(),
+        Collections.emptyList(),
+            false);
+
+    assertNotNull(function);
+  }
+
+  // -------------------------------------------------------------------------
+  //  close() tests
+  // -------------------------------------------------------------------------
+
+  @Test
+  public void testCloseWithoutReadingDoesNotThrow() throws Exception {
+    HoodieCdcSplitReaderFunction function = createFunction();
+    function.close();
+  }
+
+  @Test
+  public void testMultipleClosesDoNotThrow() throws Exception {
+    HoodieCdcSplitReaderFunction function = createFunction();
+    function.close();
+    function.close();
+    function.close();
+  }
+
+  // -------------------------------------------------------------------------
+  //  read() argument-validation tests
+  // -------------------------------------------------------------------------
+
+  @Test
+  public void testReadWithNonCdcSplitDelegatesToFallback() {
+    // Non-CDC splits are forwarded to the fallback HoodieSplitReaderFunction 
rather than
+    // rejected. The fallback will attempt real I/O and fail, but the failure 
must NOT be
+    // an IllegalArgumentException (that would indicate the type-guard wrongly 
rejected it).
+    HoodieCdcSplitReaderFunction function = createFunction();
+
+    HoodieSourceSplit nonCdcSplit = new HoodieSourceSplit(
+        1, "base.parquet", Option.empty(), tempDir.getAbsolutePath(),
+        "", "read_optimized", "20230101000000000", "file-1", Option.empty());
+
+    Exception ex = assertThrows(Exception.class, () -> function.read(nonCdcSplit));
+    assertNotNull(ex);
+    // Must not be IllegalArgumentException (which the old type-guard wrongly threw)
+    if (ex instanceof IllegalArgumentException) {
+      throw new AssertionError("read() should not throw IllegalArgumentException for non-CDC split; "
+          + "it should fall through to the fallback reader", ex);
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Integration: read() accepts a HoodieCdcSourceSplit (validation only)
+  // -------------------------------------------------------------------------
+
+  @Test
+  public void testReadAcceptsCdcSourceSplitType() {
+    // Verify that HoodieCdcSourceSplit is accepted (cast doesn't throw).
+    // Actual I/O would require a real Hoodie table, so we only check the
+    // type-guard passes by catching the downstream I/O error rather than
+    // an IllegalArgumentException.
+    HoodieCdcSplitReaderFunction function = createFunction();
+
+    HoodieCDCFileSplit[] changes = {
+        new HoodieCDCFileSplit("20230101000000000", 
HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert.parquet")
+    };
+    HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(
+        1, tempDir.getAbsolutePath(), 128 * 1024 * 1024L, "file-cdc",
+            changes, "read_optimized", "20230101000000000");
+
+    // The call should NOT throw IllegalArgumentException (type guard passes).
+    // It will throw some other exception when trying to do real I/O.
+    Exception ex = assertThrows(Exception.class, () -> function.read(cdcSplit));
+    assertNotNull(ex);
+    // Must not be an IllegalArgumentException — that would mean the type guard wrongly rejected the CDC split
+    if (ex instanceof IllegalArgumentException) {
+      throw new AssertionError("read() should not throw IllegalArgumentException for a CdcSourceSplit", ex);
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
new file mode 100644
index 000000000000..56e9ae53ce72
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieCdcSourceSplit}.
+ */
+public class TestHoodieCdcSourceSplit {
+
+  private static HoodieCDCFileSplit makeFileSplit(String instant) {
+    return new HoodieCDCFileSplit(instant, 
HoodieCDCInferenceCase.BASE_FILE_INSERT, "cdc.parquet");
+  }
+
+  @Test
+  public void testGetChangesReturnsConstructorValue() {
+    HoodieCDCFileSplit[] changes = {makeFileSplit("20230101000000000"), 
makeFileSplit("20230102000000000")};
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, 
"file-1",
+            changes, "read_optimized", "20230101000000000");
+
+    assertArrayEquals(changes, split.getChanges());
+    assertSame(changes, split.getChanges());
+  }
+
+  @Test
+  public void testGetMaxCompactionMemoryInBytes() {
+    long memory = 256 * 1024 * 1024L;
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", memory, 
"file-1", new HoodieCDCFileSplit[0],
+            "read_optimized", "20230101000000000");
+
+    assertEquals(memory, split.getMaxCompactionMemoryInBytes());
+  }
+
+  @Test
+  public void testGetTablePath() {
+    String tablePath = "/warehouse/hudi/my_table";
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, tablePath, 1024L,
+            "file-1", new HoodieCDCFileSplit[0], "read_optimized", 
"20230101000000000");
+
+    assertEquals(tablePath, split.getTablePath());
+  }
+
+  @Test
+  public void testGetFileId() {
+    String fileId = "my-file-id-abc-123";
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
+            fileId, new HoodieCDCFileSplit[0], "read_optimized", 
"20230101000000000");
+
+    assertEquals(fileId, split.getFileId());
+  }
+
+  @Test
+  public void testSplitIdFormat() {
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, 
"file-1",
+            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+    // splitId() is inherited from HoodieSourceSplit and returns "splitNum:fileId"
+    assertEquals("1:file-1", split.splitId());
+  }
+
+  @Test
+  public void testToStringContainsExpectedFields() {
+    HoodieCDCFileSplit[] changes = {makeFileSplit("20230101000000000")};
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, 
"file-xyz",
+            changes, "read_optimized", "20230101000000000");
+
+    String str = split.toString();
+    assertNotNull(str);
+    // Lombok @ToString uses the actual class name "HoodieCdcSourceSplit"
+    assertTrue(str.contains("HoodieCdcSourceSplit"), "toString should mention 
class name");
+    // CDC-specific fields are included
+    assertTrue(str.contains("changes"), "toString should mention changes");
+    assertTrue(str.contains("maxCompactionMemoryInBytes"), "toString should 
mention maxCompactionMemoryInBytes");
+  }
+
+  @Test
+  public void testEmptyChangesArray() {
+    HoodieCDCFileSplit[] changes = new HoodieCDCFileSplit[0];
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(5, "/table", 512L, 
"file-empty",
+            changes, "read_optimized", "20230101000000000");
+
+    assertNotNull(split.getChanges());
+    assertEquals(0, split.getChanges().length);
+  }
+
+  @Test
+  public void testIsInstanceOfHoodieSourceSplit() {
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, 
"file-1",
+            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+    assertTrue(split instanceof HoodieSourceSplit);
+  }
+
+  @Test
+  public void testLargeCompactionMemory() {
+    long largeMemory = Long.MAX_VALUE;
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 
largeMemory, "file-1",
+            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+    assertEquals(largeMemory, split.getMaxCompactionMemoryInBytes());
+  }
+
+  @Test
+  public void testDefaultConsumedAndFileOffset() {
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, 
"file-1",
+            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+    assertEquals(0L, split.getConsumed());
+    assertEquals(0, split.getFileOffset());
+    assertFalse(split.isConsumed());
+  }
+
+  @Test
+  public void testConsumeIncrementsConsumed() {
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, 
"file-1",
+            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+    assertFalse(split.isConsumed());
+    split.consume();
+    assertTrue(split.isConsumed());
+    assertEquals(1L, split.getConsumed());
+    split.consume();
+    split.consume();
+    assertEquals(3L, split.getConsumed());
+  }
+
+  @Test
+  public void testUpdatePositionSetsConsumedAndFileOffset() {
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, 
"file-1",
+            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+    split.updatePosition(7, 42L);
+    assertEquals(7, split.getFileOffset());
+    assertEquals(42L, split.getConsumed());
+    assertTrue(split.isConsumed());
+  }
+
+  @Test
+  public void testMultipleChangesWithDifferentInferenceCases() {
+    HoodieCDCFileSplit insert = new HoodieCDCFileSplit("20230101000000000", 
HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert.parquet");
+    HoodieCDCFileSplit delete = new HoodieCDCFileSplit("20230102000000000", 
HoodieCDCInferenceCase.BASE_FILE_DELETE, "delete.log");
+    HoodieCDCFileSplit logFile = new HoodieCDCFileSplit("20230103000000000", 
HoodieCDCInferenceCase.LOG_FILE, "log.log");
+    HoodieCDCFileSplit replace = new HoodieCDCFileSplit("20230104000000000", 
HoodieCDCInferenceCase.REPLACE_COMMIT, "replace.parquet");
+
+    HoodieCDCFileSplit[] changes = {insert, delete, logFile, replace};
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, 
"file-multi",
+            changes, "read_optimized", "20230101000000000");
+
+    assertEquals(4, split.getChanges().length);
+    assertSame(insert, split.getChanges()[0]);
+    assertSame(delete, split.getChanges()[1]);
+    assertSame(logFile, split.getChanges()[2]);
+    assertSame(replace, split.getChanges()[3]);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
new file mode 100644
index 000000000000..25e204817d07
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.source.IncrementalInputSplits;
+import org.apache.hudi.table.format.cdc.CdcInputSplit;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieContinuousSplitBatch}.
+ */
+public class TestHoodieContinuousSplitBatch {
+
+  private static final String TABLE_PATH = "/warehouse/test_table";
+  private static final long MAX_MEMORY = 128 * 1024 * 1024L;
+
+  // -------------------------------------------------------------------------
+  //  fromResult tests
+  // -------------------------------------------------------------------------
+
+  @Test
+  public void testFromResultWithEmptyResult() {
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.emptyList(), "20230101000000000");
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertNotNull(batch);
+    assertTrue(batch.getSplits().isEmpty());
+    assertEquals("20230101000000000", batch.getEndInstant());
+    assertNull(batch.getOffset());
+  }
+
+  @Test
+  public void testFromResultWithCdcSplitProducesHoodieCdcSourceSplit() {
+    HoodieCDCFileSplit[] changes = {
+        new HoodieCDCFileSplit("20230101000000000", 
HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert.parquet")
+    };
+    CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, 
"file-cdc", changes);
+
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.<MergeOnReadInputSplit>singletonList(cdcInputSplit), 
"20230101000000000");
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertEquals(1, batch.getSplits().size());
+    HoodieSourceSplit split = batch.getSplits().iterator().next();
+    assertTrue(split instanceof HoodieCdcSourceSplit,
+        "CDC input split should produce HoodieCdcSourceSplit, got: " + 
split.getClass());
+
+    HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split;
+    assertEquals(TABLE_PATH, cdcSplit.getTablePath());
+    assertEquals(MAX_MEMORY, cdcSplit.getMaxCompactionMemoryInBytes());
+    assertEquals("file-cdc", cdcSplit.getFileId());
+    assertEquals(1, cdcSplit.getChanges().length);
+  }
+
+  @Test
+  public void testFromResultWithRegularSplitProducesHoodieSourceSplit() {
+    MergeOnReadInputSplit morSplit = new MergeOnReadInputSplit(
+        1,
+        TABLE_PATH + "/base.parquet",
+        Option.of(Collections.singletonList(TABLE_PATH + "/log1.log")),
+        "20230101000000000",
+        TABLE_PATH,
+        MAX_MEMORY,
+        "payload_combine",
+        null,
+        "file-mor");
+
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.singletonList(morSplit), "20230101000000000");
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertEquals(1, batch.getSplits().size());
+    HoodieSourceSplit split = batch.getSplits().iterator().next();
+    // Should be a plain HoodieSourceSplit (not a CDC one)
+    assertTrue(split instanceof HoodieSourceSplit);
+    assertTrue(!(split instanceof HoodieCdcSourceSplit),
+        "MOR split should produce HoodieSourceSplit, not 
HoodieCdcSourceSplit");
+    assertEquals("file-mor", split.getFileId());
+    assertEquals(TABLE_PATH, split.getTablePath());
+  }
+
+  @Test
+  public void testFromResultWithMixedSplitsPreservesOrder() {
+    HoodieCDCFileSplit[] changes = {
+        new HoodieCDCFileSplit("20230101000000000", 
HoodieCDCInferenceCase.LOG_FILE, "cdc.log")
+    };
+    CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, 
"file-cdc", changes);
+
+    MergeOnReadInputSplit morSplit = new MergeOnReadInputSplit(
+        2,
+        TABLE_PATH + "/base.parquet",
+        Option.empty(),
+        "20230101000000000",
+        TABLE_PATH,
+        MAX_MEMORY,
+        "payload_combine",
+        null,
+        "file-mor");
+
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Arrays.asList(cdcInputSplit, morSplit), "20230101000000000");
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertEquals(2, batch.getSplits().size());
+    List<HoodieSourceSplit> splits = (List<HoodieSourceSplit>) 
batch.getSplits();
+    assertTrue(splits.get(0) instanceof HoodieCdcSourceSplit);
+    assertTrue(!(splits.get(1) instanceof HoodieCdcSourceSplit));
+    assertEquals("file-cdc", splits.get(0).getFileId());
+    assertEquals("file-mor", splits.get(1).getFileId());
+  }
+
+  @Test
+  public void testFromResultPreservesEndInstant() {
+    String endInstant = "20230215120000000";
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.emptyList(), endInstant);
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertEquals(endInstant, batch.getEndInstant());
+  }
+
+  @Test
+  public void testFromResultPreservesOffset() {
+    String offset = "20230215120000000";
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.emptyList(), "20230215120000000", offset);
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertEquals(offset, batch.getOffset());
+  }
+
+  @Test
+  public void testFromResultWithNullOffsetPreservesNullOffset() {
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.emptyList(), "20230101000000000");
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertNull(batch.getOffset());
+  }
+
+  @Test
+  public void testFromResultCdcSplitPreservesChangesArray() {
+    HoodieCDCFileSplit split1 = new HoodieCDCFileSplit(
+        "20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT, 
"insert.parquet");
+    HoodieCDCFileSplit split2 = new HoodieCDCFileSplit(
+        "20230102000000000", HoodieCDCInferenceCase.BASE_FILE_DELETE, 
"delete.log");
+    HoodieCDCFileSplit[] changes = {split1, split2};
+
+    CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, 
"file-cdc", changes);
+
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.<MergeOnReadInputSplit>singletonList(cdcInputSplit), 
"20230102000000000");
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) 
batch.getSplits().iterator().next();
+    assertEquals(2, cdcSplit.getChanges().length);
+    assertSame(split1, cdcSplit.getChanges()[0]);
+    assertSame(split2, cdcSplit.getChanges()[1]);
+  }
+
+  @Test
+  public void testFromResultWithMultipleCdcSplits() {
+    HoodieCDCFileSplit[] changes1 = {
+        new HoodieCDCFileSplit("20230101000000000", 
HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert1.parquet")
+    };
+    HoodieCDCFileSplit[] changes2 = {
+        new HoodieCDCFileSplit("20230101000000000", 
HoodieCDCInferenceCase.REPLACE_COMMIT, "replace.parquet")
+    };
+
+    CdcInputSplit cdc1 = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, 
"file-cdc-1", changes1);
+    CdcInputSplit cdc2 = new CdcInputSplit(2, TABLE_PATH, MAX_MEMORY, 
"file-cdc-2", changes2);
+
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Arrays.<MergeOnReadInputSplit>asList(cdc1, cdc2), "20230101000000000");
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertEquals(2, batch.getSplits().size());
+    for (HoodieSourceSplit s : batch.getSplits()) {
+      assertTrue(s instanceof HoodieCdcSourceSplit);
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Constructor and EMPTY tests
+  // -------------------------------------------------------------------------
+
+  @Test
+  public void testEmptyConstant() {
+    HoodieContinuousSplitBatch empty = HoodieContinuousSplitBatch.EMPTY;
+
+    assertNotNull(empty);
+    assertTrue(empty.getSplits().isEmpty());
+    assertEquals("", empty.getEndInstant());
+    assertEquals("", empty.getOffset());
+  }
+
+  @Test
+  public void testConstructorStoresFields() {
+    Collection<HoodieSourceSplit> splits = Collections.singletonList(
+        new HoodieSourceSplit(1, null, Option.empty(), TABLE_PATH, "", 
"read_optimized", "", "file-1", Option.empty()));
+    String endInstant = "20230215000000000";
+    String offset = "some-offset";
+
+    HoodieContinuousSplitBatch batch = new HoodieContinuousSplitBatch(splits, 
endInstant, offset);
+
+    assertSame(splits, batch.getSplits());
+    assertEquals(endInstant, batch.getEndInstant());
+    assertEquals(offset, batch.getOffset());
+  }
+
+  @Test
+  public void testConstructorRejectsNullSplits() {
+    org.junit.jupiter.api.Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> new HoodieContinuousSplitBatch(null, "20230101000000000", null));
+  }
+
+  @Test
+  public void testConstructorRejectsNullEndInstant() {
+    org.junit.jupiter.api.Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> new HoodieContinuousSplitBatch(Collections.emptyList(), null, 
null));
+  }
+
+  // -------------------------------------------------------------------------
+  //  resolvePartitionPath coverage tests
+  // -------------------------------------------------------------------------
+
+  @Test
+  public void testFromResultMorSplitWithLogPathsOnlyDerivesPartitionPath() {
+    // Split has no base file, only log paths; partition path should be 
derived from the first log path
+    MergeOnReadInputSplit logOnlySplit = new MergeOnReadInputSplit(
+        1,
+        null,
+        Option.of(Collections.singletonList(TABLE_PATH + "/2023/01/log.log")),
+        "20230101000000000",
+        TABLE_PATH,
+        MAX_MEMORY,
+        "payload_combine",
+        null,
+        "file-log-only");
+
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.singletonList(logOnlySplit), "20230101000000000");
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertEquals(1, batch.getSplits().size());
+    HoodieSourceSplit split = batch.getSplits().iterator().next();
+    // Partition path is derived relative to the table path
+    assertEquals("2023/01", split.getPartitionPath());
+  }
+
+  @Test
+  public void testFromResultMorSplitWithNoFilesHasEmptyPartitionPath() {
+    // Split has neither base file nor log paths; partition path must be empty 
string
+    MergeOnReadInputSplit emptyPathSplit = new MergeOnReadInputSplit(
+        1,
+        null,
+        Option.empty(),
+        "20230101000000000",
+        TABLE_PATH,
+        MAX_MEMORY,
+        "read_optimized",
+        null,
+        "file-no-paths");
+
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.singletonList(emptyPathSplit), "20230101000000000");
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertEquals(1, batch.getSplits().size());
+    HoodieSourceSplit split = batch.getSplits().iterator().next();
+    assertEquals("", split.getPartitionPath());
+  }
+
+  @Test
+  public void 
testFromResultCdcSplitWithEmptyChangesUsesEndInstantAsLatestCommit() {
+    // When a CDC split has no changes, the batch end instant is used as the 
latest commit
+    CdcInputSplit cdcWithNoChanges = new CdcInputSplit(1, TABLE_PATH, 
MAX_MEMORY, "file-cdc-empty",
+        new HoodieCDCFileSplit[0]);
+    String endInstant = "20230301000000000";
+
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.<MergeOnReadInputSplit>singletonList(cdcWithNoChanges), 
endInstant);
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    assertEquals(1, batch.getSplits().size());
+    HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) 
batch.getSplits().iterator().next();
+    assertEquals(0, cdcSplit.getChanges().length);
+    // With no changes, latestCommit must fall back to the batch end instant
+    assertEquals(endInstant, cdcSplit.getLatestCommit());
+  }
+
+  @Test
+  public void testFromResultMorSplitWithSubdirectoryPartitionPath() {
+    // Verifies multi-level partition paths (e.g. year=2023/month=01/day=15) 
are resolved correctly
+    MergeOnReadInputSplit partitionedSplit = new MergeOnReadInputSplit(
+        1,
+        TABLE_PATH + "/year=2023/month=01/day=15/base.parquet",
+        Option.empty(),
+        "20230115000000000",
+        TABLE_PATH,
+        MAX_MEMORY,
+        "read_optimized",
+        null,
+        "file-partitioned");
+
+    IncrementalInputSplits.Result result = 
IncrementalInputSplits.Result.instance(
+        Collections.singletonList(partitionedSplit), "20230115000000000");
+
+    HoodieContinuousSplitBatch batch = 
HoodieContinuousSplitBatch.fromResult(result);
+
+    HoodieSourceSplit split = batch.getSplits().iterator().next();
+    assertEquals("year=2023/month=01/day=15", split.getPartitionPath());
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
index d200935fa0e3..7d7cbecc1a1a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
@@ -21,6 +21,9 @@ package org.apache.hudi.source.split;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
+
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -1174,5 +1177,147 @@ public class TestHoodieSourceSplitSerializer {
 
     assertEquals("Composition Range is not supported.", 
exception.getMessage());
   }
-}
 
+  // -------------------------------------------------------------------------
+  //  CDC split serialization tests
+  // -------------------------------------------------------------------------
+
+  @Test
+  public void testSerializeAndDeserializeCdcSplit() throws IOException {
+    HoodieCDCFileSplit change = new HoodieCDCFileSplit(
+        "20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT, 
"insert.parquet");
+    HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
+        10, "/table/path", 128 * 1024 * 1024L, "file-cdc",
+        new HoodieCDCFileSplit[]{change}, "read_optimized", 
"20230101000000000");
+
+    byte[] serialized = serializer.serialize(original);
+    assertNotNull(serialized);
+
+    HoodieSourceSplit deserialized = 
serializer.deserialize(serializer.getVersion(), serialized);
+
+    assertTrue(deserialized instanceof HoodieCdcSourceSplit,
+        "CDC split must deserialize as HoodieCdcSourceSplit");
+    HoodieCdcSourceSplit cdcDeserialized = (HoodieCdcSourceSplit) deserialized;
+    assertEquals(original.getSplitNum(), cdcDeserialized.getSplitNum());
+    assertEquals(original.getTablePath(), cdcDeserialized.getTablePath());
+    assertEquals(original.getFileId(), cdcDeserialized.getFileId());
+    assertEquals(original.getMaxCompactionMemoryInBytes(), 
cdcDeserialized.getMaxCompactionMemoryInBytes());
+    assertEquals(original.getMergeType(), cdcDeserialized.getMergeType());
+    assertEquals(original.getLatestCommit(), 
cdcDeserialized.getLatestCommit());
+    assertEquals(1, cdcDeserialized.getChanges().length);
+    assertEquals("20230101000000000", 
cdcDeserialized.getChanges()[0].getInstant());
+    assertEquals(HoodieCDCInferenceCase.BASE_FILE_INSERT, 
cdcDeserialized.getChanges()[0].getCdcInferCase());
+  }
+
+  @Test
+  public void testSerializeAndDeserializeCdcSplitWithEmptyChanges() throws 
IOException {
+    HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
+        11, "/warehouse/table", 256 * 1024 * 1024L, "file-empty-cdc",
+        new HoodieCDCFileSplit[0], "payload_combine", "20230201000000000");
+
+    byte[] serialized = serializer.serialize(original);
+    HoodieSourceSplit deserialized = 
serializer.deserialize(serializer.getVersion(), serialized);
+
+    assertTrue(deserialized instanceof HoodieCdcSourceSplit);
+    HoodieCdcSourceSplit cdcDeserialized = (HoodieCdcSourceSplit) deserialized;
+    assertEquals(0, cdcDeserialized.getChanges().length);
+    assertEquals("/warehouse/table", cdcDeserialized.getTablePath());
+    assertEquals(256 * 1024 * 1024L, 
cdcDeserialized.getMaxCompactionMemoryInBytes());
+  }
+
+  @Test
+  public void testSerializeAndDeserializeCdcSplitWithMultipleChanges() throws 
IOException {
+    HoodieCDCFileSplit insert = new HoodieCDCFileSplit(
+        "20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT, 
"insert.parquet");
+    HoodieCDCFileSplit delete = new HoodieCDCFileSplit(
+        "20230102000000000", HoodieCDCInferenceCase.BASE_FILE_DELETE, 
"delete.log");
+    HoodieCDCFileSplit logFile = new HoodieCDCFileSplit(
+        "20230103000000000", HoodieCDCInferenceCase.LOG_FILE, "cdc.log");
+    HoodieCDCFileSplit replace = new HoodieCDCFileSplit(
+        "20230104000000000", HoodieCDCInferenceCase.REPLACE_COMMIT, 
"replace.parquet");
+
+    HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
+        12, "/table", 64 * 1024 * 1024L, "file-multi-cdc",
+        new HoodieCDCFileSplit[]{insert, delete, logFile, replace},
+        "read_optimized", "20230104000000000");
+
+    byte[] serialized = serializer.serialize(original);
+    HoodieSourceSplit deserialized = 
serializer.deserialize(serializer.getVersion(), serialized);
+
+    assertTrue(deserialized instanceof HoodieCdcSourceSplit);
+    HoodieCdcSourceSplit cdcDeserialized = (HoodieCdcSourceSplit) deserialized;
+    assertEquals(4, cdcDeserialized.getChanges().length);
+    assertEquals("20230101000000000", 
cdcDeserialized.getChanges()[0].getInstant());
+    assertEquals(HoodieCDCInferenceCase.BASE_FILE_INSERT, 
cdcDeserialized.getChanges()[0].getCdcInferCase());
+    assertEquals("20230102000000000", 
cdcDeserialized.getChanges()[1].getInstant());
+    assertEquals(HoodieCDCInferenceCase.BASE_FILE_DELETE, 
cdcDeserialized.getChanges()[1].getCdcInferCase());
+    assertEquals("20230103000000000", 
cdcDeserialized.getChanges()[2].getInstant());
+    assertEquals(HoodieCDCInferenceCase.LOG_FILE, 
cdcDeserialized.getChanges()[2].getCdcInferCase());
+    assertEquals("20230104000000000", 
cdcDeserialized.getChanges()[3].getInstant());
+    assertEquals(HoodieCDCInferenceCase.REPLACE_COMMIT, 
cdcDeserialized.getChanges()[3].getCdcInferCase());
+  }
+
+  @Test
+  public void testSerializeAndDeserializeCdcSplitPreservesConsumedState() 
throws IOException {
+    HoodieCDCFileSplit change = new HoodieCDCFileSplit(
+        "20230101000000000", HoodieCDCInferenceCase.LOG_FILE, "cdc.log");
+    HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
+        13, "/table", 128 * 1024 * 1024L, "file-cdc-consumed",
+        new HoodieCDCFileSplit[]{change}, "read_optimized", 
"20230101000000000");
+
+    original.updatePosition(5, 200L);
+
+    byte[] serialized = serializer.serialize(original);
+    HoodieSourceSplit deserialized = 
serializer.deserialize(serializer.getVersion(), serialized);
+
+    assertTrue(deserialized instanceof HoodieCdcSourceSplit);
+    HoodieCdcSourceSplit cdcDeserialized = (HoodieCdcSourceSplit) deserialized;
+    assertEquals(5, cdcDeserialized.getFileOffset());
+    assertEquals(200L, cdcDeserialized.getConsumed());
+    assertTrue(cdcDeserialized.isConsumed());
+  }
+
+  @Test
+  public void testRoundTripCdcSplitMultipleTimes() throws IOException {
+    HoodieCDCFileSplit change1 = new HoodieCDCFileSplit(
+        "20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT, 
"insert.parquet");
+    HoodieCDCFileSplit change2 = new HoodieCDCFileSplit(
+        "20230102000000000", HoodieCDCInferenceCase.BASE_FILE_DELETE, 
"delete.log");
+
+    HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
+        14, "/roundtrip/table", 128 * 1024 * 1024L, "file-roundtrip-cdc",
+        new HoodieCDCFileSplit[]{change1, change2},
+        "read_optimized", "20230102000000000");
+
+    original.updatePosition(3, 50L);
+
+    HoodieSourceSplit current = original;
+    for (int i = 0; i < 5; i++) {
+      byte[] s = serializer.serialize(current);
+      current = serializer.deserialize(serializer.getVersion(), s);
+    }
+
+    assertTrue(current instanceof HoodieCdcSourceSplit);
+    HoodieCdcSourceSplit cdcCurrent = (HoodieCdcSourceSplit) current;
+    assertEquals(original.getSplitNum(), cdcCurrent.getSplitNum());
+    assertEquals(original.getTablePath(), cdcCurrent.getTablePath());
+    assertEquals(original.getFileId(), cdcCurrent.getFileId());
+    assertEquals(original.getMaxCompactionMemoryInBytes(), 
cdcCurrent.getMaxCompactionMemoryInBytes());
+    assertEquals(2, cdcCurrent.getChanges().length);
+    assertEquals(3, cdcCurrent.getFileOffset());
+    assertEquals(50L, cdcCurrent.getConsumed());
+  }
+
+  @Test
+  public void testRegularSplitDeserializesAsHoodieSourceSplitNotCdc() throws 
IOException {
+    HoodieSourceSplit original = new HoodieSourceSplit(
+        15, "base-path", Option.empty(), "/table/path", "/partition",
+        "read_optimized", "19700101000000000", "file-regular", Option.empty());
+
+    byte[] serialized = serializer.serialize(original);
+    HoodieSourceSplit deserialized = 
serializer.deserialize(serializer.getVersion(), serialized);
+
+    assertFalse(deserialized instanceof HoodieCdcSourceSplit,
+        "Regular split must not deserialize as HoodieCdcSourceSplit");
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index f577d4cfaf39..d833d2929429 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -189,8 +189,8 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @EnumSource(value = HoodieCDCSupplementalLoggingMode.class)
-  void 
testStreamReadFromSpecifiedCommitWithChangelog(HoodieCDCSupplementalLoggingMode 
mode) throws Exception {
+  @MethodSource("cdcSupplementalLoggingModeWithSourceV2")
+  void 
testStreamReadFromSpecifiedCommitWithChangelog(HoodieCDCSupplementalLoggingMode 
mode, boolean useSourceV2) throws Exception {
     streamTableEnv.getConfig().getConfiguration()
         .setString("table.dynamic-table-options.enabled", "true");
     // create filesystem table named source
@@ -204,6 +204,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
         .option(FlinkOptions.CDC_ENABLED, true)
         .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.name())
+        .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
@@ -3216,6 +3217,18 @@ public class ITTestHoodieDataSource {
     return Stream.of(data).map(Arguments::of);
   }
 
+  private static Stream<Arguments> cdcSupplementalLoggingModeWithSourceV2() {
+    Object[][] data =
+        new Object[][] {
+            {HoodieCDCSupplementalLoggingMode.DATA_BEFORE, false},
+            {HoodieCDCSupplementalLoggingMode.DATA_BEFORE, true},
+            {HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER, false},
+            {HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER, true},
+            {HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY, false},
+            {HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY, true}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
   /**
    * Return test params => (HoodieTableType, true/false).
    */

Reply via email to