HuangZhenQiu commented on code in PR #18361:
URL: https://github.com/apache/hudi/pull/18361#discussion_r2983313932


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java:
##########
@@ -0,0 +1,1054 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.source.reader.BatchRecords;
+import org.apache.hudi.source.reader.HoodieRecordWithPosition;
+import org.apache.hudi.source.split.HoodieCdcSourceSplit;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.table.format.cdc.CdcInputFormat;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.RowDataProjection;
+import org.apache.hudi.util.StreamerUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.fs.Path;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * CDC reader function for source V2. Reads CDC splits ({@link 
HoodieCdcSourceSplit}) and
+ * emits change-log {@link RowData} records tagged with the appropriate {@link 
RowKind}.
+ *
+ * <p>The implementation mirrors the logic in {@link CdcInputFormat}, adapted 
for the
+ * {@link SplitReaderFunction} contract.
+ */
+@Slf4j
+public class HoodieCdcSplitReaderFunction implements 
SplitReaderFunction<RowData> {
+
  private final org.apache.flink.configuration.Configuration conf;
  // Full schema of the Hoodie table (including metadata fields).
  private final HoodieSchema tableSchema;
  // Projected schema containing only the fields required by the query.
  private final HoodieSchema requiredSchema;
  // Full Flink row type corresponding to tableSchema.
  private final RowType rowType;
  // Projected Flink row type corresponding to requiredSchema.
  private final RowType requiredRowType;
  // Positions of the required fields within the full row type; derived once in the constructor.
  private final int[] requiredPositions;
  // Handles schema evolution when reading files written under older schemas.
  private final InternalSchemaManager internalSchemaManager;
  // DataType for every table field, in table-schema order (used for parquet reading).
  private final List<DataType> fieldTypes;

  // Lazily initialized, non-serializable runtime state (rebuilt after deserialization).
  private transient HoodieTableMetaClient metaClient;
  private transient HoodieWriteConfig writeConfig;
  private transient org.apache.hadoop.conf.Configuration hadoopConf;
  // Iterator over the split currently being read; closed in close().
  private transient ClosableIterator<RowData> currentIterator;
  // Fallback reader for non-CDC splits (e.g. snapshot reads when read.start-commit='earliest')
  private transient HoodieSplitReaderFunction fallbackReaderFunction;
+
  /**
   * Creates a CDC split reader function.
   *
   * @param conf                  Flink configuration
   * @param tableSchema           Full Avro schema of the Hoodie table
   * @param requiredSchema        Projected schema required by the query
   * @param rowType               Full Flink {@link RowType} of the table
   * @param requiredRowType       Projected Flink {@link RowType} required by the query
   * @param internalSchemaManager Schema-evolution manager
   * @param fieldTypes            DataType list for all table fields (used for parquet reading)
   */
  public HoodieCdcSplitReaderFunction(
      org.apache.flink.configuration.Configuration conf,
      HoodieSchema tableSchema,
      HoodieSchema requiredSchema,
      RowType rowType,
      RowType requiredRowType,
      InternalSchemaManager internalSchemaManager,
      List<DataType> fieldTypes) {
    this.conf = conf;
    this.tableSchema = tableSchema;
    this.requiredSchema = requiredSchema;
    this.rowType = rowType;
    this.requiredRowType = requiredRowType;
    // Pre-compute the projection positions once; they are reused for every split.
    this.requiredPositions = computeRequiredPositions(rowType, requiredRowType);
    this.internalSchemaManager = internalSchemaManager;
    this.fieldTypes = fieldTypes;
  }
+
+  @Override
+  public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> 
read(HoodieSourceSplit split) {
+    if (!(split instanceof HoodieCdcSourceSplit)) {
+      // Non-CDC splits arrive when reading from 'earliest' with no prior CDC 
history
+      // (i.e. instantRange is empty → snapshot path). Fall back to the 
standard MOR reader
+      // which emits all records as INSERT rows, matching the expected 
snapshot behaviour.
+      return getFallbackReaderFunction().read(split);
+    }
+    HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split;
+
+    HoodieCDCSupplementalLoggingMode mode = 
OptionsResolver.getCDCSupplementalLoggingMode(conf);
+    HoodieTableMetaClient client = getMetaClient();
+    HoodieWriteConfig wConfig = getWriteConfig();
+
+    ImageManager imageManager = new ImageManager(rowType, wConfig, 
this::getFileSliceIterator);
+
+    Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc 
=
+        cdcFileSplit -> createRecordIteratorSafe(
+            cdcSplit.getTablePath(),
+            cdcSplit.getMaxCompactionMemoryInBytes(),
+            cdcFileSplit,
+            mode,
+            imageManager,
+            client);
+
+    currentIterator = new CdcFileSplitsIterator(cdcSplit.getChanges(), 
imageManager, recordIteratorFunc);
+    BatchRecords<RowData> records = BatchRecords.forRecords(
+        split.splitId(), currentIterator, split.getFileOffset(), 
split.getConsumed());
+    records.seek(split.getConsumed());
+    return records;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (currentIterator != null) {
+      currentIterator.close();
+    }
+    if (fallbackReaderFunction != null) {
+      fallbackReaderFunction.close();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Internal helpers
+  // -------------------------------------------------------------------------
+
  /**
   * Lazily creates the standard MOR reader used for non-CDC (snapshot) splits.
   *
   * <p>NOTE(review): unsynchronized lazy init — assumes it is only called from the single
   * split-reader thread; confirm against the SplitReaderFunction threading contract.
   */
  private HoodieSplitReaderFunction getFallbackReaderFunction() {
    if (fallbackReaderFunction == null) {
      fallbackReaderFunction = new HoodieSplitReaderFunction(
          conf,
          tableSchema,
          requiredSchema,
          internalSchemaManager,
          conf.get(FlinkOptions.MERGE_TYPE),
          Collections.emptyList(),
          false);
    }
    return fallbackReaderFunction;
  }
+
  /**
   * Wraps {@link #createRecordIterator} for use inside a lambda, rethrowing the checked
   * {@link IOException} as an unchecked {@link HoodieException} with the failing split attached.
   */
  private ClosableIterator<RowData> createRecordIteratorSafe(
      String tablePath,
      long maxCompactionMemoryInBytes,
      HoodieCDCFileSplit fileSplit,
      HoodieCDCSupplementalLoggingMode mode,
      ImageManager imageManager,
      HoodieTableMetaClient client) {
    try {
      return createRecordIterator(tablePath, maxCompactionMemoryInBytes, fileSplit, mode, imageManager, client);
    } catch (IOException e) {
      throw new HoodieException("Failed to create CDC record iterator for split: " + fileSplit, e);
    }
  }
+
  /**
   * Builds the record iterator for a single {@link HoodieCDCFileSplit}, dispatching on how
   * the change data was inferred for that file group.
   *
   * @param tablePath                  base path of the table
   * @param maxCompactionMemoryInBytes spillable-map memory budget for log merging
   * @param fileSplit                  the CDC file split to read
   * @param mode                       the table's CDC supplemental logging mode (AS_IS case only)
   * @param imageManager               cache of before/after file-slice images
   * @param client                     meta client of the table
   * @return a closeable iterator of change-log rows for this file split
   * @throws IOException if any underlying file reader cannot be opened
   */
  private ClosableIterator<RowData> createRecordIterator(
      String tablePath,
      long maxCompactionMemoryInBytes,
      HoodieCDCFileSplit fileSplit,
      HoodieCDCSupplementalLoggingMode mode,
      ImageManager imageManager,
      HoodieTableMetaClient client) throws IOException {
    switch (fileSplit.getCdcInferCase()) {
      case BASE_FILE_INSERT: {
        // A brand-new base file: every record in it is an INSERT.
        ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1,
            "CDC file path should exist and be singleton for BASE_FILE_INSERT");
        String path = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString();
        return new AddBaseFileIterator(getBaseFileIterator(path));
      }
      case BASE_FILE_DELETE: {
        // The whole previous file slice disappeared: every record in it becomes a DELETE.
        ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
            "Before file slice should exist for BASE_FILE_DELETE");
        FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
        MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes);
        return new RemoveBaseFileIterator(requiredRowType, requiredPositions, getFileSliceIterator(inputSplit));
      }
      case AS_IS: {
        // Dedicated CDC log blocks exist; their schema depends on the supplemental logging mode.
        HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(tableSchema);
        HoodieSchema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
        switch (mode) {
          case DATA_BEFORE_AFTER:
            // Both images are persisted in the CDC log — no extra lookup needed.
            return new BeforeAfterImageIterator(
                getHadoopConf(), tablePath, tableSchema, requiredSchema, requiredRowType, cdcSchema, fileSplit);
          case DATA_BEFORE:
            // Only the before image is persisted; the after image is looked up via the ImageManager.
            return new BeforeImageIterator(
                conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, requiredRowType,
                maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
          case OP_KEY_ONLY:
            // Only op + record key are persisted; both images are looked up via the ImageManager.
            return new RecordKeyImageIterator(
                conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, requiredRowType,
                maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
          default:
            throw new AssertionError("Unexpected CDC supplemental logging mode: " + mode);
        }
      }
      case LOG_FILE: {
        // Change data must be inferred by diffing a data log file against the before image.
        ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1,
            "CDC file path should exist and be singleton for LOG_FILE");
        String logFilePath = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString();
        MergeOnReadInputSplit split = CdcInputFormat.singleLogFile2Split(tablePath, logFilePath, maxCompactionMemoryInBytes);
        ClosableIterator<HoodieRecord<RowData>> recordIterator = getFileSliceHoodieRecordIterator(split);
        return new DataLogFileIterator(
            maxCompactionMemoryInBytes, imageManager, fileSplit, tableSchema, requiredRowType, requiredPositions,
            recordIterator, client, getWriteConfig());
      }
      case REPLACE_COMMIT: {
        // Clustering/insert-overwrite style commit: replaced file groups are re-read and diffed.
        return new ReplaceCommitIterator(
            conf, tablePath, requiredRowType, requiredPositions, maxCompactionMemoryInBytes,
            fileSplit, this::getFileSliceIterator);
      }
      default:
        throw new AssertionError("Unexpected CDC file split infer case: " + fileSplit.getCdcInferCase());
    }
  }
+
  /**
   * Reads the full-schema before/after image for a file slice (emitDelete=false).
   *
   * <p>Both the data schema and the requested schema are the full table schema, so the
   * resulting rows carry every column — callers project down as needed.
   *
   * @param split MOR input split describing the file slice to read
   * @return an iterator over the merged rows of the slice
   * @throws HoodieIOException if the file group reader cannot be created
   */
  private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit split) {
    FileSlice fileSlice = buildFileSlice(split);
    try {
      HoodieFileGroupReader<RowData> reader = FormatUtils.createFileGroupReader(
          getMetaClient(), getWriteConfig(), internalSchemaManager, fileSlice,
          tableSchema, tableSchema, split.getLatestCommit(),
          FlinkOptions.REALTIME_PAYLOAD_COMBINE, false,
          Collections.emptyList(), split.getInstantRange());
      return reader.getClosableIterator();
    } catch (IOException e) {
      throw new HoodieIOException("Failed to create file slice iterator for split: " + split, e);
    }
  }
