xushiyan commented on code in PR #6476: URL: https://github.com/apache/hudi/pull/6476#discussion_r965411452
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java: ########## @@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException { } } + protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String recordKey, String partitionPath, + GenericRecord oldRecord, GenericRecord newRecord) { + GenericData.Record record; + if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) { + record = CDCUtils.cdcRecord(operation.getValue(), instantTime, Review Comment: Can we prefix classes with `Hoodie`, like `HoodieCDCUtils`? That is the convention in the codebase. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java: ########## @@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException { } } + protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String recordKey, String partitionPath, Review Comment: A better name for this kind of method would start with `make` or `create`; that is easier to understand. ########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCFileSplit.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.util.Option; + +import java.io.Serializable; + +/** + * This contains all the information that retrieve the change data at a single file group and + * at a single commit. + * + * For [[cdcFileType]] = [[CDCFileTypeEnum.ADD_BASE_FILE]], [[cdcFile]] is a current version of + * the base file in the group, and [[beforeFileSlice]] is None. + * For [[cdcFileType]] = [[CDCFileTypeEnum.REMOVE_BASE_FILE]], [[cdcFile]] is null, + * [[beforeFileSlice]] is the previous version of the base file in the group. + * For [[cdcFileType]] = [[CDCFileTypeEnum.CDC_LOG_FILE]], [[cdcFile]] is a log file with cdc blocks. + * when enable the supplemental logging, both [[beforeFileSlice]] and [[afterFileSlice]] are None, + * otherwise these two are the previous and current version of the base file. + * For [[cdcFileType]] = [[CDCFileTypeEnum.MOR_LOG_FILE]], [[cdcFile]] is a normal log file and + * [[beforeFileSlice]] is the previous version of the file slice. + * For [[cdcFileType]] = [[CDCFileTypeEnum.REPLACED_FILE_GROUP]], [[cdcFile]] is null, + * [[beforeFileSlice]] is the current version of the file slice. + */ +public class CDCFileSplit implements Serializable { Review Comment: HoodieCDCFileSplit ########## hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.log.block; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hadoop.fs.FSDataInputStream; + +import org.apache.hudi.common.util.Option; + +import javax.annotation.Nonnull; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class HoodieCDCDataBlock extends HoodieAvroDataBlock { + + public HoodieCDCDataBlock( + FSDataInputStream inputStream, + Option<byte[]> content, + boolean readBlockLazily, + HoodieLogBlockContentLocation logBlockContentLocation, + Schema readerSchema, + Map<HeaderMetadataType, String> header, + String keyField) { + super(inputStream, content, readBlockLazily, logBlockContentLocation, + Option.of(readerSchema), header, new HashMap<>(), keyField, null); + } + + public HoodieCDCDataBlock(@Nonnull List<IndexedRecord> records, + @Nonnull Map<HeaderMetadataType, String> header, + @Nonnull String keyField) { Review Comment: By convention, params are assumed to be non-null, so I would remove the `@Nonnull` annotations. ########## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java: ########## @@ -236,6 +241,43 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except return JsonUtils.getObjectMapper().readValue(jsonStr, clazz); } + /** + * parse the bytes of deltacommit, and get the base file and the log files belonging to this + * provided file group.
+ */ + public static Option<Pair<String, List<String>>> getFileSliceForFileGroupFromDeltaCommit( Review Comment: When evolving the commit metadata schema (except for the extra fields), we have to describe the change in an RFC for proper review. Breaking compatibility at the storage level is a show-stopper. ########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCExtractor.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieNotSupportedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Locale; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.ADD_BASE_FILE; +import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.CDC_LOG_FILE; +import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.MOR_LOG_FILE; 
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REMOVE_BASE_FILE; +import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REPLACED_FILE_GROUP; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; + +public class CDCExtractor { Review Comment: Similarly, we should call it `HoodieCDCExtractor`. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java: ########## @@ -273,6 +279,33 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier); } + protected HoodieLogFormat.Writer createLogWriter( Review Comment: Is this meant for CDC log writing? If so, can we move it to CDC-dedicated classes instead of mixing it into the write handle? People may be confused about when, and in what scenario, to use this. ########## hudi-common/src/main/java/org/apache/hudi/common/table/log/CDCLogRecordReader.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.log; + +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.cdc.CDCUtils; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.exception.HoodieIOException; + +import java.io.IOException; + +public class CDCLogRecordReader implements ClosableIterator<IndexedRecord> { Review Comment: HoodieCDCLogRecordReader ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java: ########## @@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException { } } + protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String recordKey, String partitionPath, + GenericRecord oldRecord, GenericRecord newRecord) { + GenericData.Record record; + if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) { + record = CDCUtils.cdcRecord(operation.getValue(), instantTime, + oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath)); + } else if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE)) { + record = CDCUtils.cdcRecord(operation.getValue(), recordKey, oldRecord); + } else { + record = CDCUtils.cdcRecord(operation.getValue(), recordKey); + } + return new 
SerializableRecord(record); + } + + protected GenericRecord addCommitMetadata(GenericRecord record, String recordKey, String partitionPath) { + if (record != null && config.populateMetaFields()) { + GenericRecord rewriteRecord = rewriteRecord(record); + String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), writtenRecordCount.get()); + HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId); + HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, partitionPath, newFilePath.getName()); + return rewriteRecord; + } + return record; + } + + protected Option<AppendResult> writeCDCData() { + if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) { + // the following cases where we do not need to write out the cdc file: + // case 1: all the data from the previous file slice are deleted. and no new data is inserted; + // case 2: all the data are new-coming, + return Option.empty(); + } Review Comment: Can we encapsulate the CDC writer logic within a dedicated CDC writer as much as possible? The concern here is that we don't want to spread these condition checks across different classes, which would be hard to maintain if any of these assumptions change in the future. Grouping all the CDC logic together really helps with evolution and maintenance in the future. ########## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCExtractor.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieNotSupportedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.Locale; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.ADD_BASE_FILE; +import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.CDC_LOG_FILE; +import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.MOR_LOG_FILE; +import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REMOVE_BASE_FILE; +import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REPLACED_FILE_GROUP; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; + +public class CDCExtractor { + + private final HoodieTableMetaClient metaClient; + + private final Path basePath; + + private final FileSystem fs; + + private final String supplementalLoggingMode; + + private final String startInstant; + + private final String endInstant; + + // TODO: this will be used when support the cdc query type of 'read_optimized'. 
+ private final String cdcQueryType; + + private Map<HoodieInstant, HoodieCommitMetadata> commits; + + private HoodieTableFileSystemView fsView; + + public CDCExtractor( + HoodieTableMetaClient metaClient, + String startInstant, + String endInstant, + String cdcqueryType) { + this.metaClient = metaClient; + this.basePath = metaClient.getBasePathV2(); + this.fs = metaClient.getFs().getFileSystem(); + this.supplementalLoggingMode = metaClient.getTableConfig().cdcSupplementalLoggingMode(); + this.startInstant = startInstant; + this.endInstant = endInstant; + if (HoodieTableType.MERGE_ON_READ == metaClient.getTableType() + && cdcqueryType.equals("read_optimized")) { + throw new HoodieNotSupportedException("The 'read_optimized' cdc query type hasn't been supported for now."); + } + this.cdcQueryType = cdcqueryType; + init(); + } + + private void init() { + initInstantAndCommitMetadatas(); + initFSView(); + } + + /** + * At the granularity of a file group, trace the mapping between + * each commit/instant and changes to this file group. + */ + public Map<HoodieFileGroupId, List<Pair<HoodieInstant, CDCFileSplit>>> extractor() { Review Comment: `extractor()` does not sound like a proper name for this. How about `extractCDCFileSplits()`? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java: ########## @@ -114,6 +118,8 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String HoodieTable<T, I, K, O> hoodieTable, Option<Schema> overriddenSchema, TaskContextSupplier taskContextSupplier) { super(config, Option.of(instantTime), hoodieTable); + this.keyField = config.populateMetaFields() ? HoodieRecord.RECORD_KEY_METADATA_FIELD + : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); Review Comment: This could be multiple keys that the user configured.
we might need a `keyFields` as a list ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java: ########## @@ -294,6 +295,8 @@ public interface HoodieTimeline extends Serializable { */ Stream<HoodieInstant> getInstants(); + List<HoodieInstant> getInstantsAsList(); Review Comment: ditto ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala: ########## @@ -169,310 +141,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private def getConfig: Configuration = { val conf = confBroadcast.value.value - HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK.synchronized { + CONFIG_INSTANTIATION_LOCK.synchronized { new Configuration(conf) } } - - /** - * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in - * Delta Log files (represented as [[InternalRow]]s) - */ - private class LogFileIterator(split: HoodieMergeOnReadFileSplit, - config: Configuration) - extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport { - - protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) - protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema - - protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr) - - protected var recordToLoad: InternalRow = _ - - private val requiredSchemaSafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema) - - private var logScanner = { - val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) - HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState, - maxCompactionMemoryInBytes, config, internalSchema) - } - - private val logRecords = logScanner.getRecords.asScala - - // NOTE: This have to stay lazy to make sure it's initialized only at the point 
where it's - // going to be used, since we modify `logRecords` before that and therefore can't do it any earlier - protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] = - logRecords.iterator.map { - case (_, record) => - toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)) - .map(_.asInstanceOf[GenericRecord]) - } - - protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] = - logRecords.remove(key) - - override def hasNext: Boolean = hasNextInternal - - // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure - // that recursion is unfolded into a loop to avoid stack overflows while - // handling records - @tailrec private def hasNextInternal: Boolean = { - logRecordsIterator.hasNext && { - val avroRecordOpt = logRecordsIterator.next() - if (avroRecordOpt.isEmpty) { - // Record has been deleted, skipping - this.hasNextInternal - } else { - val projectedAvroRecord = requiredSchemaSafeAvroProjection(avroRecordOpt.get) - recordToLoad = deserialize(projectedAvroRecord) - true - } - } - } - - override final def next(): InternalRow = recordToLoad - - override def close(): Unit = - if (logScanner != null) { - try { - logScanner.close() - } finally { - logScanner = null - } - } - } - - /** - * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in - * Base file as well as all of the Delta Log files simply returning concatenation of these streams, while not - * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially) - */ - private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, - baseFileReader: BaseFileReader, - config: Configuration) - extends LogFileIterator(split, config) { - - private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema) - - private val baseFileIterator = 
baseFileReader(split.dataFile.get) - - override def hasNext: Boolean = { - if (baseFileIterator.hasNext) { - // No merge is required, simply load current row and project into required schema - recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next()) - true - } else { - super[LogFileIterator].hasNext - } - } - } - - /** - * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in - * a) Base file and all of the b) Delta Log files combining records with the same primary key from both of these - * streams - */ - private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, - baseFileReader: BaseFileReader, - config: Configuration) - extends LogFileIterator(split, config) { - - // NOTE: Record-merging iterator supports 2 modes of operation merging records bearing either - // - Full table's schema - // - Projected schema - // As such, no particular schema could be assumed, and therefore we rely on the caller - // to correspondingly set the scheme of the expected output of base-file reader - private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable = false, "record") - - private val serializer = sparkAdapter.createAvroSerializer(baseFileReader.schema, baseFileReaderAvroSchema, nullable = false) - - private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) - - private val recordKeyOrdinal = baseFileReader.schema.fieldIndex(tableState.recordKeyField) - - private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema) - - private val baseFileIterator = baseFileReader(split.dataFile.get) - - override def hasNext: Boolean = hasNextInternal - - // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure - // that recursion is unfolded into a loop to avoid stack overflows while - // handling records - @tailrec private 
def hasNextInternal: Boolean = { - if (baseFileIterator.hasNext) { - val curRow = baseFileIterator.next() - val curKey = curRow.getString(recordKeyOrdinal) - val updatedRecordOpt = removeLogRecord(curKey) - if (updatedRecordOpt.isEmpty) { - // No merge is required, simply load current row and project into required schema - recordToLoad = requiredSchemaUnsafeProjection(curRow) - true - } else { - val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get) - if (mergedAvroRecordOpt.isEmpty) { - // Record has been deleted, skipping - this.hasNextInternal - } else { - val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord], - requiredAvroSchema, reusableRecordBuilder) - recordToLoad = deserialize(projectedAvroRecord) - true - } - } - } else { - super[LogFileIterator].hasNext - } - } - - private def serialize(curRowRecord: InternalRow): GenericRecord = - serializer.serialize(curRowRecord).asInstanceOf[GenericRecord] - - private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = { - // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API - // on the record from the Delta Log - toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps)) - } - } -} - -private object HoodieMergeOnReadRDD { - - val CONFIG_INSTANTIATION_LOCK = new Object() - - def scanLog(logFiles: List[HoodieLogFile], - partitionPath: Path, - logSchema: Schema, - tableState: HoodieTableState, - maxCompactionMemoryInBytes: Long, - hadoopConf: Configuration, internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): HoodieMergedLogRecordScanner = { - val tablePath = tableState.tablePath - val fs = FSUtils.getFs(tablePath, hadoopConf) - - if (HoodieTableMetadata.isMetadataTable(tablePath)) { - val metadataConfig = HoodieMetadataConfig.newBuilder() - 
.fromProperties(tableState.metadataConfig.getProps).enable(true).build() - val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath) - val metadataTable = new HoodieBackedTableMetadata( - new HoodieLocalEngineContext(hadoopConf), metadataConfig, - dataTableBasePath, - hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) - - // We have to force full-scan for the MT log record reader, to make sure - // we can iterate over all of the partitions, since by default some of the partitions (Column Stats, - // Bloom Filter) are in "point-lookup" mode - val forceFullScan = true - - // NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level - // of indirection among MT partitions) - val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath) - metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan))) - .getLeft - } else { - val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder() - .withFileSystem(fs) - .withBasePath(tablePath) - .withLogFilePaths(logFiles.map(logFile => logFile.getPath.toString).asJava) - .withReaderSchema(logSchema) - .withLatestInstantTime(tableState.latestCommitTimestamp) - .withReadBlocksLazily( - Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, - HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean) - .getOrElse(false)) - .withReverseReader(false) - .withInternalSchema(internalSchema) - .withBufferSize( - hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, - HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) - .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes) - .withSpillableMapBasePath( - hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, - HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) - .withDiskMapType( - 
hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key, - HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue)) - .withBitCaskDiskMapCompressionEnabled( - hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), - HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) - if (logFiles.nonEmpty) { - logRecordScannerBuilder.withPartition( - getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent)) - } - - logRecordScannerBuilder.build() - } - } - - private def projectAvroUnsafe(record: GenericRecord, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = { - val fields = projectedSchema.getFields.asScala - fields.foreach(field => reusableRecordBuilder.set(field, record.get(field.name()))) - reusableRecordBuilder.build() - } - - private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = { Review Comment: can you clarify this big chunk of code removal? a result of refactoring? if that's the case, let's keep change minimal for CDC-related. try to achieve the same functionalities without moving code around. 
Note that there may be other refactoring work going on around this, and conflicts can be tricky. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java: ########## @@ -93,13 +96,19 @@ public void write(GenericRecord oldRecord) { throw new HoodieUpsertException("Insert/Update not in sorted order"); } try { + Option<IndexedRecord> insertRecord; if (useWriterSchemaForCompaction) { - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())); + insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()); } else { - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps())); + insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()); } + writeRecord(hoodieRecord, insertRecord); Review Comment: Yeah, I saw that. ########## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java: ########## @@ -236,6 +241,43 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except return JsonUtils.getObjectMapper().readValue(jsonStr, clazz); } + /** + * parse the bytes of deltacommit, and get the base file and the log files belonging to this + * provided file group. + */ + public static Option<Pair<String, List<String>>> getFileSliceForFileGroupFromDeltaCommit( Review Comment: I'm OK with moving forward and doing a proper refactoring later, if you can annotate this method with a JIRA ticket and take it up soon after the first version. ########## hudi-common/src/main/java/org/apache/hudi/avro/SerializableRecord.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.avro; + +import org.apache.avro.generic.GenericData; + +import java.io.Serializable; + +/** + * In some cases like putting the [[GenericData.Record]] into [[ExternalSpillableMap]], + * objects is asked to extend [[Serializable]]. + * + * This class wraps [[GenericData.Record]]. + */ +public class SerializableRecord implements Serializable { Review Comment: Can we make use of `HoodieRecord`, which is the current common way to work with the spillable map? ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java: ########## @@ -354,6 +354,11 @@ public Stream<HoodieInstant> getInstants() { return instants.stream(); } + @Override + public List<HoodieInstant> getInstantsAsList() { + return instants; Review Comment: We should have a cleanup ticket to rename this to `getInstants` and the other one to `getInstantsAsStream`. Public API names should not cause confusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org