This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f15e1d060f96 feat: add Flink source reader function for cdc splits
(#18361)
f15e1d060f96 is described below
commit f15e1d060f96b7d825803d3d4fa2b1e0aa9f46a3
Author: Peter Huang <[email protected]>
AuthorDate: Thu Mar 26 19:23:06 2026 -0700
feat: add Flink source reader function for cdc splits (#18361)
---
.../function/HoodieCdcSplitReaderFunction.java | 1065 ++++++++++++++++++++
.../hudi/source/split/HoodieCdcSourceSplit.java | 58 ++
.../source/split/HoodieContinuousSplitBatch.java | 39 +-
.../source/split/HoodieSourceSplitSerializer.java | 43 +
.../org/apache/hudi/table/HoodieTableSource.java | 42 +-
.../table/format/mor/MergeOnReadInputFormat.java | 2 +-
.../table/format/mor/MergeOnReadTableState.java | 6 +-
.../apache/hudi/source/TestStreamReadOperator.java | 2 +-
.../function/TestHoodieCdcSplitReaderFunction.java | 195 ++++
.../source/split/TestHoodieCdcSourceSplit.java | 182 ++++
.../split/TestHoodieContinuousSplitBatch.java | 362 +++++++
.../split/TestHoodieSourceSplitSerializer.java | 147 ++-
.../apache/hudi/table/ITTestHoodieDataSource.java | 17 +-
13 files changed, 2131 insertions(+), 29 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
new file mode 100644
index 000000000000..dee8a95c124a
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
@@ -0,0 +1,1065 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.source.ExpressionPredicates;
+import org.apache.hudi.source.reader.BatchRecords;
+import org.apache.hudi.source.reader.HoodieRecordWithPosition;
+import org.apache.hudi.source.split.HoodieCdcSourceSplit;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.table.format.cdc.CdcInputFormat;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.table.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.RowDataProjection;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.fs.Path;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * CDC reader function for source V2. Reads CDC splits ({@link
HoodieCdcSourceSplit}) and
+ * emits change-log {@link RowData} records tagged with the appropriate {@link
RowKind}.
+ *
+ * <p>The implementation mirrors the logic in {@link CdcInputFormat}, adapted
for the
+ * {@link SplitReaderFunction} contract.
+ */
+@Slf4j
+public class HoodieCdcSplitReaderFunction implements
SplitReaderFunction<RowData> {
+
+ private final org.apache.flink.configuration.Configuration conf;
+ private final InternalSchemaManager internalSchemaManager;
+ private final List<DataType> fieldTypes;
+ private final MergeOnReadTableState tableState;
+ private final boolean emitDelete;
+ private final List<ExpressionPredicates.Predicate> predicates;
+ private transient HoodieTableMetaClient metaClient;
+ private transient HoodieWriteConfig writeConfig;
+ private transient org.apache.hadoop.conf.Configuration hadoopConf;
+ private transient ClosableIterator<RowData> currentIterator;
+ // Fallback reader for non-CDC splits (e.g. snapshot reads when
read.start-commit='earliest')
+ private transient HoodieSplitReaderFunction fallbackReaderFunction;
+
+ /**
+ * Creates a CDC split reader function.
+ *
+ * @param conf Flink configuration
+ * @param tableState Merge on Read table state
+ * @param internalSchemaManager Schema-evolution manager
+ * @param fieldTypes DataType list for all table fields (used for
parquet reading)
+ * @param predicates Predicates for push down
+ * @param emitDelete Whether to emit delete
+ */
+ public HoodieCdcSplitReaderFunction(
+ org.apache.flink.configuration.Configuration conf,
+ MergeOnReadTableState tableState,
+ InternalSchemaManager internalSchemaManager,
+ List<DataType> fieldTypes,
+ List<ExpressionPredicates.Predicate> predicates,
+ boolean emitDelete) {
+ this.conf = conf;
+ this.tableState = tableState;
+ this.internalSchemaManager = internalSchemaManager;
+ this.fieldTypes = fieldTypes;
+ this.predicates = predicates;
+ this.emitDelete = emitDelete;
+ }
+
+ @Override
+ public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>
read(HoodieSourceSplit split) {
+ if (!(split instanceof HoodieCdcSourceSplit)) {
+ // Non-CDC splits arrive when reading from 'earliest' with no prior CDC
history
+ // (i.e. instantRange is empty → snapshot path). Fall back to the
standard MOR reader
+ // which emits all records as INSERT rows, matching the expected
snapshot behaviour.
+ return getFallbackReaderFunction().read(split);
+ }
+ HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split;
+
+ HoodieCDCSupplementalLoggingMode mode =
OptionsResolver.getCDCSupplementalLoggingMode(conf);
+ HoodieTableMetaClient client = getMetaClient();
+ HoodieWriteConfig wConfig = getWriteConfig();
+
+ ImageManager imageManager = new ImageManager(tableState.getRowType(),
wConfig, this::getFileSliceIterator);
+
+ Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc
=
+ cdcFileSplit -> createRecordIteratorSafe(
+ cdcSplit.getTablePath(),
+ cdcSplit.getMaxCompactionMemoryInBytes(),
+ cdcFileSplit,
+ mode,
+ imageManager,
+ client);
+
+ currentIterator = new CdcFileSplitsIterator(cdcSplit.getChanges(),
imageManager, recordIteratorFunc);
+ BatchRecords<RowData> records = BatchRecords.forRecords(
+ split.splitId(), currentIterator, split.getFileOffset(),
split.getConsumed());
+ records.seek(split.getConsumed());
+ return records;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (currentIterator != null) {
+ currentIterator.close();
+ }
+ if (fallbackReaderFunction != null) {
+ fallbackReaderFunction.close();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Internal helpers
+ // -------------------------------------------------------------------------
+
+ private HoodieSplitReaderFunction getFallbackReaderFunction() {
+ if (fallbackReaderFunction == null) {
+ fallbackReaderFunction = new HoodieSplitReaderFunction(
+ conf,
+ HoodieSchema.parse(tableState.getTableSchema()),
+ HoodieSchema.parse(tableState.getRequiredSchema()),
+ internalSchemaManager,
+ conf.get(FlinkOptions.MERGE_TYPE),
+ predicates,
+ emitDelete);
+ }
+ return fallbackReaderFunction;
+ }
+
+ private ClosableIterator<RowData> createRecordIteratorSafe(
+ String tablePath,
+ long maxCompactionMemoryInBytes,
+ HoodieCDCFileSplit fileSplit,
+ HoodieCDCSupplementalLoggingMode mode,
+ ImageManager imageManager,
+ HoodieTableMetaClient client) {
+ try {
+ return createRecordIterator(tablePath, maxCompactionMemoryInBytes,
fileSplit, mode, imageManager, client);
+ } catch (IOException e) {
+ throw new HoodieException("Failed to create CDC record iterator for
split: " + fileSplit, e);
+ }
+ }
+
+ private ClosableIterator<RowData> createRecordIterator(
+ String tablePath,
+ long maxCompactionMemoryInBytes,
+ HoodieCDCFileSplit fileSplit,
+ HoodieCDCSupplementalLoggingMode mode,
+ ImageManager imageManager,
+ HoodieTableMetaClient client) throws IOException {
+
+ final HoodieSchema tableSchema =
HoodieSchema.parse(tableState.getTableSchema());
+ final HoodieSchema requiredSchema =
HoodieSchema.parse(tableState.getRequiredSchema());
+ switch (fileSplit.getCdcInferCase()) {
+ case BASE_FILE_INSERT: {
+ ValidationUtils.checkState(fileSplit.getCdcFiles() != null &&
fileSplit.getCdcFiles().size() == 1,
+ "CDC file path should exist and be singleton for
BASE_FILE_INSERT");
+ String path = new Path(tablePath,
fileSplit.getCdcFiles().get(0)).toString();
+ return new AddBaseFileIterator(getBaseFileIterator(path));
+ }
+ case BASE_FILE_DELETE: {
+ ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+ "Before file slice should exist for BASE_FILE_DELETE");
+ FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
+ MergeOnReadInputSplit inputSplit =
CdcInputFormat.fileSlice2Split(tablePath, fileSlice,
maxCompactionMemoryInBytes);
+ return new RemoveBaseFileIterator(tableState.getRequiredRowType(),
tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
+ }
+ case AS_IS: {
+ HoodieSchema dataSchema =
HoodieSchemaUtils.removeMetadataFields(tableSchema);
+ HoodieSchema cdcSchema =
HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
+ switch (mode) {
+ case DATA_BEFORE_AFTER:
+ return new BeforeAfterImageIterator(
+ getHadoopConf(), tablePath, tableSchema, requiredSchema,
tableState.getRequiredRowType(), cdcSchema, fileSplit);
+ case DATA_BEFORE:
+ return new BeforeImageIterator(
+ conf, getHadoopConf(), tablePath, tableSchema, requiredSchema,
tableState.getRequiredRowType(),
+ maxCompactionMemoryInBytes, cdcSchema, fileSplit,
imageManager);
+ case OP_KEY_ONLY:
+ return new RecordKeyImageIterator(
+ conf, getHadoopConf(), tablePath, tableSchema, requiredSchema,
tableState.getRequiredRowType(),
+ maxCompactionMemoryInBytes, cdcSchema, fileSplit,
imageManager);
+ default:
+ throw new AssertionError("Unexpected CDC supplemental logging
mode: " + mode);
+ }
+ }
+ case LOG_FILE: {
+ ValidationUtils.checkState(fileSplit.getCdcFiles() != null &&
fileSplit.getCdcFiles().size() == 1,
+ "CDC file path should exist and be singleton for LOG_FILE");
+ String logFilePath = new Path(tablePath,
fileSplit.getCdcFiles().get(0)).toString();
+ MergeOnReadInputSplit split =
CdcInputFormat.singleLogFile2Split(tablePath, logFilePath,
maxCompactionMemoryInBytes);
+ ClosableIterator<HoodieRecord<RowData>> recordIterator =
getFileSliceHoodieRecordIterator(split);
+ return new DataLogFileIterator(
+ maxCompactionMemoryInBytes, imageManager, fileSplit, tableSchema,
tableState.getRequiredRowType(), tableState.getRequiredPositions(),
+ recordIterator, client, getWriteConfig());
+ }
+ case REPLACE_COMMIT: {
+ return new ReplaceCommitIterator(
+ conf, tablePath, tableState.getRequiredRowType(),
tableState.getRequiredPositions(), maxCompactionMemoryInBytes,
+ fileSplit, this::getFileSliceIterator);
+ }
+ default:
+ throw new AssertionError("Unexpected CDC file split infer case: " +
fileSplit.getCdcInferCase());
+ }
+ }
+
+ /** Reads the full-schema before/after image for a file slice
(emitDelete=false). */
+ private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit
split) {
+ FileSlice fileSlice = buildFileSlice(split);
+ final HoodieSchema tableSchema =
HoodieSchemaCache.intern(HoodieSchema.parse(tableState.getTableSchema()));
+ try {
+ HoodieFileGroupReader<RowData> reader =
FormatUtils.createFileGroupReader(
+ getMetaClient(), getWriteConfig(), internalSchemaManager, fileSlice,
+ tableSchema, tableSchema, split.getLatestCommit(),
+ FlinkOptions.REALTIME_PAYLOAD_COMBINE, false,
+ predicates, split.getInstantRange());
+ return reader.getClosableIterator();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to create file slice iterator for
split: " + split, e);
+ }
+ }
+
+ /** Reads a single log file and returns a typed {@link HoodieRecord}
iterator (for LOG_FILE CDC inference). */
+ private ClosableIterator<HoodieRecord<RowData>>
getFileSliceHoodieRecordIterator(MergeOnReadInputSplit split) {
+ FileSlice fileSlice = buildFileSlice(split);
+ final HoodieSchema tableSchema =
HoodieSchemaCache.intern(HoodieSchema.parse(tableState.getTableSchema()));
+ try {
+ HoodieFileGroupReader<RowData> reader =
FormatUtils.createFileGroupReader(
+ getMetaClient(), getWriteConfig(), internalSchemaManager, fileSlice,
+ tableSchema, tableSchema, split.getLatestCommit(),
+ FlinkOptions.REALTIME_PAYLOAD_COMBINE, true,
+ predicates, split.getInstantRange());
+ return reader.getClosableHoodieRecordIterator();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to create Hoodie record iterator for
split: " + split, e);
+ }
+ }
+
+ /** Reads a parquet CDC base file returning required-schema records. */
+ private ClosableIterator<RowData> getBaseFileIterator(String path) throws
IOException {
+ String[] fieldNames = tableState.getRowType().getFieldNames().toArray(new
String[0]);
+ DataType[] fieldTypesArray = fieldTypes.toArray(new DataType[0]);
+ LinkedHashMap<String, Object> partObjects =
FilePathUtils.generatePartitionSpecs(
+ path,
+ tableState.getRowType().getFieldNames(),
+ fieldTypes,
+ conf.get(FlinkOptions.PARTITION_DEFAULT_NAME),
+ conf.get(FlinkOptions.PARTITION_PATH_FIELD),
+ conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING)
+ );
+
+ return RecordIterators.getParquetRecordIterator(
+ internalSchemaManager,
+ conf.get(FlinkOptions.READ_UTC_TIMEZONE),
+ true,
+ HadoopConfigurations.getParquetConf(conf, getHadoopConf()),
+ fieldNames,
+ fieldTypesArray,
+ partObjects,
+ tableState.getRequiredPositions(),
+ 2048,
+ new org.apache.flink.core.fs.Path(path),
+ 0,
+ Long.MAX_VALUE,
+ predicates);
+ }
+
+ private static FileSlice buildFileSlice(MergeOnReadInputSplit split) {
+ return new FileSlice(
+ new HoodieFileGroupId("", split.getFileId()),
+ "",
+ split.getBasePath().map(HoodieBaseFile::new).orElse(null),
+ split.getLogPaths()
+ .map(lp ->
lp.stream().map(HoodieLogFile::new).collect(Collectors.toList()))
+ .orElse(Collections.emptyList()));
+ }
+
+ private static int[] computeRequiredPositions(RowType rowType, RowType
requiredRowType) {
+ List<String> allNames = rowType.getFieldNames();
+ return requiredRowType.getFieldNames().stream()
+ .map(allNames::indexOf)
+ .mapToInt(i -> i)
+ .toArray();
+ }
+
+ private HoodieTableMetaClient getMetaClient() {
+ if (metaClient == null) {
+ metaClient = StreamerUtil.metaClientForReader(conf, getHadoopConf());
+ }
+ return metaClient;
+ }
+
+ private HoodieWriteConfig getWriteConfig() {
+ if (writeConfig == null) {
+ writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+ }
+ return writeConfig;
+ }
+
+ private org.apache.hadoop.conf.Configuration getHadoopConf() {
+ if (hadoopConf == null) {
+ hadoopConf = HadoopConfigurations.getHadoopConf(conf);
+ }
+ return hadoopConf;
+ }
+
+ // -------------------------------------------------------------------------
+ // Inner iterators (adapted from CdcInputFormat inner classes)
+ // -------------------------------------------------------------------------
+
+ /** Iterates over an ordered list of {@link HoodieCDCFileSplit}s, delegating
record reading to a factory. */
+ private static class CdcFileSplitsIterator implements
ClosableIterator<RowData> {
+ private ImageManager imageManager;
+ private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
+ private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc;
+ private ClosableIterator<RowData> recordIterator;
+
+ CdcFileSplitsIterator(
+ HoodieCDCFileSplit[] changes,
+ ImageManager imageManager,
+ Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc) {
+ this.fileSplitIterator = Arrays.asList(changes).iterator();
+ this.imageManager = imageManager;
+ this.recordIteratorFunc = recordIteratorFunc;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (recordIterator != null) {
+ if (recordIterator.hasNext()) {
+ return true;
+ } else {
+ recordIterator.close();
+ recordIterator = null;
+ }
+ }
+ if (fileSplitIterator.hasNext()) {
+ recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
+ return recordIterator.hasNext();
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return recordIterator.next();
+ }
+
+ @Override
+ public void close() {
+ if (recordIterator != null) {
+ recordIterator.close();
+ }
+ if (imageManager != null) {
+ imageManager.close();
+ imageManager = null;
+ }
+ }
+ }
+
+ /** Wraps a base-file parquet iterator and marks every record as {@link
RowKind#INSERT}. */
+ private static class AddBaseFileIterator implements
ClosableIterator<RowData> {
+ private ClosableIterator<RowData> nested;
+ private RowData currentRecord;
+
+ AddBaseFileIterator(ClosableIterator<RowData> nested) {
+ this.nested = nested;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nested.hasNext()) {
+ currentRecord = nested.next();
+ currentRecord.setRowKind(RowKind.INSERT);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return currentRecord;
+ }
+
+ @Override
+ public void close() {
+ if (nested != null) {
+ nested.close();
+ nested = null;
+ }
+ }
+ }
+
+ /** Wraps a file-slice iterator and marks every record as {@link
RowKind#DELETE}, with projection. */
+ private static class RemoveBaseFileIterator implements
ClosableIterator<RowData> {
+ private ClosableIterator<RowData> nested;
+ private final RowDataProjection projection;
+
+ RemoveBaseFileIterator(RowType requiredRowType, int[] requiredPositions,
ClosableIterator<RowData> iterator) {
+ this.nested = iterator;
+ this.projection = RowDataProjection.instance(requiredRowType,
requiredPositions);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nested.hasNext();
+ }
+
+ @Override
+ public RowData next() {
+ RowData row = nested.next();
+ row.setRowKind(RowKind.DELETE);
+ return projection.project(row);
+ }
+
+ @Override
+ public void close() {
+ if (nested != null) {
+ nested.close();
+ nested = null;
+ }
+ }
+ }
+
+ /**
+ * Handles the {@code LOG_FILE} CDC inference case: compares records from
the log file
+ * against before-image snapshots to emit
INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE events.
+ */
+ private static class DataLogFileIterator implements
ClosableIterator<RowData> {
+ private final HoodieSchema tableSchema;
+ private final long maxCompactionMemoryInBytes;
+ private final ImageManager imageManager;
+ private final RowDataProjection projection;
+ private final BufferedRecordMerger recordMerger;
+ private final ClosableIterator<HoodieRecord<RowData>> logRecordIterator;
+ private final DeleteContext deleteContext;
+ private final HoodieReaderContext<RowData> readerContext;
+ private final String[] orderingFields;
+ private final TypedProperties props;
+
+ private ExternalSpillableMap<String, byte[]> beforeImages;
+ private RowData currentImage;
+ private RowData sideImage;
+
+ DataLogFileIterator(
+ long maxCompactionMemoryInBytes,
+ ImageManager imageManager,
+ HoodieCDCFileSplit cdcFileSplit,
+ HoodieSchema tableSchema,
+ RowType requiredRowType,
+ int[] requiredPositions,
+ ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
+ HoodieTableMetaClient metaClient,
+ HoodieWriteConfig writeConfig) throws IOException {
+ this.tableSchema = tableSchema;
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ this.imageManager = imageManager;
+ this.projection =
HoodieSchemaConverter.convertToRowType(tableSchema).equals(requiredRowType)
+ ? null : RowDataProjection.instance(requiredRowType,
requiredPositions);
+ this.props = writeConfig.getProps();
+ this.readerContext = new
FlinkReaderContextFactory(metaClient).getContext();
+ readerContext.initRecordMerger(props);
+ this.orderingFields = ConfigUtils.getOrderingFields(props);
+ this.recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ readerContext.getMergeMode(),
+ false,
+ Option.of(writeConfig.getRecordMerger()),
+ tableSchema,
+
Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(),
writeConfig.getPayloadClass())),
+ props,
+ metaClient.getTableConfig().getPartialUpdateMode());
+ this.logRecordIterator = logRecordIterator;
+ this.deleteContext = new DeleteContext(props,
tableSchema).withReaderSchema(tableSchema);
+ initImages(cdcFileSplit, writeConfig);
+ }
+
+ private void initImages(HoodieCDCFileSplit fileSplit, HoodieWriteConfig
writeConfig) throws IOException {
+ if (fileSplit.getBeforeFileSlice().isPresent() &&
!fileSplit.getBeforeFileSlice().get().isEmpty()) {
+ this.beforeImages = this.imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+ } else {
+ this.beforeImages = FormatUtils.spillableMap(writeConfig,
maxCompactionMemoryInBytes, getClass().getSimpleName());
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (sideImage != null) {
+ currentImage = sideImage;
+ sideImage = null;
+ return true;
+ }
+ while (logRecordIterator.hasNext()) {
+ HoodieRecord<RowData> record = logRecordIterator.next();
+ RowData existed =
imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
+ if (isDelete(record)) {
+ if (existed != null) {
+ existed.setRowKind(RowKind.DELETE);
+ currentImage = existed;
+ return true;
+ }
+ } else {
+ if (existed == null) {
+ RowData newRow = record.getData();
+ newRow.setRowKind(RowKind.INSERT);
+ currentImage = newRow;
+ return true;
+ } else {
+ HoodieOperation operation =
HoodieOperation.fromValue(existed.getRowKind().toByteValue());
+ HoodieRecord<RowData> historyRecord = new
HoodieFlinkRecord(record.getKey(), operation, existed);
+ HoodieRecord<RowData> merged = mergeRowWithLog(historyRecord,
record).get();
+ if (merged.getData() != existed) {
+ existed.setRowKind(RowKind.UPDATE_BEFORE);
+ currentImage = existed;
+ RowData mergedRow = merged.getData();
+ mergedRow.setRowKind(RowKind.UPDATE_AFTER);
+ imageManager.updateImageRecord(record.getRecordKey(),
beforeImages, mergedRow);
+ sideImage = mergedRow;
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return projection != null ? projection.project(currentImage) :
currentImage;
+ }
+
+ @Override
+ public void close() {
+ logRecordIterator.close();
+ imageManager.close();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Option<HoodieRecord<RowData>> mergeRowWithLog(
+ HoodieRecord<RowData> historyRecord, HoodieRecord<RowData> newRecord) {
+ try {
+ BufferedRecord<RowData> histBuf = BufferedRecords.fromHoodieRecord(
+ historyRecord, tableSchema, readerContext.getRecordContext(),
props, orderingFields, deleteContext);
+ BufferedRecord<RowData> newBuf = BufferedRecords.fromHoodieRecord(
+ newRecord, tableSchema, readerContext.getRecordContext(), props,
orderingFields, deleteContext);
+ BufferedRecord<RowData> merged = recordMerger.finalMerge(histBuf,
newBuf);
+ return Option.ofNullable(readerContext.getRecordContext()
+ .constructHoodieRecord(merged, historyRecord.getPartitionPath()));
+ } catch (IOException e) {
+ throw new HoodieIOException("Merge base and delta payloads exception",
e);
+ }
+ }
+
+ private boolean isDelete(HoodieRecord<RowData> record) {
+ return record.isDelete(deleteContext, CollectionUtils.emptyProps());
+ }
+ }
+
+ /**
+ * Base iterator for CDC log files stored with supplemental logging (AS_IS
inference case).
+ * Reads {@link HoodieCDCLogRecordIterator} and resolves before/after images
using
+ * subclass-specific logic.
+ */
+ private abstract static class BaseImageIterator implements
ClosableIterator<RowData> {
+ private final HoodieSchema requiredSchema;
+ private final int[] requiredPos;
+ private final GenericRecordBuilder recordBuilder;
+ private final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter;
+ private HoodieCDCLogRecordIterator cdcItr;
+
+ private GenericRecord cdcRecord;
+ private RowData sideImage;
+ private RowData currentImage;
+
+ BaseImageIterator(
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ RowType requiredRowType,
+ HoodieSchema cdcSchema,
+ HoodieCDCFileSplit fileSplit) {
+ this.requiredSchema = requiredSchema;
+ this.requiredPos = computeRequiredPos(tableSchema, requiredSchema);
+ this.recordBuilder = new
GenericRecordBuilder(requiredSchema.getAvroSchema());
+ this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(requiredRowType);
+
+ StoragePath hadoopTablePath = new StoragePath(tablePath);
+ HoodieStorage storage = HoodieStorageUtils.getStorage(
+ tablePath, HadoopFSUtils.getStorageConf(hadoopConf));
+ HoodieLogFile[] cdcLogFiles = fileSplit.getCdcFiles().stream()
+ .map(cdcFile -> {
+ try {
+ return new HoodieLogFile(storage.getPathInfo(new
StoragePath(hadoopTablePath, cdcFile)));
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get file status for CDC
log: " + cdcFile, e);
+ }
+ })
+ .toArray(HoodieLogFile[]::new);
+ this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles,
cdcSchema);
+ }
+
+ private static int[] computeRequiredPos(HoodieSchema tableSchema,
HoodieSchema requiredSchema) {
+ HoodieSchema dataSchema =
HoodieSchemaUtils.removeMetadataFields(tableSchema);
+ List<String> fields = dataSchema.getFields().stream()
+ .map(HoodieSchemaField::name)
+ .collect(Collectors.toList());
+ return requiredSchema.getFields().stream()
+ .map(f -> fields.indexOf(f.name()))
+ .mapToInt(i -> i)
+ .toArray();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (sideImage != null) {
+ currentImage = sideImage;
+ sideImage = null;
+ return true;
+ } else if (cdcItr.hasNext()) {
+ cdcRecord = (GenericRecord) cdcItr.next();
+ String op = String.valueOf(cdcRecord.get(0));
+ resolveImage(op);
+ return true;
+ }
+ return false;
+ }
+
+ protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord
cdcRecord);
+
+ protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord
cdcRecord);
+
+ @Override
+ public RowData next() {
+ return currentImage;
+ }
+
+ @Override
+ public void close() {
+ if (cdcItr != null) {
+ cdcItr.close();
+ cdcItr = null;
+ }
+ }
+
+ private void resolveImage(String op) {
+ switch (op) {
+ case "i":
+ currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
+ break;
+ case "u":
+ currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
+ sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
+ break;
+ case "d":
+ currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
+ break;
+ default:
+ throw new AssertionError("Unexpected CDC operation: " + op);
+ }
+ }
+
+ protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
+ GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+ avroRecord, requiredSchema, requiredPos, recordBuilder);
+ RowData resolved = (RowData)
avroToRowDataConverter.convert(requiredAvroRecord);
+ resolved.setRowKind(rowKind);
+ return resolved;
+ }
+ }
+
+ /** Reads CDC log files that contain both before and after images ({@code
DATA_BEFORE_AFTER} mode). */
+ private static class BeforeAfterImageIterator extends BaseImageIterator {
+ BeforeAfterImageIterator(
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ RowType requiredRowType,
+ HoodieSchema cdcSchema,
+ HoodieCDCFileSplit fileSplit) {
+ super(hadoopConf, tablePath, tableSchema, requiredSchema,
requiredRowType, cdcSchema, fileSplit);
+ }
+
+ @Override
+ protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+ }
+ }
+
+ /**
+ * Reads CDC log files containing op + key + before_image ({@code
DATA_BEFORE} mode).
+ * The after-image is loaded from the after file-slice via the {@link
ImageManager}.
+ */
+ private static class BeforeImageIterator extends BaseImageIterator {
+ protected ExternalSpillableMap<String, byte[]> afterImages;
+ protected final long maxCompactionMemoryInBytes;
+ protected final RowDataProjection projection;
+ protected final ImageManager imageManager;
+
+ BeforeImageIterator(
+ org.apache.flink.configuration.Configuration flinkConf,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ RowType requiredRowType,
+ long maxCompactionMemoryInBytes,
+ HoodieSchema cdcSchema,
+ HoodieCDCFileSplit fileSplit,
+ ImageManager imageManager) throws IOException {
+ super(hadoopConf, tablePath, tableSchema, requiredSchema,
requiredRowType, cdcSchema, fileSplit);
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ this.projection = RowDataProjection.instance(requiredRowType,
+ computePositions(tableSchema, requiredRowType));
+ this.imageManager = imageManager;
+ initImages(fileSplit);
+ }
+
+ protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException
{
+ ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
+ "Current file slice does not exist for instant: " +
fileSplit.getInstant());
+ this.afterImages = imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
+ }
+
+ @Override
+ protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+ String recordKey = cdcRecord.get(1).toString();
+ RowData row = imageManager.getImageRecord(recordKey, afterImages,
rowKind);
+ row.setRowKind(rowKind);
+ return projection.project(row);
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+ }
+
+ private static int[] computePositions(HoodieSchema tableSchema, RowType
requiredRowType) {
+ List<String> allFields = tableSchema.getFields().stream()
+ .map(HoodieSchemaField::name)
+ .collect(Collectors.toList());
+ return requiredRowType.getFieldNames().stream()
+ .map(allFields::indexOf)
+ .mapToInt(i -> i)
+ .toArray();
+ }
+ }
+
+ /**
+ * Reads CDC log files containing only op + key ({@code OP_KEY_ONLY} mode).
+ * Both before and after images are loaded from file-slice snapshots via
{@link ImageManager}.
+ */
+ private static class RecordKeyImageIterator extends BeforeImageIterator {
+ protected ExternalSpillableMap<String, byte[]> beforeImages;
+
+ RecordKeyImageIterator(
+ org.apache.flink.configuration.Configuration flinkConf,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ RowType requiredRowType,
+ long maxCompactionMemoryInBytes,
+ HoodieSchema cdcSchema,
+ HoodieCDCFileSplit fileSplit,
+ ImageManager imageManager) throws IOException {
+ super(flinkConf, hadoopConf, tablePath, tableSchema, requiredSchema,
requiredRowType,
+ maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
+ }
+
+ @Override
+ protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException
{
+ super.initImages(fileSplit);
+ ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+ "Before file slice does not exist for instant: " +
fileSplit.getInstant());
+ this.beforeImages = imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
+ String recordKey = cdcRecord.get(1).toString();
+ RowData row = imageManager.getImageRecord(recordKey, beforeImages,
rowKind);
+ row.setRowKind(rowKind);
+ return projection.project(row);
+ }
+ }
+
+ /** Handles the {@code REPLACE_COMMIT} CDC inference case: emits all records
from before-slice as DELETE. */
+ private static class ReplaceCommitIterator implements
ClosableIterator<RowData> {
+ private final ClosableIterator<RowData> itr;
+ private final RowDataProjection projection;
+
+ ReplaceCommitIterator(
+ org.apache.flink.configuration.Configuration flinkConf,
+ String tablePath,
+ RowType requiredRowType,
+ int[] requiredPositions,
+ long maxCompactionMemoryInBytes,
+ HoodieCDCFileSplit fileSplit,
+ Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
+ ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+ "Before file slice does not exist for instant: " +
fileSplit.getInstant());
+ MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
+ tablePath, fileSplit.getBeforeFileSlice().get(),
maxCompactionMemoryInBytes);
+ this.itr = splitIteratorFunc.apply(inputSplit);
+ this.projection = RowDataProjection.instance(requiredRowType,
requiredPositions);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return itr.hasNext();
+ }
+
+ @Override
+ public RowData next() {
+ RowData row = itr.next();
+ row.setRowKind(RowKind.DELETE);
+ return projection.project(row);
+ }
+
+ @Override
+ public void close() {
+ itr.close();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // ImageManager - caches full-schema row images keyed by record key
+ // -------------------------------------------------------------------------
+
  /**
   * Manages serialized before/after image snapshots for a file group, cached by instant time.
   * At most two versions (before and after) are kept in memory; older entries are evicted,
   * and each snapshot itself may spill to disk via {@link ExternalSpillableMap}.
   *
   * <p>Rows are stored as {@link RowDataSerializer}-encoded byte arrays keyed by record key.
   * Not thread-safe; intended for single-reader use.
   */
  private static class ImageManager implements AutoCloseable {
    private final HoodieWriteConfig writeConfig;
    private final RowDataSerializer serializer;
    private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>> splitIteratorFunc;
    // Keyed by base instant time; TreeMap keeps keys sorted so the first key is the oldest.
    private final Map<String, ExternalSpillableMap<String, byte[]>> cache;

    ImageManager(
        RowType rowType,
        HoodieWriteConfig writeConfig,
        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> splitIteratorFunc) {
      this.serializer = new RowDataSerializer(rowType);
      this.writeConfig = writeConfig;
      this.splitIteratorFunc = splitIteratorFunc;
      this.cache = new TreeMap<>();
    }

    /**
     * Returns the cached image snapshot for the given file slice, loading it on a miss.
     * Before loading, evicts (and closes) the oldest cached snapshot when two are already
     * held, so at most two versions stay resident.
     */
    ExternalSpillableMap<String, byte[]> getOrLoadImages(
        long maxCompactionMemoryInBytes, FileSlice fileSlice) throws IOException {
      final String instant = fileSlice.getBaseInstantTime();
      if (cache.containsKey(instant)) {
        return cache.get(instant);
      }
      if (cache.size() > 1) {
        // TreeMap iteration order is ascending instant time, so this is the oldest entry.
        String oldest = cache.keySet().iterator().next();
        cache.remove(oldest).close();
      }
      ExternalSpillableMap<String, byte[]> images = loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
      cache.put(instant, images);
      return images;
    }

    /**
     * Scans the whole file slice and materializes every row as serialized bytes
     * keyed by record key, into a spillable map bounded by the given memory budget.
     */
    private ExternalSpillableMap<String, byte[]> loadImageRecords(
        long maxCompactionMemoryInBytes, FileSlice fileSlice) throws IOException {
      MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
          writeConfig.getBasePath(), fileSlice, maxCompactionMemoryInBytes);
      ExternalSpillableMap<String, byte[]> imageRecordsMap =
          FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, getClass().getSimpleName());
      try (ClosableIterator<RowData> itr = splitIteratorFunc.apply(inputSplit)) {
        while (itr.hasNext()) {
          RowData row = itr.next();
          String recordKey = row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
          ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
          serializer.serialize(row, new BytesArrayOutputView(baos));
          imageRecordsMap.put(recordKey, baos.toByteArray());
        }
      }
      return imageRecordsMap;
    }

    /**
     * Deserializes the image row for {@code recordKey} and stamps it with {@code rowKind}.
     * The key is required to exist in the snapshot; a missing key indicates an
     * inconsistent CDC log and fails fast.
     */
    RowData getImageRecord(
        String recordKey, ExternalSpillableMap<String, byte[]> imageCache, RowKind rowKind) {
      byte[] bytes = imageCache.get(recordKey);
      ValidationUtils.checkState(bytes != null,
          "Key " + recordKey + " does not exist in current file group image");
      try {
        RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
        row.setRowKind(rowKind);
        return row;
      } catch (IOException e) {
        throw new HoodieException("Failed to deserialize image record for key: " + recordKey, e);
      }
    }

    /** Serializes {@code row} and stores (or overwrites) it under {@code recordKey}. */
    void updateImageRecord(
        String recordKey, ExternalSpillableMap<String, byte[]> imageCache, RowData row) {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
      try {
        serializer.serialize(row, new BytesArrayOutputView(baos));
      } catch (IOException e) {
        throw new HoodieException("Failed to serialize image record for key: " + recordKey, e);
      }
      imageCache.put(recordKey, baos.toByteArray());
    }

    /**
     * Removes and returns the image row for {@code recordKey}, or {@code null} when absent.
     * Unlike {@link #getImageRecord}, the returned row keeps its default row kind.
     */
    RowData removeImageRecord(
        String recordKey, ExternalSpillableMap<String, byte[]> imageCache) {
      byte[] bytes = imageCache.remove(recordKey);
      if (bytes == null) {
        return null;
      }
      try {
        return serializer.deserialize(new BytesArrayInputView(bytes));
      } catch (IOException e) {
        throw new HoodieException("Failed to deserialize image record for key: " + recordKey, e);
      }
    }

    /** Closes every cached snapshot (releasing any spill files) and clears the cache. */
    @Override
    public void close() {
      cache.values().forEach(ExternalSpillableMap::close);
      cache.clear();
    }
  }
