cshuo commented on code in PR #18361:
URL: https://github.com/apache/hudi/pull/18361#discussion_r2985488503


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java:
##########
@@ -0,0 +1,1053 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.source.reader.BatchRecords;
+import org.apache.hudi.source.reader.HoodieRecordWithPosition;
+import org.apache.hudi.source.split.HoodieCdcSourceSplit;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.table.format.cdc.CdcInputFormat;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.RowDataProjection;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.fs.Path;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * CDC reader function for source V2. Reads CDC splits ({@link 
HoodieCdcSourceSplit}) and
+ * emits change-log {@link RowData} records tagged with the appropriate {@link 
RowKind}.
+ *
+ * <p>The implementation mirrors the logic in {@link CdcInputFormat}, adapted 
for the
+ * {@link SplitReaderFunction} contract.
+ */
+@Slf4j
+public class HoodieCdcSplitReaderFunction implements 
SplitReaderFunction<RowData> {
+
+  // --- Immutable configuration captured at construction time ---
+  private final org.apache.flink.configuration.Configuration conf;
+  // Full table schema and the projected schema required by the query.
+  private final HoodieSchema tableSchema;
+  private final HoodieSchema requiredSchema;
+  // Flink row types mirroring the two schemas above.
+  private final RowType rowType;
+  private final RowType requiredRowType;
+  // Positions of the required fields inside the full row type,
+  // derived once in the constructor via computeRequiredPositions.
+  private final int[] requiredPositions;
+  private final InternalSchemaManager internalSchemaManager;
+  // DataTypes for all table fields (used for parquet reading, per the ctor doc).
+  private final List<DataType> fieldTypes;
+
+  // --- Runtime state; transient so it is rebuilt after serialization.
+  // NOTE(review): presumably lazily initialized by the corresponding
+  // getMetaClient()/getWriteConfig() accessors — confirm against their bodies.
+  private transient HoodieTableMetaClient metaClient;
+  private transient HoodieWriteConfig writeConfig;
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+  // Iterator over the split currently being read; assigned in read(), closed in close().
+  private transient ClosableIterator<RowData> currentIterator;
+  // Fallback reader for non-CDC splits (e.g. snapshot reads when 
read.start-commit='earliest')
+  private transient HoodieSplitReaderFunction fallbackReaderFunction;
+
+  /**
+   * Creates a CDC split reader function.
+   *
+   * @param conf                  Flink configuration
+   * @param tableSchema           Full Avro schema of the Hoodie table
+   * @param requiredSchema        Projected schema required by the query
+   * @param rowType               Full Flink {@link RowType} of the table
+   * @param requiredRowType       Projected Flink {@link RowType} required by 
the query
+   * @param internalSchemaManager Schema-evolution manager
+   * @param fieldTypes            DataType list for all table fields (used for 
parquet reading)
+   */
+  public HoodieCdcSplitReaderFunction(
+      org.apache.flink.configuration.Configuration conf,
+      HoodieSchema tableSchema,
+      HoodieSchema requiredSchema,
+      RowType rowType,
+      RowType requiredRowType,
+      InternalSchemaManager internalSchemaManager,
+      List<DataType> fieldTypes) {
+    this.conf = conf;
+    this.internalSchemaManager = internalSchemaManager;
+    this.fieldTypes = fieldTypes;
+    this.tableSchema = tableSchema;
+    this.requiredSchema = requiredSchema;
+    this.rowType = rowType;
+    this.requiredRowType = requiredRowType;
+    // Pre-compute the projection once: positions of the required fields
+    // within the full table row type.
+    this.requiredPositions = computeRequiredPositions(rowType, requiredRowType);
+  }
+
+  @Override
+  public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> 
read(HoodieSourceSplit split) {
+    if (!(split instanceof HoodieCdcSourceSplit)) {
+      // Non-CDC splits arrive when reading from 'earliest' with no prior CDC 
history
+      // (i.e. instantRange is empty → snapshot path). Fall back to the 
standard MOR reader
+      // which emits all records as INSERT rows, matching the expected 
snapshot behaviour.
+      return getFallbackReaderFunction().read(split);
+    }
+    HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split;
+
+    HoodieCDCSupplementalLoggingMode mode = 
OptionsResolver.getCDCSupplementalLoggingMode(conf);
+    HoodieTableMetaClient client = getMetaClient();
+    HoodieWriteConfig wConfig = getWriteConfig();
+
+    ImageManager imageManager = new ImageManager(rowType, wConfig, 
this::getFileSliceIterator);
+
+    Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc 
=
+        cdcFileSplit -> createRecordIteratorSafe(
+            cdcSplit.getTablePath(),
+            cdcSplit.getMaxCompactionMemoryInBytes(),
+            cdcFileSplit,
+            mode,
+            imageManager,
+            client);
+
+    currentIterator = new CdcFileSplitsIterator(cdcSplit.getChanges(), 
imageManager, recordIteratorFunc);
+    BatchRecords<RowData> records = BatchRecords.forRecords(
+        split.splitId(), currentIterator, split.getFileOffset(), 
split.getConsumed());
+    records.seek(split.getConsumed());
+    return records;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (currentIterator != null) {
+      currentIterator.close();
+    }
+    if (fallbackReaderFunction != null) {
+      fallbackReaderFunction.close();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Internal helpers
+  // -------------------------------------------------------------------------
+
+  private HoodieSplitReaderFunction getFallbackReaderFunction() {

Review Comment:
   The non-CDC fallback constructs `HoodieSplitReaderFunction` with an empty 
predicate list, whereas `CdcInputFormat`'s non-CDC path preserves the 
configured predicates. The fallback therefore silently loses filter pushdown — 
please thread the configured predicates through so the two paths stay on par.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to