cshuo commented on code in PR #18361: URL: https://github.com/apache/hudi/pull/18361#discussion_r2985488503
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java: ########## @@ -0,0 +1,1053 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.reader.function; + +import org.apache.hudi.client.model.HoodieFlinkRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit; +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; +import 
org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.read.BufferedRecordMerger; +import org.apache.hudi.common.table.read.BufferedRecordMergerFactory; +import org.apache.hudi.common.table.read.BufferedRecords; +import org.apache.hudi.common.table.read.DeleteContext; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.ConfigUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.source.reader.BatchRecords; +import org.apache.hudi.source.reader.HoodieRecordWithPosition; +import org.apache.hudi.source.split.HoodieCdcSourceSplit; +import org.apache.hudi.source.split.HoodieSourceSplit; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.format.FlinkReaderContextFactory; +import org.apache.hudi.table.format.FormatUtils; +import org.apache.hudi.table.format.InternalSchemaManager; +import org.apache.hudi.table.format.RecordIterators; +import org.apache.hudi.table.format.cdc.CdcInputFormat; +import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.AvroToRowDataConverters; +import 
org.apache.hudi.util.FlinkWriteClients; +import org.apache.hudi.util.HoodieSchemaConverter; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.RowDataProjection; + +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.hadoop.fs.Path; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS; +import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema; + +/** + * CDC reader function for source V2. Reads CDC splits ({@link HoodieCdcSourceSplit}) and + * emits change-log {@link RowData} records tagged with the appropriate {@link RowKind}. + * + * <p>The implementation mirrors the logic in {@link CdcInputFormat}, adapted for the + * {@link SplitReaderFunction} contract. 
+ */ +@Slf4j +public class HoodieCdcSplitReaderFunction implements SplitReaderFunction<RowData> { + + private final org.apache.flink.configuration.Configuration conf; + private final HoodieSchema tableSchema; + private final HoodieSchema requiredSchema; + private final RowType rowType; + private final RowType requiredRowType; + private final int[] requiredPositions; + private final InternalSchemaManager internalSchemaManager; + private final List<DataType> fieldTypes; + + private transient HoodieTableMetaClient metaClient; + private transient HoodieWriteConfig writeConfig; + private transient org.apache.hadoop.conf.Configuration hadoopConf; + private transient ClosableIterator<RowData> currentIterator; + // Fallback reader for non-CDC splits (e.g. snapshot reads when read.start-commit='earliest') + private transient HoodieSplitReaderFunction fallbackReaderFunction; + + /** + * Creates a CDC split reader function. + * + * @param conf Flink configuration + * @param tableSchema Full Avro schema of the Hoodie table + * @param requiredSchema Projected schema required by the query + * @param rowType Full Flink {@link RowType} of the table + * @param requiredRowType Projected Flink {@link RowType} required by the query + * @param internalSchemaManager Schema-evolution manager + * @param fieldTypes DataType list for all table fields (used for parquet reading) + */ + public HoodieCdcSplitReaderFunction( + org.apache.flink.configuration.Configuration conf, + HoodieSchema tableSchema, + HoodieSchema requiredSchema, + RowType rowType, + RowType requiredRowType, + InternalSchemaManager internalSchemaManager, + List<DataType> fieldTypes) { + this.conf = conf; + this.tableSchema = tableSchema; + this.requiredSchema = requiredSchema; + this.rowType = rowType; + this.requiredRowType = requiredRowType; + this.requiredPositions = computeRequiredPositions(rowType, requiredRowType); + this.internalSchemaManager = internalSchemaManager; + this.fieldTypes = fieldTypes; + } + + @Override 
+ public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> read(HoodieSourceSplit split) { + if (!(split instanceof HoodieCdcSourceSplit)) { + // Non-CDC splits arrive when reading from 'earliest' with no prior CDC history + // (i.e. instantRange is empty → snapshot path). Fall back to the standard MOR reader + // which emits all records as INSERT rows, matching the expected snapshot behaviour. + return getFallbackReaderFunction().read(split); + } + HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split; + + HoodieCDCSupplementalLoggingMode mode = OptionsResolver.getCDCSupplementalLoggingMode(conf); + HoodieTableMetaClient client = getMetaClient(); + HoodieWriteConfig wConfig = getWriteConfig(); + + ImageManager imageManager = new ImageManager(rowType, wConfig, this::getFileSliceIterator); + + Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc = + cdcFileSplit -> createRecordIteratorSafe( + cdcSplit.getTablePath(), + cdcSplit.getMaxCompactionMemoryInBytes(), + cdcFileSplit, + mode, + imageManager, + client); + + currentIterator = new CdcFileSplitsIterator(cdcSplit.getChanges(), imageManager, recordIteratorFunc); + BatchRecords<RowData> records = BatchRecords.forRecords( + split.splitId(), currentIterator, split.getFileOffset(), split.getConsumed()); + records.seek(split.getConsumed()); + return records; + } + + @Override + public void close() throws Exception { + if (currentIterator != null) { + currentIterator.close(); + } + if (fallbackReaderFunction != null) { + fallbackReaderFunction.close(); + } + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + private HoodieSplitReaderFunction getFallbackReaderFunction() { Review Comment: The non-CDC fallback constructs `HoodieSplitReaderFunction` with an empty predicate list. 
`CdcInputFormat`'s non-CDC path preserves the configured predicates, so this fallback is not on par — it should pass the configured predicates through to `HoodieSplitReaderFunction` as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