+
  /**
   * Reads a single log file and returns a typed {@link HoodieRecord} iterator
   * (for LOG_FILE CDC inference).
   *
   * <p>Unlike {@link #getFileSliceIterator}, this passes emitDelete=true so that delete
   * records in the log are surfaced rather than dropped during merging.
   *
   * @param split MOR input split wrapping the single log file
   * @return an iterator over the typed records of the log file
   * @throws HoodieIOException if the file group reader cannot be created
   */
  private ClosableIterator<HoodieRecord<RowData>> getFileSliceHoodieRecordIterator(MergeOnReadInputSplit split) {
    FileSlice fileSlice = buildFileSlice(split);
    try {
      HoodieFileGroupReader<RowData> reader = FormatUtils.createFileGroupReader(
          getMetaClient(), getWriteConfig(), internalSchemaManager, fileSlice,
          tableSchema, tableSchema, split.getLatestCommit(),
          FlinkOptions.REALTIME_PAYLOAD_COMBINE, true,
          Collections.emptyList(), split.getInstantRange());
      return reader.getClosableHoodieRecordIterator();
    } catch (IOException e) {
      throw new HoodieIOException("Failed to create Hoodie record iterator for split: " + split, e);
    }
  }
+
  /**
   * Reads a parquet CDC base file returning required-schema records.
   *
   * @param path absolute path of the parquet file
   * @return an iterator projecting each row to the required positions
   * @throws IOException if the parquet reader cannot be opened
   */
  private ClosableIterator<RowData> getBaseFileIterator(String path) throws IOException {
    String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
    DataType[] fieldTypesArray = fieldTypes.toArray(new DataType[0]);
    return RecordIterators.getParquetRecordIterator(
        internalSchemaManager,
        conf.get(FlinkOptions.READ_UTC_TIMEZONE),
        true,
        HadoopConfigurations.getParquetConf(conf, getHadoopConf()),
        fieldNames,
        fieldTypesArray,
        Collections.emptyMap(),      // no partition-default values
        requiredPositions,
        2048,                        // NOTE(review): presumably the read batch size — confirm against RecordIterators
        new org.apache.flink.core.fs.Path(path),
        0,                           // read the whole file: offset 0 ...
        Long.MAX_VALUE,              // ... to end
        Collections.emptyList());
  }