+
+ // -------------------------------------------------------------------------
+ // I/O view adapters for RowDataSerializer
+ // -------------------------------------------------------------------------
+
+ private static final class BytesArrayInputView extends DataInputStream
+ implements org.apache.flink.core.memory.DataInputView {
+ BytesArrayInputView(byte[] data) {
+ super(new ByteArrayInputStream(data));
+ }
+
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ while (numBytes > 0) {
+ int skipped = skipBytes(numBytes);
+ numBytes -= skipped;
+ }
+ }
+ }
+
+ private static final class BytesArrayOutputView extends DataOutputStream
+ implements org.apache.flink.core.memory.DataOutputView {
+ BytesArrayOutputView(ByteArrayOutputStream baos) {
+ super(baos);
+ }
+
+ @Override
+ public void skipBytesToWrite(int numBytes) throws IOException {
+ for (int i = 0; i < numBytes; i++) {
+ write(0);
+ }
+ }
+
+ @Override
+ public void write(org.apache.flink.core.memory.DataInputView source, int
numBytes) throws IOException {
+ byte[] buffer = new byte[numBytes];
+ source.readFully(buffer);
+ write(buffer);
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
new file mode 100644
index 000000000000..356e2f7a4ebe
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import lombok.ToString;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.util.Option;
+
+import lombok.Getter;
+
/**
 * A {@link HoodieSourceSplit} for CDC (Change Data Capture) reads.
 *
 * <p>Extends the base split with an ordered list of {@link HoodieCDCFileSplit} entries
 * that describe the CDC changes for a single file group, and the memory budget used
 * for spillable image maps.
 *
 * <p>Note: the {@code changes} array is stored and exposed by reference (no defensive
 * copy); callers must not mutate it after construction.
 */
@ToString
public class HoodieCdcSourceSplit extends HoodieSourceSplit {
  private static final long serialVersionUID = 1L;

  /** Ordered CDC file splits for one file group, sorted by instant time. */
  @Getter
  private final HoodieCDCFileSplit[] changes;

  /** Maximum memory in bytes available for compaction/merge operations. */
  @Getter
  private final long maxCompactionMemoryInBytes;

  /**
   * Creates a CDC source split for one file group.
   *
   * @param splitNum                   monotonically increasing split number
   * @param tablePath                  base path of the Hudi table
   * @param maxCompactionMemoryInBytes memory budget for spillable image maps
   * @param fileId                     file group id this split reads
   * @param changes                    ordered CDC file splits, sorted by instant time
   * @param mergeType                  merge type for log/base file merging
   * @param lastCommit                 the latest commit instant covered by this split
   */
  public HoodieCdcSourceSplit(
      int splitNum,
      String tablePath,
      long maxCompactionMemoryInBytes,
      String fileId,
      HoodieCDCFileSplit[] changes,
      String mergeType,
      String lastCommit) {
    // base path, partition path and instant range are meaningless for a CDC split,
    // so pass empty placeholders to the base constructor.
    super(splitNum, null, Option.empty(), tablePath, "", mergeType, lastCommit, fileId, Option.empty());
    this.changes = changes;
    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
  }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
index 0b07b2e84bfb..a33ba88bcb05 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
@@ -19,9 +19,11 @@
package org.apache.hudi.source.split;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import lombok.Getter;
@@ -61,17 +63,34 @@ public class HoodieContinuousSplitBatch {
}
public static HoodieContinuousSplitBatch
fromResult(IncrementalInputSplits.Result result) {
- List<HoodieSourceSplit> splits =
result.getInputSplits().stream().map(split ->
- new HoodieSourceSplit(
+ List<HoodieSourceSplit> splits =
result.getInputSplits().stream().map(split -> {
+ if (split instanceof CdcInputSplit) {
+ CdcInputSplit cdcSplit = (CdcInputSplit) split;
+ HoodieCDCFileSplit[] changes = cdcSplit.getChanges();
+ // CdcInputSplit does not carry a latestCommit; derive it from the
last (largest instant)
+ // CDC file split, falling back to the batch end instant when the
array is empty.
+ String latestCommit = changes.length > 0
+ ? changes[changes.length - 1].getInstant()
+ : result.getEndInstant();
+ return (HoodieSourceSplit) new HoodieCdcSourceSplit(
HoodieSourceSplit.SPLIT_ID_GEN.incrementAndGet(),
- split.getBasePath().orElse(null),
- split.getLogPaths(), split.getTablePath(),
- resolvePartitionPath(split), split.getMergeType(),
- split.getLatestCommit(),
- split.getFileId(),
- split.getInstantRange()
- )
- ).collect(Collectors.toList());
+ cdcSplit.getTablePath(),
+ cdcSplit.getMaxCompactionMemoryInBytes(),
+ cdcSplit.getFileId(),
+ changes,
+ split.getMergeType(),
+ latestCommit);
+ }
+ return new HoodieSourceSplit(
+ HoodieSourceSplit.SPLIT_ID_GEN.incrementAndGet(),
+ split.getBasePath().orElse(null),
+ split.getLogPaths(), split.getTablePath(),
+ resolvePartitionPath(split), split.getMergeType(),
+ split.getLatestCommit(),
+ split.getFileId(),
+ split.getInstantRange()
+ );
+ }).collect(Collectors.toList());
return new HoodieContinuousSplitBatch(splits, result.getEndInstant(),
result.getOffset());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
index 98980e5697db..62b96a846627 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source.split;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
@@ -29,6 +30,8 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -118,6 +121,25 @@ public class HoodieSourceSplitSerializer implements
SimpleVersionedSerializer<Ho
}
}
+ // Serialize CDC-specific fields if this is a HoodieCdcSourceSplit
+ boolean isCdcSplit = obj instanceof HoodieCdcSourceSplit;
+ out.writeBoolean(isCdcSplit);
+ if (isCdcSplit) {
+ HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) obj;
+ out.writeLong(cdcSplit.getMaxCompactionMemoryInBytes());
+ HoodieCDCFileSplit[] changes = cdcSplit.getChanges();
+ out.writeInt(changes.length);
+ for (HoodieCDCFileSplit change : changes) {
+ ByteArrayOutputStream changeBaos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(changeBaos)) {
+ oos.writeObject(change);
+ }
+ byte[] changeBytes = changeBaos.toByteArray();
+ out.writeInt(changeBytes.length);
+ out.write(changeBytes);
+ }
+ }
+
out.flush();
return baos.toByteArray();
}
@@ -204,6 +226,27 @@ public class HoodieSourceSplitSerializer implements
SimpleVersionedSerializer<Ho
}
// Create HoodieSourceSplit object
+ boolean isCdcSplit = in.readBoolean();
+ if (isCdcSplit) {
+ long maxCompactionMemoryInBytes = in.readLong();
+ int changesCount = in.readInt();
+ HoodieCDCFileSplit[] changes = new HoodieCDCFileSplit[changesCount];
+ for (int i = 0; i < changesCount; i++) {
+ int changeLen = in.readInt();
+ byte[] changeBytes = new byte[changeLen];
+ in.readFully(changeBytes);
+ try (ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(changeBytes))) {
+ changes[i] = (HoodieCDCFileSplit) ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Failed to deserialize HoodieCDCFileSplit",
e);
+ }
+ }
+ HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(
+ splitNum, tablePath, maxCompactionMemoryInBytes, fileId, changes,
mergeType, latestCommit);
+ cdcSplit.updatePosition(fileOffset, consumed);
+ return cdcSplit;
+ }
+
HoodieSourceSplit split = new HoodieSourceSplit(
splitNum,
basePath,
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 62852bce8875..5a406da5fbba 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -53,7 +53,10 @@ import org.apache.hudi.source.prune.PartitionBucketIdFunc;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.source.prune.PrimaryKeyPruners;
import org.apache.hudi.source.reader.HoodieRecordEmitter;
+import org.apache.hudi.source.reader.function.HoodieCdcSplitReaderFunction;
import org.apache.hudi.source.reader.function.HoodieSplitReaderFunction;
+import org.apache.hudi.source.reader.function.SplitReaderFunction;
+import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.hudi.source.split.HoodieSourceSplitComparator;
import
org.apache.hudi.source.rebalance.partitioner.StreamReadAppendPartitioner;
import
org.apache.hudi.source.rebalance.partitioner.StreamReadBucketIndexPartitioner;
@@ -300,16 +303,33 @@ public class HoodieTableSource extends FileIndexReader
implements
HoodieScanContext context = createHoodieScanContext(rowType);
final HoodieTableType tableType =
HoodieTableType.valueOf(this.conf.get(FlinkOptions.TABLE_TYPE));
+ final SplitReaderFunction<RowData> splitReaderFunction;
+ final MergeOnReadTableState<HoodieSourceSplit> hoodieTableState = new
MergeOnReadTableState(
+ rowType,
+ requiredRowType,
+ tableSchema.toString(),
+ HoodieSchemaConverter.convertToSchema(requiredRowType).toString(),
+ new ArrayList());
boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
- HoodieSplitReaderFunction splitReaderFunction = new
HoodieSplitReaderFunction(
- conf,
- tableSchema,
- HoodieSchemaConverter.convertToSchema(requiredRowType),
- internalSchemaManager,
- conf.get(FlinkOptions.MERGE_TYPE),
- predicates,
- emitDelete
- );
+ if (conf.get(FlinkOptions.CDC_ENABLED)) {
+ List<DataType> fieldTypes = rowDataType.getChildren();
+ splitReaderFunction = new HoodieCdcSplitReaderFunction(
+ conf,
+ hoodieTableState,
+ internalSchemaManager,
+ fieldTypes,
+ predicates,
+ emitDelete);
+ } else {
+ splitReaderFunction = new HoodieSplitReaderFunction(
+ conf,
+ tableSchema,
+ HoodieSchemaConverter.convertToSchema(requiredRowType),
+ internalSchemaManager,
+ conf.get(FlinkOptions.MERGE_TYPE),
+ predicates,
+ emitDelete);
+ }
return new HoodieSource<>(context, splitReaderFunction, new
HoodieSourceSplitComparator(), metaClient, new HoodieRecordEmitter<>());
}
@@ -585,7 +605,7 @@ public class HoodieTableSource extends FileIndexReader
implements
HoodieSchema tableSchema,
DataType rowDataType,
List<MergeOnReadInputSplit> inputSplits) {
- final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+ final MergeOnReadTableState<MergeOnReadInputSplit> hoodieTableState = new
MergeOnReadTableState(
rowType,
requiredRowType,
tableSchema.toString(),
@@ -610,7 +630,7 @@ public class HoodieTableSource extends FileIndexReader
implements
DataType rowDataType,
List<MergeOnReadInputSplit> inputSplits,
boolean emitDelete) {
- final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+ final MergeOnReadTableState<MergeOnReadInputSplit> hoodieTableState = new
MergeOnReadTableState(
rowType,
requiredRowType,
tableAvroSchema.toString(),
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index f53998c078a7..dcf2160a26e5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -71,7 +71,7 @@ public class MergeOnReadInputFormat
protected transient org.apache.hadoop.conf.Configuration hadoopConf;
- protected final MergeOnReadTableState tableState;
+ protected final MergeOnReadTableState<MergeOnReadInputSplit> tableState;
/**
* Uniform iterator view for the underneath records.
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 868a7f814aba..bc7afa7efad1 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -30,7 +30,7 @@ import java.util.List;
* Statistics for merge on read table source.
*/
@Getter
-public class MergeOnReadTableState implements Serializable {
+public class MergeOnReadTableState<T> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -38,7 +38,7 @@ public class MergeOnReadTableState implements Serializable {
private final RowType requiredRowType;
private final String tableSchema;
private final String requiredSchema;
- private final List<MergeOnReadInputSplit> inputSplits;
+ private final List<T> inputSplits;
private final int operationPos;
public MergeOnReadTableState(
@@ -46,7 +46,7 @@ public class MergeOnReadTableState implements Serializable {
RowType requiredRowType,
String tableSchema,
String requiredSchema,
- List<MergeOnReadInputSplit> inputSplits) {
+ List<T> inputSplits) {
this.rowType = rowType;
this.requiredRowType = requiredRowType;
this.tableSchema = tableSchema;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index e540cd55b006..64f48374c1d4 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -255,7 +255,7 @@ public class TestStreamReadOperator {
}
final DataType rowDataType =
HoodieSchemaConverter.convertToDataType(tableSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
- final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+ final MergeOnReadTableState<MergeOnReadInputSplit> hoodieTableState = new
MergeOnReadTableState(
rowType,
TestConfigurations.ROW_TYPE,
tableSchema.toString(),
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
new file mode 100644
index 000000000000..d29a637664ae
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.source.split.HoodieCdcSourceSplit;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.table.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.apache.hudi.utils.TestConfigurations.ROW_DATA_TYPE;
+import static org.apache.hudi.utils.TestConfigurations.ROW_TYPE;
+import static org.apache.hudi.utils.TestConfigurations.TABLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test cases for {@link HoodieCdcSplitReaderFunction}.
+ */
+public class TestHoodieCdcSplitReaderFunction {
+
+ @TempDir
+ File tempDir;
+
+ private Configuration conf;
+ private HoodieSchema tableSchema;
+ private HoodieSchema requiredSchema;
+ private InternalSchemaManager internalSchemaManager;
+ private MergeOnReadTableState tableState;
+
+ @BeforeEach
+ public void setUp() {
+ conf = TestConfigurations.getDefaultConf(tempDir.getAbsolutePath());
+ tableSchema = mock(HoodieSchema.class);
+ requiredSchema = mock(HoodieSchema.class);
+ internalSchemaManager = mock(InternalSchemaManager.class);
+ tableState = new MergeOnReadTableState(ROW_TYPE, ROW_TYPE,
TABLE_SCHEMA.toString(), TABLE_SCHEMA.toString(), new ArrayList<>());
+ }
+
+ private HoodieCdcSplitReaderFunction createFunction() {
+ return new HoodieCdcSplitReaderFunction(
+ conf,
+ tableState,
+ internalSchemaManager,
+ ROW_DATA_TYPE.getChildren(),
+ Collections.emptyList(),
+ false);
+ }
+
+ // -------------------------------------------------------------------------
+ // Constructor tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ public void testConstructorWithValidParameters() {
+ HoodieCdcSplitReaderFunction function = createFunction();
+ assertNotNull(function);
+ }
+
+ @Test
+ public void testConstructorWithProjectedRequiredRowType() {
+ // requiredRowType is a subset of rowType (first 3 of 5 fields)
+ RowType projectedRowType = new RowType(
+ ROW_TYPE.getFields().subList(0, 3).stream()
+ .map(f -> new RowType.RowField(f.getName(), f.getType()))
+ .collect(java.util.stream.Collectors.toList()));
+
+ tableState = new MergeOnReadTableState(ROW_TYPE, projectedRowType,
TABLE_SCHEMA.toString(),
HoodieSchemaConverter.convertToSchema(projectedRowType.copy()).toString(), new
ArrayList<>());
+ HoodieCdcSplitReaderFunction function = new HoodieCdcSplitReaderFunction(
+ conf,
+ tableState,
+ internalSchemaManager,
+ ROW_DATA_TYPE.getChildren(),
+ Collections.emptyList(),
+ false);
+
+ assertNotNull(function);
+ }
+
+ @Test
+ public void testConstructorWithEmptyFieldTypes() {
+ HoodieCdcSplitReaderFunction function = new HoodieCdcSplitReaderFunction(
+ conf,
+ tableState,
+ internalSchemaManager,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ false);
+
+ assertNotNull(function);
+ }
+
+ // -------------------------------------------------------------------------
+ // close() tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ public void testCloseWithoutReadingDoesNotThrow() throws Exception {
+ HoodieCdcSplitReaderFunction function = createFunction();
+ function.close();
+ }
+
+ @Test
+ public void testMultipleClosesDoNotThrow() throws Exception {
+ HoodieCdcSplitReaderFunction function = createFunction();
+ function.close();
+ function.close();
+ function.close();
+ }
+
+ // -------------------------------------------------------------------------
+ // read() argument-validation tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ public void testReadWithNonCdcSplitDelegatesToFallback() {
+ // Non-CDC splits are forwarded to the fallback HoodieSplitReaderFunction
rather than
+ // rejected. The fallback will attempt real I/O and fail, but the failure
must NOT be
+ // an IllegalArgumentException (that would indicate the type-guard wrongly
rejected it).
+ HoodieCdcSplitReaderFunction function = createFunction();
+
+ HoodieSourceSplit nonCdcSplit = new HoodieSourceSplit(
+ 1, "base.parquet", Option.empty(), tempDir.getAbsolutePath(),
+ "", "read_optimized", "20230101000000000", "file-1", Option.empty());
+
+ Exception ex = assertThrows(Exception.class, () ->
function.read(nonCdcSplit));
+ assertNotNull(ex);
+ // Must not be IllegalArgumentException (which the old type-guard wrongly
threw)
+ if (ex instanceof IllegalArgumentException) {
+ throw new AssertionError("read() should not throw
IllegalArgumentException for non-CDC split; "
+ + "it should fall through to the fallback reader", ex);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Integration: read() accepts a HoodieCdcSourceSplit (validation only)
+ // -------------------------------------------------------------------------
+
+ @Test
+ public void testReadAcceptsCdcSourceSplitType() {
+ // Verify that HoodieCdcSourceSplit is accepted (cast doesn't throw).
+ // Actual I/O would require a real Hoodie table, so we only check the
+ // type-guard passes by catching the downstream I/O error rather than
+ // an IllegalArgumentException.
+ HoodieCdcSplitReaderFunction function = createFunction();
+
+ HoodieCDCFileSplit[] changes = {
+ new HoodieCDCFileSplit("20230101000000000",
HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert.parquet")
+ };
+ HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(
+ 1, tempDir.getAbsolutePath(), 128 * 1024 * 1024L, "file-cdc",
+ changes, "read_optimized", "20230101000000000");
+
+ // The call should NOT throw IllegalArgumentException (type guard passes).
+ // It will throw some other exception when trying to do real I/O.
+ Exception ex = assertThrows(Exception.class, () ->
function.read(cdcSplit));
+ assertNotNull(ex);
+ // Must not be an IllegalArgumentException (which the type guard throws)
+ if (ex instanceof IllegalArgumentException) {
+ throw new AssertionError("read() should not throw
IllegalArgumentException for a CdcSourceSplit", ex);
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
new file mode 100644
index 000000000000..56e9ae53ce72
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieCdcSourceSplit}.
+ */
+public class TestHoodieCdcSourceSplit {
+
  /** Creates a single-instant CDC file split used as a fixture across the tests below. */
  private static HoodieCDCFileSplit makeFileSplit(String instant) {
    return new HoodieCDCFileSplit(instant, HoodieCDCInferenceCase.BASE_FILE_INSERT, "cdc.parquet");
  }
+
+ @Test
+ public void testGetChangesReturnsConstructorValue() {
+ HoodieCDCFileSplit[] changes = {makeFileSplit("20230101000000000"),
makeFileSplit("20230102000000000")};
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
+ changes, "read_optimized", "20230101000000000");
+
+ assertArrayEquals(changes, split.getChanges());
+ assertSame(changes, split.getChanges());
+ }
+
+ @Test
+ public void testGetMaxCompactionMemoryInBytes() {
+ long memory = 256 * 1024 * 1024L;
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", memory,
"file-1", new HoodieCDCFileSplit[0],
+ "read_optimized", "20230101000000000");
+
+ assertEquals(memory, split.getMaxCompactionMemoryInBytes());
+ }
+
+ @Test
+ public void testGetTablePath() {
+ String tablePath = "/warehouse/hudi/my_table";
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, tablePath, 1024L,
+ "file-1", new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
+
+ assertEquals(tablePath, split.getTablePath());
+ }
+
+ @Test
+ public void testGetFileId() {
+ String fileId = "my-file-id-abc-123";
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
+ fileId, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
+
+ assertEquals(fileId, split.getFileId());
+ }
+
+ @Test
+ public void testSplitIdFormat() {
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
+ new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+ // splitId() is inherited from HoodieSourceSplit and returns
"splitNum:fileId"
+ assertEquals("1:file-1", split.splitId());
+ }
+
+ @Test
+ public void testToStringContainsExpectedFields() {
+ HoodieCDCFileSplit[] changes = {makeFileSplit("20230101000000000")};
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-xyz",
+ changes, "read_optimized", "20230101000000000");
+
+ String str = split.toString();
+ assertNotNull(str);
+ // Lombok @ToString uses the actual class name "HoodieCdcSourceSplit"
+ assertTrue(str.contains("HoodieCdcSourceSplit"), "toString should mention
class name");
+ // CDC-specific fields are included
+ assertTrue(str.contains("changes"), "toString should mention changes");
+ assertTrue(str.contains("maxCompactionMemoryInBytes"), "toString should
mention maxCompactionMemoryInBytes");
+ }
+
+ @Test
+ public void testEmptyChangesArray() {
+ HoodieCDCFileSplit[] changes = new HoodieCDCFileSplit[0];
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(5, "/table", 512L,
"file-empty",
+ changes, "read_optimized", "20230101000000000");
+
+ assertNotNull(split.getChanges());
+ assertEquals(0, split.getChanges().length);
+ }
+
+ @Test
+ public void testIsInstanceOfHoodieSourceSplit() {
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
+ new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+ assertTrue(split instanceof HoodieSourceSplit);
+ }
+
+ @Test
+ public void testLargeCompactionMemory() {
+ long largeMemory = Long.MAX_VALUE;
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table",
largeMemory, "file-1",
+ new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+ assertEquals(largeMemory, split.getMaxCompactionMemoryInBytes());
+ }
+
+ @Test
+ public void testDefaultConsumedAndFileOffset() {
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
+ new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+ assertEquals(0L, split.getConsumed());
+ assertEquals(0, split.getFileOffset());
+ assertFalse(split.isConsumed());
+ }
+
+ @Test
+ public void testConsumeIncrementsConsumed() {
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
+ new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+ assertFalse(split.isConsumed());
+ split.consume();
+ assertTrue(split.isConsumed());
+ assertEquals(1L, split.getConsumed());
+ split.consume();
+ split.consume();
+ assertEquals(3L, split.getConsumed());
+ }
+
+ @Test
+ public void testUpdatePositionSetsConsumedAndFileOffset() {
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
+ new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+
+ split.updatePosition(7, 42L);
+ assertEquals(7, split.getFileOffset());
+ assertEquals(42L, split.getConsumed());
+ assertTrue(split.isConsumed());
+ }
+
+ @Test
+ public void testMultipleChangesWithDifferentInferenceCases() {
+ HoodieCDCFileSplit insert = new HoodieCDCFileSplit("20230101000000000",
HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert.parquet");
+ HoodieCDCFileSplit delete = new HoodieCDCFileSplit("20230102000000000",
HoodieCDCInferenceCase.BASE_FILE_DELETE, "delete.log");
+ HoodieCDCFileSplit logFile = new HoodieCDCFileSplit("20230103000000000",
HoodieCDCInferenceCase.LOG_FILE, "log.log");
+ HoodieCDCFileSplit replace = new HoodieCDCFileSplit("20230104000000000",
HoodieCDCInferenceCase.REPLACE_COMMIT, "replace.parquet");
+
+ HoodieCDCFileSplit[] changes = {insert, delete, logFile, replace};
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-multi",
+ changes, "read_optimized", "20230101000000000");
+
+ assertEquals(4, split.getChanges().length);
+ assertSame(insert, split.getChanges()[0]);
+ assertSame(delete, split.getChanges()[1]);
+ assertSame(logFile, split.getChanges()[2]);
+ assertSame(replace, split.getChanges()[3]);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
new file mode 100644
index 000000000000..25e204817d07
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieContinuousSplitBatch}.
+ */
+public class TestHoodieContinuousSplitBatch {
+
+ private static final String TABLE_PATH = "/warehouse/test_table";
+ private static final long MAX_MEMORY = 128 * 1024 * 1024L;
+
+ // -------------------------------------------------------------------------
+ // fromResult tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ public void testFromResultWithEmptyResult() {
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.emptyList(), "20230101000000000");
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertNotNull(batch);
+ assertTrue(batch.getSplits().isEmpty());
+ assertEquals("20230101000000000", batch.getEndInstant());
+ assertNull(batch.getOffset());
+ }
+
+ @Test
+ public void testFromResultWithCdcSplitProducesHoodieCdcSourceSplit() {
+ HoodieCDCFileSplit[] changes = {
+ new HoodieCDCFileSplit("20230101000000000",
HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert.parquet")
+ };
+ CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc", changes);
+
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.<MergeOnReadInputSplit>singletonList(cdcInputSplit),
"20230101000000000");
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertEquals(1, batch.getSplits().size());
+ HoodieSourceSplit split = batch.getSplits().iterator().next();
+ assertTrue(split instanceof HoodieCdcSourceSplit,
+ "CDC input split should produce HoodieCdcSourceSplit, got: " +
split.getClass());
+
+ HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split;
+ assertEquals(TABLE_PATH, cdcSplit.getTablePath());
+ assertEquals(MAX_MEMORY, cdcSplit.getMaxCompactionMemoryInBytes());
+ assertEquals("file-cdc", cdcSplit.getFileId());
+ assertEquals(1, cdcSplit.getChanges().length);
+ }
+
+ @Test
+ public void testFromResultWithRegularSplitProducesHoodieSourceSplit() {
+ MergeOnReadInputSplit morSplit = new MergeOnReadInputSplit(
+ 1,
+ TABLE_PATH + "/base.parquet",
+ Option.of(Collections.singletonList(TABLE_PATH + "/log1.log")),
+ "20230101000000000",
+ TABLE_PATH,
+ MAX_MEMORY,
+ "payload_combine",
+ null,
+ "file-mor");
+
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.singletonList(morSplit), "20230101000000000");
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertEquals(1, batch.getSplits().size());
+ HoodieSourceSplit split = batch.getSplits().iterator().next();
+ // Should be a plain HoodieSourceSplit (not a CDC one)
+ assertTrue(split instanceof HoodieSourceSplit);
+ assertTrue(!(split instanceof HoodieCdcSourceSplit),
+ "MOR split should produce HoodieSourceSplit, not
HoodieCdcSourceSplit");
+ assertEquals("file-mor", split.getFileId());
+ assertEquals(TABLE_PATH, split.getTablePath());
+ }
+
+ @Test
+ public void testFromResultWithMixedSplitsPreservesOrder() {
+ HoodieCDCFileSplit[] changes = {
+ new HoodieCDCFileSplit("20230101000000000",
HoodieCDCInferenceCase.LOG_FILE, "cdc.log")
+ };
+ CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc", changes);
+
+ MergeOnReadInputSplit morSplit = new MergeOnReadInputSplit(
+ 2,
+ TABLE_PATH + "/base.parquet",
+ Option.empty(),
+ "20230101000000000",
+ TABLE_PATH,
+ MAX_MEMORY,
+ "payload_combine",
+ null,
+ "file-mor");
+
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Arrays.asList(cdcInputSplit, morSplit), "20230101000000000");
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertEquals(2, batch.getSplits().size());
+ List<HoodieSourceSplit> splits = (List<HoodieSourceSplit>)
batch.getSplits();
+ assertTrue(splits.get(0) instanceof HoodieCdcSourceSplit);
+ assertTrue(!(splits.get(1) instanceof HoodieCdcSourceSplit));
+ assertEquals("file-cdc", splits.get(0).getFileId());
+ assertEquals("file-mor", splits.get(1).getFileId());
+ }
+
+ @Test
+ public void testFromResultPreservesEndInstant() {
+ String endInstant = "20230215120000000";
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.emptyList(), endInstant);
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertEquals(endInstant, batch.getEndInstant());
+ }
+
+ @Test
+ public void testFromResultPreservesOffset() {
+ String offset = "20230215120000000";
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.emptyList(), "20230215120000000", offset);
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertEquals(offset, batch.getOffset());
+ }
+
+ @Test
+ public void testFromResultWithNullOffsetPreservesNullOffset() {
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.emptyList(), "20230101000000000");
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertNull(batch.getOffset());
+ }
+
+ @Test
+ public void testFromResultCdcSplitPreservesChangesArray() {
+ HoodieCDCFileSplit split1 = new HoodieCDCFileSplit(
+ "20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT,
"insert.parquet");
+ HoodieCDCFileSplit split2 = new HoodieCDCFileSplit(
+ "20230102000000000", HoodieCDCInferenceCase.BASE_FILE_DELETE,
"delete.log");
+ HoodieCDCFileSplit[] changes = {split1, split2};
+
+ CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc", changes);
+
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.<MergeOnReadInputSplit>singletonList(cdcInputSplit),
"20230102000000000");
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit)
batch.getSplits().iterator().next();
+ assertEquals(2, cdcSplit.getChanges().length);
+ assertSame(split1, cdcSplit.getChanges()[0]);
+ assertSame(split2, cdcSplit.getChanges()[1]);
+ }
+
+ @Test
+ public void testFromResultWithMultipleCdcSplits() {
+ HoodieCDCFileSplit[] changes1 = {
+ new HoodieCDCFileSplit("20230101000000000",
HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert1.parquet")
+ };
+ HoodieCDCFileSplit[] changes2 = {
+ new HoodieCDCFileSplit("20230101000000000",
HoodieCDCInferenceCase.REPLACE_COMMIT, "replace.parquet")
+ };
+
+ CdcInputSplit cdc1 = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc-1", changes1);
+ CdcInputSplit cdc2 = new CdcInputSplit(2, TABLE_PATH, MAX_MEMORY,
"file-cdc-2", changes2);
+
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Arrays.<MergeOnReadInputSplit>asList(cdc1, cdc2), "20230101000000000");
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertEquals(2, batch.getSplits().size());
+ for (HoodieSourceSplit s : batch.getSplits()) {
+ assertTrue(s instanceof HoodieCdcSourceSplit);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Constructor and EMPTY tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ public void testEmptyConstant() {
+ HoodieContinuousSplitBatch empty = HoodieContinuousSplitBatch.EMPTY;
+
+ assertNotNull(empty);
+ assertTrue(empty.getSplits().isEmpty());
+ assertEquals("", empty.getEndInstant());
+ assertEquals("", empty.getOffset());
+ }
+
+ @Test
+ public void testConstructorStoresFields() {
+ Collection<HoodieSourceSplit> splits = Collections.singletonList(
+ new HoodieSourceSplit(1, null, Option.empty(), TABLE_PATH, "",
"read_optimized", "", "file-1", Option.empty()));
+ String endInstant = "20230215000000000";
+ String offset = "some-offset";
+
+ HoodieContinuousSplitBatch batch = new HoodieContinuousSplitBatch(splits,
endInstant, offset);
+
+ assertSame(splits, batch.getSplits());
+ assertEquals(endInstant, batch.getEndInstant());
+ assertEquals(offset, batch.getOffset());
+ }
+
+ @Test
+ public void testConstructorRejectsNullSplits() {
+ org.junit.jupiter.api.Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> new HoodieContinuousSplitBatch(null, "20230101000000000", null));
+ }
+
+ @Test
+ public void testConstructorRejectsNullEndInstant() {
+ org.junit.jupiter.api.Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> new HoodieContinuousSplitBatch(Collections.emptyList(), null,
null));
+ }
+
+ // -------------------------------------------------------------------------
+ // resolvePartitionPath coverage tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ public void testFromResultMorSplitWithLogPathsOnlyDerivesPartitionPath() {
+ // Split has no base file, only log paths; partition path should be
derived from the first log path
+ MergeOnReadInputSplit logOnlySplit = new MergeOnReadInputSplit(
+ 1,
+ null,
+ Option.of(Collections.singletonList(TABLE_PATH + "/2023/01/log.log")),
+ "20230101000000000",
+ TABLE_PATH,
+ MAX_MEMORY,
+ "payload_combine",
+ null,
+ "file-log-only");
+
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.singletonList(logOnlySplit), "20230101000000000");
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertEquals(1, batch.getSplits().size());
+ HoodieSourceSplit split = batch.getSplits().iterator().next();
+ // Partition path is derived relative to the table path
+ assertEquals("2023/01", split.getPartitionPath());
+ }
+
+ @Test
+ public void testFromResultMorSplitWithNoFilesHasEmptyPartitionPath() {
+ // Split has neither base file nor log paths; partition path must be empty
string
+ MergeOnReadInputSplit emptyPathSplit = new MergeOnReadInputSplit(
+ 1,
+ null,
+ Option.empty(),
+ "20230101000000000",
+ TABLE_PATH,
+ MAX_MEMORY,
+ "read_optimized",
+ null,
+ "file-no-paths");
+
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.singletonList(emptyPathSplit), "20230101000000000");
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertEquals(1, batch.getSplits().size());
+ HoodieSourceSplit split = batch.getSplits().iterator().next();
+ assertEquals("", split.getPartitionPath());
+ }
+
+ @Test
+ public void
testFromResultCdcSplitWithEmptyChangesUsesEndInstantAsLatestCommit() {
+ // When a CDC split has no changes, the batch end instant is used as the
latest commit
+ CdcInputSplit cdcWithNoChanges = new CdcInputSplit(1, TABLE_PATH,
MAX_MEMORY, "file-cdc-empty",
+ new HoodieCDCFileSplit[0]);
+ String endInstant = "20230301000000000";
+
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.<MergeOnReadInputSplit>singletonList(cdcWithNoChanges),
endInstant);
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ assertEquals(1, batch.getSplits().size());
+ HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit)
batch.getSplits().iterator().next();
+ assertEquals(0, cdcSplit.getChanges().length);
+ // With no changes, latestCommit must fall back to the batch end instant
+ assertEquals(endInstant, cdcSplit.getLatestCommit());
+ }
+
+ @Test
+ public void testFromResultMorSplitWithSubdirectoryPartitionPath() {
+ // Verifies multi-level partition paths (e.g. year=2023/month=01/day=15)
are resolved correctly
+ MergeOnReadInputSplit partitionedSplit = new MergeOnReadInputSplit(
+ 1,
+ TABLE_PATH + "/year=2023/month=01/day=15/base.parquet",
+ Option.empty(),
+ "20230115000000000",
+ TABLE_PATH,
+ MAX_MEMORY,
+ "read_optimized",
+ null,
+ "file-partitioned");
+
+ IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
+ Collections.singletonList(partitionedSplit), "20230115000000000");
+
+ HoodieContinuousSplitBatch batch =
HoodieContinuousSplitBatch.fromResult(result);
+
+ HoodieSourceSplit split = batch.getSplits().iterator().next();
+ assertEquals("year=2023/month=01/day=15", split.getPartitionPath());
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
index d200935fa0e3..7d7cbecc1a1a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
@@ -21,6 +21,9 @@ package org.apache.hudi.source.split;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
+
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
@@ -1174,5 +1177,147 @@ public class TestHoodieSourceSplitSerializer {
assertEquals("Composition Range is not supported.",
exception.getMessage());
}
-}
+ // -------------------------------------------------------------------------
+ // CDC split serialization tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ public void testSerializeAndDeserializeCdcSplit() throws IOException {
+ HoodieCDCFileSplit change = new HoodieCDCFileSplit(
+ "20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT,
"insert.parquet");
+ HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
+ 10, "/table/path", 128 * 1024 * 1024L, "file-cdc",
+ new HoodieCDCFileSplit[]{change}, "read_optimized",
"20230101000000000");
+
+ byte[] serialized = serializer.serialize(original);
+ assertNotNull(serialized);
+
+ HoodieSourceSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertTrue(deserialized instanceof HoodieCdcSourceSplit,
+ "CDC split must deserialize as HoodieCdcSourceSplit");
+ HoodieCdcSourceSplit cdcDeserialized = (HoodieCdcSourceSplit) deserialized;
+ assertEquals(original.getSplitNum(), cdcDeserialized.getSplitNum());
+ assertEquals(original.getTablePath(), cdcDeserialized.getTablePath());
+ assertEquals(original.getFileId(), cdcDeserialized.getFileId());
+ assertEquals(original.getMaxCompactionMemoryInBytes(),
cdcDeserialized.getMaxCompactionMemoryInBytes());
+ assertEquals(original.getMergeType(), cdcDeserialized.getMergeType());
+ assertEquals(original.getLatestCommit(),
cdcDeserialized.getLatestCommit());
+ assertEquals(1, cdcDeserialized.getChanges().length);
+ assertEquals("20230101000000000",
cdcDeserialized.getChanges()[0].getInstant());
+ assertEquals(HoodieCDCInferenceCase.BASE_FILE_INSERT,
cdcDeserialized.getChanges()[0].getCdcInferCase());
+ }
+
+ @Test
+ public void testSerializeAndDeserializeCdcSplitWithEmptyChanges() throws
IOException {
+ HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
+ 11, "/warehouse/table", 256 * 1024 * 1024L, "file-empty-cdc",
+ new HoodieCDCFileSplit[0], "payload_combine", "20230201000000000");
+
+ byte[] serialized = serializer.serialize(original);
+ HoodieSourceSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertTrue(deserialized instanceof HoodieCdcSourceSplit);
+ HoodieCdcSourceSplit cdcDeserialized = (HoodieCdcSourceSplit) deserialized;
+ assertEquals(0, cdcDeserialized.getChanges().length);
+ assertEquals("/warehouse/table", cdcDeserialized.getTablePath());
+ assertEquals(256 * 1024 * 1024L,
cdcDeserialized.getMaxCompactionMemoryInBytes());
+ }
+
+ @Test
+ public void testSerializeAndDeserializeCdcSplitWithMultipleChanges() throws
IOException {
+ HoodieCDCFileSplit insert = new HoodieCDCFileSplit(
+ "20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT,
"insert.parquet");
+ HoodieCDCFileSplit delete = new HoodieCDCFileSplit(
+ "20230102000000000", HoodieCDCInferenceCase.BASE_FILE_DELETE,
"delete.log");
+ HoodieCDCFileSplit logFile = new HoodieCDCFileSplit(
+ "20230103000000000", HoodieCDCInferenceCase.LOG_FILE, "cdc.log");
+ HoodieCDCFileSplit replace = new HoodieCDCFileSplit(
+ "20230104000000000", HoodieCDCInferenceCase.REPLACE_COMMIT,
"replace.parquet");
+
+ HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
+ 12, "/table", 64 * 1024 * 1024L, "file-multi-cdc",
+ new HoodieCDCFileSplit[]{insert, delete, logFile, replace},
+ "read_optimized", "20230104000000000");
+
+ byte[] serialized = serializer.serialize(original);
+ HoodieSourceSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertTrue(deserialized instanceof HoodieCdcSourceSplit);
+ HoodieCdcSourceSplit cdcDeserialized = (HoodieCdcSourceSplit) deserialized;
+ assertEquals(4, cdcDeserialized.getChanges().length);
+ assertEquals("20230101000000000",
cdcDeserialized.getChanges()[0].getInstant());
+ assertEquals(HoodieCDCInferenceCase.BASE_FILE_INSERT,
cdcDeserialized.getChanges()[0].getCdcInferCase());
+ assertEquals("20230102000000000",
cdcDeserialized.getChanges()[1].getInstant());
+ assertEquals(HoodieCDCInferenceCase.BASE_FILE_DELETE,
cdcDeserialized.getChanges()[1].getCdcInferCase());
+ assertEquals("20230103000000000",
cdcDeserialized.getChanges()[2].getInstant());
+ assertEquals(HoodieCDCInferenceCase.LOG_FILE,
cdcDeserialized.getChanges()[2].getCdcInferCase());
+ assertEquals("20230104000000000",
cdcDeserialized.getChanges()[3].getInstant());
+ assertEquals(HoodieCDCInferenceCase.REPLACE_COMMIT,
cdcDeserialized.getChanges()[3].getCdcInferCase());
+ }
+
+ @Test
+ public void testSerializeAndDeserializeCdcSplitPreservesConsumedState()
throws IOException {
+ HoodieCDCFileSplit change = new HoodieCDCFileSplit(
+ "20230101000000000", HoodieCDCInferenceCase.LOG_FILE, "cdc.log");
+ HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
+ 13, "/table", 128 * 1024 * 1024L, "file-cdc-consumed",
+ new HoodieCDCFileSplit[]{change}, "read_optimized",
"20230101000000000");
+
+ original.updatePosition(5, 200L);
+
+ byte[] serialized = serializer.serialize(original);
+ HoodieSourceSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertTrue(deserialized instanceof HoodieCdcSourceSplit);
+ HoodieCdcSourceSplit cdcDeserialized = (HoodieCdcSourceSplit) deserialized;
+ assertEquals(5, cdcDeserialized.getFileOffset());
+ assertEquals(200L, cdcDeserialized.getConsumed());
+ assertTrue(cdcDeserialized.isConsumed());
+ }
+
+ @Test
+ public void testRoundTripCdcSplitMultipleTimes() throws IOException {
+ HoodieCDCFileSplit change1 = new HoodieCDCFileSplit(
+ "20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT,
"insert.parquet");
+ HoodieCDCFileSplit change2 = new HoodieCDCFileSplit(
+ "20230102000000000", HoodieCDCInferenceCase.BASE_FILE_DELETE,
"delete.log");
+
+ HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
+ 14, "/roundtrip/table", 128 * 1024 * 1024L, "file-roundtrip-cdc",
+ new HoodieCDCFileSplit[]{change1, change2},
+ "read_optimized", "20230102000000000");
+
+ original.updatePosition(3, 50L);
+
+ HoodieSourceSplit current = original;
+ for (int i = 0; i < 5; i++) {
+ byte[] s = serializer.serialize(current);
+ current = serializer.deserialize(serializer.getVersion(), s);
+ }
+
+ assertTrue(current instanceof HoodieCdcSourceSplit);
+ HoodieCdcSourceSplit cdcCurrent = (HoodieCdcSourceSplit) current;
+ assertEquals(original.getSplitNum(), cdcCurrent.getSplitNum());
+ assertEquals(original.getTablePath(), cdcCurrent.getTablePath());
+ assertEquals(original.getFileId(), cdcCurrent.getFileId());
+ assertEquals(original.getMaxCompactionMemoryInBytes(),
cdcCurrent.getMaxCompactionMemoryInBytes());
+ assertEquals(2, cdcCurrent.getChanges().length);
+ assertEquals(3, cdcCurrent.getFileOffset());
+ assertEquals(50L, cdcCurrent.getConsumed());
+ }
+
+ @Test
+ public void testRegularSplitDeserializesAsHoodieSourceSplitNotCdc() throws
IOException {
+ HoodieSourceSplit original = new HoodieSourceSplit(
+ 15, "base-path", Option.empty(), "/table/path", "/partition",
+ "read_optimized", "19700101000000000", "file-regular", Option.empty());
+
+ byte[] serialized = serializer.serialize(original);
+ HoodieSourceSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertFalse(deserialized instanceof HoodieCdcSourceSplit,
+ "Regular split must not deserialize as HoodieCdcSourceSplit");
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index f577d4cfaf39..d833d2929429 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -189,8 +189,8 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @EnumSource(value = HoodieCDCSupplementalLoggingMode.class)
- void
testStreamReadFromSpecifiedCommitWithChangelog(HoodieCDCSupplementalLoggingMode
mode) throws Exception {
+ @MethodSource("cdcSupplementalLoggingModeWithSourceV2")
+ void
testStreamReadFromSpecifiedCommitWithChangelog(HoodieCDCSupplementalLoggingMode
mode, boolean useSourceV2) throws Exception {
streamTableEnv.getConfig().getConfiguration()
.setString("table.dynamic-table-options.enabled", "true");
// create filesystem table named source
@@ -204,6 +204,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.CDC_ENABLED, true)
.option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.name())
+ .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
@@ -3216,6 +3217,18 @@ public class ITTestHoodieDataSource {
return Stream.of(data).map(Arguments::of);
}
+ private static Stream<Arguments> cdcSupplementalLoggingModeWithSourceV2() {
+ Object[][] data =
+ new Object[][] {
+ {HoodieCDCSupplementalLoggingMode.DATA_BEFORE, false},
+ {HoodieCDCSupplementalLoggingMode.DATA_BEFORE, true},
+ {HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER, false},
+ {HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER, true},
+ {HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY, false},
+ {HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY, true}};
+ return Stream.of(data).map(Arguments::of);
+ }
+
/**
* Return test params => (HoodieTableType, true/false).
*/