cshuo commented on code in PR #18361: URL: https://github.com/apache/hudi/pull/18361#discussion_r2986149895
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java: ########## @@ -0,0 +1,1065 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.reader.function; + +import com.google.common.collect.Lists; +import org.apache.hudi.client.model.HoodieFlinkRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit; +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; +import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.read.BufferedRecordMerger; +import org.apache.hudi.common.table.read.BufferedRecordMergerFactory; +import org.apache.hudi.common.table.read.BufferedRecords; +import org.apache.hudi.common.table.read.DeleteContext; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.ConfigUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.source.ExpressionPredicates; +import org.apache.hudi.source.reader.BatchRecords; +import org.apache.hudi.source.reader.HoodieRecordWithPosition; +import org.apache.hudi.source.split.HoodieCdcSourceSplit; +import org.apache.hudi.source.split.HoodieSourceSplit; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.format.FormatUtils; +import org.apache.hudi.table.format.FilePathUtils; +import org.apache.hudi.table.format.FlinkReaderContextFactory; +import org.apache.hudi.table.format.InternalSchemaManager; +import org.apache.hudi.table.format.RecordIterators; +import org.apache.hudi.table.format.cdc.CdcInputFormat; +import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.table.format.mor.MergeOnReadTableState; +import org.apache.hudi.util.AvroToRowDataConverters; +import org.apache.hudi.util.FlinkWriteClients; +import org.apache.hudi.util.HoodieSchemaConverter; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.RowDataProjection; + +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.hadoop.fs.Path; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS; +import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema; + +/** + * CDC reader function for source V2. Reads CDC splits ({@link HoodieCdcSourceSplit}) and + * emits change-log {@link RowData} records tagged with the appropriate {@link RowKind}. + * + * <p>The implementation mirrors the logic in {@link CdcInputFormat}, adapted for the + * {@link SplitReaderFunction} contract. + */ +@Slf4j +public class HoodieCdcSplitReaderFunction implements SplitReaderFunction<RowData> { + + private final org.apache.flink.configuration.Configuration conf; + private final InternalSchemaManager internalSchemaManager; + private final List<DataType> fieldTypes; + private final MergeOnReadTableState tableState; + private final boolean emitDelete; + private final List<ExpressionPredicates.Predicate> predicates; + private transient HoodieTableMetaClient metaClient; + private transient HoodieWriteConfig writeConfig; + private transient org.apache.hadoop.conf.Configuration hadoopConf; + private transient ClosableIterator<RowData> currentIterator; + // Fallback reader for non-CDC splits (e.g. snapshot reads when read.start-commit='earliest') + private transient HoodieSplitReaderFunction fallbackReaderFunction; + + /** + * Creates a CDC split reader function. + * + * @param conf Flink configuration + * @param tableState Merge on Read table state + * @param internalSchemaManager Schema-evolution manager + * @param fieldTypes DataType list for all table fields (used for parquet reading) + * @param predicates Predicates for push down + * @param emitDelete Whether to emit delete + */ + public HoodieCdcSplitReaderFunction( + org.apache.flink.configuration.Configuration conf, + MergeOnReadTableState tableState, + InternalSchemaManager internalSchemaManager, + List<DataType> fieldTypes, + List<ExpressionPredicates.Predicate> predicates, + boolean emitDelete) { + this.conf = conf; + this.tableState = tableState; + this.internalSchemaManager = internalSchemaManager; + this.fieldTypes = fieldTypes; + this.predicates = predicates; + this.emitDelete = emitDelete; + } + + @Override + public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> read(HoodieSourceSplit split) { + if (!(split instanceof HoodieCdcSourceSplit)) { + // Non-CDC splits arrive when reading from 'earliest' with no prior CDC history + // (i.e. instantRange is empty → snapshot path). Fall back to the standard MOR reader + // which emits all records as INSERT rows, matching the expected snapshot behaviour. + return getFallbackReaderFunction().read(split); + } + HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split; + + HoodieCDCSupplementalLoggingMode mode = OptionsResolver.getCDCSupplementalLoggingMode(conf); + HoodieTableMetaClient client = getMetaClient(); + HoodieWriteConfig wConfig = getWriteConfig(); + + ImageManager imageManager = new ImageManager(tableState.getRowType(), wConfig, this::getFileSliceIterator); + + Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc = + cdcFileSplit -> createRecordIteratorSafe( + cdcSplit.getTablePath(), + cdcSplit.getMaxCompactionMemoryInBytes(), + cdcFileSplit, + mode, + imageManager, + client); + + currentIterator = new CdcFileSplitsIterator(cdcSplit.getChanges(), imageManager, recordIteratorFunc); + BatchRecords<RowData> records = BatchRecords.forRecords( + split.splitId(), currentIterator, split.getFileOffset(), split.getConsumed()); + records.seek(split.getConsumed()); + return records; + } + + @Override + public void close() throws Exception { + if (currentIterator != null) { + currentIterator.close(); + } + if (fallbackReaderFunction != null) { + fallbackReaderFunction.close(); + } + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + private HoodieSplitReaderFunction getFallbackReaderFunction() { + if (fallbackReaderFunction == null) { + fallbackReaderFunction = new HoodieSplitReaderFunction( + conf, + HoodieSchema.parse(tableState.getTableSchema()), + HoodieSchema.parse(tableState.getRequiredSchema()), + internalSchemaManager, + conf.get(FlinkOptions.MERGE_TYPE), + predicates, + emitDelete); + } + return fallbackReaderFunction; + } + + private ClosableIterator<RowData> createRecordIteratorSafe( + String tablePath, + long maxCompactionMemoryInBytes, + HoodieCDCFileSplit fileSplit, + HoodieCDCSupplementalLoggingMode mode, + ImageManager imageManager, + HoodieTableMetaClient client) { + try { + return createRecordIterator(tablePath, maxCompactionMemoryInBytes, fileSplit, mode, imageManager, client); + } catch (IOException e) { + throw new HoodieException("Failed to create CDC record iterator for split: " + fileSplit, e); + } + } + + private ClosableIterator<RowData> createRecordIterator( + String tablePath, + long maxCompactionMemoryInBytes, + HoodieCDCFileSplit fileSplit, + HoodieCDCSupplementalLoggingMode mode, + ImageManager imageManager, + HoodieTableMetaClient client) throws IOException { + + final HoodieSchema tableSchema = HoodieSchema.parse(tableState.getTableSchema()); + final HoodieSchema requiredSchema = HoodieSchema.parse(tableState.getRequiredSchema()); + switch (fileSplit.getCdcInferCase()) { + case BASE_FILE_INSERT: { + ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1, + "CDC file path should exist and be singleton for BASE_FILE_INSERT"); + String path = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString(); + return new AddBaseFileIterator(getBaseFileIterator(path)); + } + case BASE_FILE_DELETE: { + ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(), + "Before file slice should exist for BASE_FILE_DELETE"); + FileSlice fileSlice = fileSplit.getBeforeFileSlice().get(); + MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes); + return new RemoveBaseFileIterator(tableState.getRequiredRowType(), tableState.getRequiredPositions(), getFileSliceIterator(inputSplit)); + } + case AS_IS: { + HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(tableSchema); + HoodieSchema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema); + switch (mode) { + case DATA_BEFORE_AFTER: + return new BeforeAfterImageIterator( + getHadoopConf(), tablePath, tableSchema, requiredSchema, tableState.getRequiredRowType(), cdcSchema, fileSplit); + case DATA_BEFORE: + return new BeforeImageIterator( + conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, tableState.getRequiredRowType(), + maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager); + case OP_KEY_ONLY: + return new RecordKeyImageIterator( + conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, tableState.getRequiredRowType(), + maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager); + default: + throw new AssertionError("Unexpected CDC supplemental logging mode: " + mode); + } + } + case LOG_FILE: { + ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1, + "CDC file path should exist and be singleton for LOG_FILE"); + String logFilePath = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString(); + MergeOnReadInputSplit split = CdcInputFormat.singleLogFile2Split(tablePath, logFilePath, maxCompactionMemoryInBytes); + ClosableIterator<HoodieRecord<RowData>> recordIterator = getFileSliceHoodieRecordIterator(split); + return new DataLogFileIterator( + maxCompactionMemoryInBytes, imageManager, fileSplit, tableSchema, tableState.getRequiredRowType(), tableState.getRequiredPositions(), + recordIterator, client, getWriteConfig()); + } + case REPLACE_COMMIT: { + return new ReplaceCommitIterator( + conf, tablePath, tableState.getRequiredRowType(), tableState.getRequiredPositions(), maxCompactionMemoryInBytes, + fileSplit, this::getFileSliceIterator); + } + default: + throw new AssertionError("Unexpected CDC file split infer case: " + fileSplit.getCdcInferCase()); + } + } + + /** Reads the full-schema before/after image for a file slice (emitDelete=false). */ + private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit split) { + FileSlice fileSlice = buildFileSlice(split); + final HoodieSchema tableSchema = HoodieSchema.parse(tableState.getTableSchema()); + try { + HoodieFileGroupReader<RowData> reader = FormatUtils.createFileGroupReader( + getMetaClient(), getWriteConfig(), internalSchemaManager, fileSlice, + tableSchema, tableSchema, split.getLatestCommit(), + FlinkOptions.REALTIME_PAYLOAD_COMBINE, false, + predicates, split.getInstantRange()); + return reader.getClosableIterator(); + } catch (IOException e) { + throw new HoodieIOException("Failed to create file slice iterator for split: " + split, e); + } + } + + /** Reads a single log file and returns a typed {@link HoodieRecord} iterator (for LOG_FILE CDC inference). */ + private ClosableIterator<HoodieRecord<RowData>> getFileSliceHoodieRecordIterator(MergeOnReadInputSplit split) { + FileSlice fileSlice = buildFileSlice(split); + final HoodieSchema tableSchema = HoodieSchema.parse(tableState.getTableSchema()); + try { + HoodieFileGroupReader<RowData> reader = FormatUtils.createFileGroupReader( + getMetaClient(), getWriteConfig(), internalSchemaManager, fileSlice, + tableSchema, tableSchema, split.getLatestCommit(), + FlinkOptions.REALTIME_PAYLOAD_COMBINE, true, + Collections.emptyList(), split.getInstantRange()); Review Comment: `Collections.emptyList()` -> `predicates` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