+
+  private static FileSlice buildFileSlice(MergeOnReadInputSplit split) {
+    return new FileSlice(
+        new HoodieFileGroupId("", split.getFileId()),
+        "",
+        split.getBasePath().map(HoodieBaseFile::new).orElse(null),
+        split.getLogPaths()
+            .map(lp -> 
lp.stream().map(HoodieLogFile::new).collect(Collectors.toList()))
+            .orElse(Collections.emptyList()));
+  }
+
+  private static int[] computeRequiredPositions(RowType rowType, RowType 
requiredRowType) {
+    List<String> allNames = rowType.getFieldNames();
+    return requiredRowType.getFieldNames().stream()
+        .map(allNames::indexOf)
+        .mapToInt(i -> i)
+        .toArray();
+  }
+
  /** Lazily builds the table meta client (transient; recreated after deserialization). */
  private HoodieTableMetaClient getMetaClient() {
    if (metaClient == null) {
      metaClient = StreamerUtil.metaClientForReader(conf, getHadoopConf());
    }
    return metaClient;
  }
+
  /** Lazily derives the Hoodie write config from the Flink configuration. */
  private HoodieWriteConfig getWriteConfig() {
    if (writeConfig == null) {
      writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
    }
    return writeConfig;
  }
+
  /** Lazily derives the Hadoop configuration from the Flink configuration. */
  private org.apache.hadoop.conf.Configuration getHadoopConf() {
    if (hadoopConf == null) {
      hadoopConf = HadoopConfigurations.getHadoopConf(conf);
    }
    return hadoopConf;
  }
+
+  // -------------------------------------------------------------------------
+  //  Inner iterators (adapted from CdcInputFormat inner classes)
+  // -------------------------------------------------------------------------
+
+  /** Iterates over an ordered list of {@link HoodieCDCFileSplit}s, delegating 
record reading to a factory. */
+  private static class CdcFileSplitsIterator implements 
ClosableIterator<RowData> {
+    private ImageManager imageManager;
+    private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
+    private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc;
+    private ClosableIterator<RowData> recordIterator;
+
+    CdcFileSplitsIterator(
+        HoodieCDCFileSplit[] changes,
+        ImageManager imageManager,
+        Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc) {
+      this.fileSplitIterator = Arrays.asList(changes).iterator();
+      this.imageManager = imageManager;
+      this.recordIteratorFunc = recordIteratorFunc;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (recordIterator != null) {
+        if (recordIterator.hasNext()) {
+          return true;
+        } else {
+          recordIterator.close();
+          recordIterator = null;
+        }
+      }
+      if (fileSplitIterator.hasNext()) {
+        recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
+        return recordIterator.hasNext();
+      }
+      return false;
+    }
+
+    @Override
+    public RowData next() {
+      return recordIterator.next();
+    }
+
+    @Override
+    public void close() {
+      if (recordIterator != null) {
+        recordIterator.close();
+      }
+      if (imageManager != null) {
+        imageManager.close();
+        imageManager = null;
+      }
+    }
+  }
+
+  /** Wraps a base-file parquet iterator and marks every record as {@link 
RowKind#INSERT}. */
+  private static class AddBaseFileIterator implements 
ClosableIterator<RowData> {
+    private ClosableIterator<RowData> nested;
+    private RowData currentRecord;
+
+    AddBaseFileIterator(ClosableIterator<RowData> nested) {
+      this.nested = nested;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (nested.hasNext()) {
+        currentRecord = nested.next();
+        currentRecord.setRowKind(RowKind.INSERT);
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public RowData next() {
+      return currentRecord;
+    }
+
+    @Override
+    public void close() {
+      if (nested != null) {
+        nested.close();
+        nested = null;
+      }
+    }
+  }
+
+  /** Wraps a file-slice iterator and marks every record as {@link 
RowKind#DELETE}, with projection. */
+  private static class RemoveBaseFileIterator implements 
ClosableIterator<RowData> {
+    private ClosableIterator<RowData> nested;
+    private final RowDataProjection projection;
+
+    RemoveBaseFileIterator(RowType requiredRowType, int[] requiredPositions, 
ClosableIterator<RowData> iterator) {
+      this.nested = iterator;
+      this.projection = RowDataProjection.instance(requiredRowType, 
requiredPositions);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nested.hasNext();
+    }
+
+    @Override
+    public RowData next() {
+      RowData row = nested.next();
+      row.setRowKind(RowKind.DELETE);
+      return projection.project(row);
+    }
+
+    @Override
+    public void close() {
+      if (nested != null) {
+        nested.close();
+        nested = null;
+      }
+    }
+  }
+
+  /**
+   * Handles the {@code LOG_FILE} CDC inference case: compares records from 
the log file
+   * against before-image snapshots to emit 
INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE events.
+   */
+  private static class DataLogFileIterator implements 
ClosableIterator<RowData> {
+    private final HoodieSchema tableSchema;
+    private final long maxCompactionMemoryInBytes;
+    private final ImageManager imageManager;
+    private final RowDataProjection projection;
+    private final BufferedRecordMerger recordMerger;
+    private final ClosableIterator<HoodieRecord<RowData>> logRecordIterator;
+    private final DeleteContext deleteContext;
+    private final HoodieReaderContext<RowData> readerContext;
+    private final String[] orderingFields;
+    private final TypedProperties props;
+
+    private ExternalSpillableMap<String, byte[]> beforeImages;
+    private RowData currentImage;
+    private RowData sideImage;
+
+    DataLogFileIterator(
+        long maxCompactionMemoryInBytes,
+        ImageManager imageManager,
+        HoodieCDCFileSplit cdcFileSplit,
+        HoodieSchema tableSchema,
+        RowType requiredRowType,
+        int[] requiredPositions,
+        ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
+        HoodieTableMetaClient metaClient,
+        HoodieWriteConfig writeConfig) throws IOException {
+      this.tableSchema = tableSchema;
+      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+      this.imageManager = imageManager;
+      this.projection = 
requiredRowType.equals(tableSchema.getAvroSchema().getFields().size() == 
requiredRowType.getFieldCount()

Review Comment:
   It is a good catch. I can't follow the same pattern here, since the table state is not 
easy to construct. But the row type definitely needs to be compared — revised 
accordingly.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to