xushiyan commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r965411452


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String recordKey, String partitionPath,
+                                         GenericRecord oldRecord, GenericRecord newRecord) {
+    GenericData.Record record;
+    if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), instantTime,

Review Comment:
   can we prefix classes with `Hoodie`? like `HoodieCDCUtils`, which is the convention in the codebase



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String recordKey, String partitionPath,

Review Comment:
   a better name for this kind of method would start with `make` or `create`; that is easier to understand



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCFileSplit.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.util.Option;
+
+import java.io.Serializable;
+
+/**
+ * This contains all the information that retrieve the change data at a single file group and
+ * at a single commit.
+ *
+ * For [[cdcFileType]] = [[CDCFileTypeEnum.ADD_BASE_FILE]], [[cdcFile]] is a current version of
+ *   the base file in the group, and [[beforeFileSlice]] is None.
+ * For [[cdcFileType]] = [[CDCFileTypeEnum.REMOVE_BASE_FILE]], [[cdcFile]] is null,
+ *   [[beforeFileSlice]] is the previous version of the base file in the group.
+ * For [[cdcFileType]] = [[CDCFileTypeEnum.CDC_LOG_FILE]], [[cdcFile]] is a log file with cdc blocks.
+ *   when enable the supplemental logging, both [[beforeFileSlice]] and [[afterFileSlice]] are None,
+ *   otherwise these two are the previous and current version of the base file.
+ * For [[cdcFileType]] = [[CDCFileTypeEnum.MOR_LOG_FILE]], [[cdcFile]] is a normal log file and
+ *   [[beforeFileSlice]] is the previous version of the file slice.
+ * For [[cdcFileType]] = [[CDCFileTypeEnum.REPLACED_FILE_GROUP]], [[cdcFile]] is null,
+ *   [[beforeFileSlice]] is the current version of the file slice.
+ */
+public class CDCFileSplit implements Serializable {

Review Comment:
   HoodieCDCFileSplit



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.hudi.common.util.Option;
+
+import javax.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
+
+  public HoodieCDCDataBlock(
+      FSDataInputStream inputStream,
+      Option<byte[]> content,
+      boolean readBlockLazily,
+      HoodieLogBlockContentLocation logBlockContentLocation,
+      Schema readerSchema,
+      Map<HeaderMetadataType, String> header,
+      String keyField) {
+    super(inputStream, content, readBlockLazily, logBlockContentLocation,
+        Option.of(readerSchema), header, new HashMap<>(), keyField, null);
+  }
+
+  public HoodieCDCDataBlock(@Nonnull List<IndexedRecord> records,
+                            @Nonnull Map<HeaderMetadataType, String> header,
+                            @Nonnull String keyField) {

Review Comment:
   params by convention should be non-null, so I would remove the `@Nonnull` annotations
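   
   A minimal sketch of the suggested change, assuming the matching `HoodieAvroDataBlock` super-constructor from this PR (annotations dropped, everything else unchanged):
   
   ```java
   // Hypothetical cleaned-up constructor: callers pass non-null arguments by
   // convention, so the @Nonnull annotations are simply removed.
   public HoodieCDCDataBlock(List<IndexedRecord> records,
                             Map<HeaderMetadataType, String> header,
                             String keyField) {
     super(records, header, keyField);
   }
   ```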



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##########
@@ -236,6 +241,43 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except
     return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * parse the bytes of deltacommit, and get the base file and the log files belonging to this
+   * provided file group.
+   */
+  public static Option<Pair<String, List<String>>> getFileSliceForFileGroupFromDeltaCommit(

Review Comment:
   when evolving the commit metadata schema (beyond the extra fields), we have to illustrate it in an RFC for proper review. Breaking at the storage level is a show-stopper



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCExtractor.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.ADD_BASE_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.CDC_LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.MOR_LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REMOVE_BASE_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REPLACED_FILE_GROUP;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+public class CDCExtractor {

Review Comment:
   similarly we should call it `HoodieCDCExtractor`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -273,6 +279,33 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho
     return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier);
   }
 
+  protected HoodieLogFormat.Writer createLogWriter(

Review Comment:
   this is meant for CDC log writing, right? can we move it to CDC-dedicated classes instead of mixing it into the write handle? people may be confused about when/in what scenario to use this.
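   
   A rough sketch of the kind of separation being asked for; `HoodieCDCLogWriteHelper` and its method are hypothetical names, not an existing Hudi API, and the builder calls mirror the usual `HoodieLogFormat` writer setup:
   
   ```java
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   import org.apache.hudi.common.model.HoodieLogFile;
   import org.apache.hudi.common.table.log.HoodieLogFormat;
   
   import java.io.IOException;
   
   // Hypothetical CDC-dedicated helper, so the generic write handle is not
   // cluttered with CDC-only log-writer construction.
   public class HoodieCDCLogWriteHelper {
   
     // Builds a log writer used exclusively for appending CDC blocks.
     public static HoodieLogFormat.Writer createCDCLogWriter(FileSystem fs,
                                                             Path partitionPath,
                                                             String fileId,
                                                             String baseCommitTime,
                                                             String logWriteToken) throws IOException {
       return HoodieLogFormat.newWriterBuilder()
           .onParentPath(partitionPath)
           .withFileId(fileId)
           .overBaseCommit(baseCommitTime)
           .withLogWriteToken(logWriteToken)
           .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
           .withFs(fs)
           .build();
     }
   }
   ```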



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/CDCLogRecordReader.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.cdc.CDCUtils;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import java.io.IOException;
+
+public class CDCLogRecordReader implements ClosableIterator<IndexedRecord> {

Review Comment:
   HoodieCDCLogRecordReader



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String recordKey, String partitionPath,
+                                         GenericRecord oldRecord, GenericRecord newRecord) {
+    GenericData.Record record;
+    if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey, oldRecord);
+    } else {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String recordKey, String partitionPath) {
+    if (record != null && config.populateMetaFields()) {
+      GenericRecord rewriteRecord = rewriteRecord(record);
+      String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), writtenRecordCount.get());
+      HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
+      HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, partitionPath, newFilePath.getName());
+      return rewriteRecord;
+    }
+    return record;
+  }
+
+  protected Option<AppendResult> writeCDCData() {
+    if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
+      // the following cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice are deleted. and no new data is inserted;
+      // case 2: all the data are new-coming,
+      return Option.empty();
+    }

Review Comment:
   can we encapsulate the CDC writer logic within a dedicated CDC writer as much as possible? the concern here is that we don't want to spread these condition checks across different classes, which would be hard to maintain if any of these assumptions change in the future. Grouping all the CDC logic together really helps with evolution and maintenance going forward.
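   
   One possible shape for that encapsulation; `HoodieCDCLogger` is a hypothetical class name that simply collects the checks quoted above in one place, not an existing Hudi API:
   
   ```java
   import java.io.Closeable;
   import java.util.Map;
   
   import org.apache.hudi.avro.SerializableRecord;
   
   // Hypothetical CDC logger that owns the buffered CDC records and the
   // "should we write a CDC file at all?" decision, so write handles only delegate to it.
   public class HoodieCDCLogger implements Closeable {
   
     private final boolean cdcEnabled;
     private final Map<String, SerializableRecord> cdcData;
   
     public HoodieCDCLogger(boolean cdcEnabled, Map<String, SerializableRecord> cdcData) {
       this.cdcEnabled = cdcEnabled;
       this.cdcData = cdcData;
     }
   
     // The skip conditions from the handle live in one place:
     // case 1: everything from the previous file slice was deleted and nothing new was inserted;
     // case 2: all the written data are new inserts.
     public boolean shouldWriteCDC(long recordsWritten, long insertRecordsWritten) {
       return cdcEnabled
           && !cdcData.isEmpty()
           && recordsWritten != 0L
           && recordsWritten != insertRecordsWritten;
     }
   
     @Override
     public void close() {
       cdcData.clear();
     }
   }
   ```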



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCExtractor.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.ADD_BASE_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.CDC_LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.MOR_LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REMOVE_BASE_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REPLACED_FILE_GROUP;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+public class CDCExtractor {
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final Path basePath;
+
+  private final FileSystem fs;
+
+  private final String supplementalLoggingMode;
+
+  private final String startInstant;
+
+  private final String endInstant;
+
+  // TODO: this will be used when support the cdc query type of 'read_optimized'.
+  private final String cdcQueryType;
+
+  private Map<HoodieInstant, HoodieCommitMetadata> commits;
+
+  private HoodieTableFileSystemView fsView;
+
+  public CDCExtractor(
+      HoodieTableMetaClient metaClient,
+      String startInstant,
+      String endInstant,
+      String cdcqueryType) {
+    this.metaClient = metaClient;
+    this.basePath = metaClient.getBasePathV2();
+    this.fs = metaClient.getFs().getFileSystem();
+    this.supplementalLoggingMode = metaClient.getTableConfig().cdcSupplementalLoggingMode();
+    this.startInstant = startInstant;
+    this.endInstant = endInstant;
+    if (HoodieTableType.MERGE_ON_READ == metaClient.getTableType()
+        && cdcqueryType.equals("read_optimized")) {
+      throw new HoodieNotSupportedException("The 'read_optimized' cdc query type hasn't been supported for now.");
+    }
+    this.cdcQueryType = cdcqueryType;
+    init();
+  }
+
+  private void init() {
+    initInstantAndCommitMetadatas();
+    initFSView();
+  }
+
+  /**
+   * At the granularity of a file group, trace the mapping between
+   * each commit/instant and changes to this file group.
+   */
+  public Map<HoodieFileGroupId, List<Pair<HoodieInstant, CDCFileSplit>>> extractor() {

Review Comment:
   extractor() does not sound like a proper name for this... extractCDCFileSplits()?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -114,6 +118,8 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
                              HoodieTable<T, I, K, O> hoodieTable, Option<Schema> overriddenSchema,
                              TaskContextSupplier taskContextSupplier) {
     super(config, Option.of(instantTime), hoodieTable);
+    this.keyField = config.populateMetaFields() ? HoodieRecord.RECORD_KEY_METADATA_FIELD
+        : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();

Review Comment:
   this could be multiple keys that the user configured. we might need `keyFields` as a list
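   
   A minimal sketch of that, assuming the record key prop may hold a comma-separated list of fields (`keyFields` is the suggested name, not existing code; assumes java.util.Arrays/Collections/List and java.util.stream.Collectors are imported):
   
   ```java
   // Hypothetical: derive the key fields as a list instead of a single keyField.
   List<String> keyFields = config.populateMetaFields()
       ? Collections.singletonList(HoodieRecord.RECORD_KEY_METADATA_FIELD)
       : Arrays.stream(hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp().split(","))
           .map(String::trim)
           .collect(Collectors.toList());
   ```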



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -294,6 +295,8 @@ public interface HoodieTimeline extends Serializable {
    */
   Stream<HoodieInstant> getInstants();
 
+  List<HoodieInstant> getInstantsAsList();

Review Comment:
   ditto



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -169,310 +141,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
   private def getConfig: Configuration = {
     val conf = confBroadcast.value.value
-    HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK.synchronized {
+    CONFIG_INSTANTIATION_LOCK.synchronized {
       new Configuration(conf)
     }
   }
-
-  /**
-   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in
-   * Delta Log files (represented as [[InternalRow]]s)
-   */
-  private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
-                                config: Configuration)
-    extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
-
-    protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
-    protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema
-
-    protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
-
-    protected var recordToLoad: InternalRow = _
-
-    private val requiredSchemaSafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema)
-
-    private var logScanner = {
-      val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
-      HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
-        maxCompactionMemoryInBytes, config, internalSchema)
-    }
-
-    private val logRecords = logScanner.getRecords.asScala
-
-    // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's
-    //       going to be used, since we modify `logRecords` before that and therefore can't do it any earlier
-    protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
-      logRecords.iterator.map {
-        case (_, record) =>
-          toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps))
-            .map(_.asInstanceOf[GenericRecord])
-      }
-
-    protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] =
-      logRecords.remove(key)
-
-    override def hasNext: Boolean = hasNextInternal
-
-    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure
-    //       that recursion is unfolded into a loop to avoid stack overflows while
-    //       handling records
-    @tailrec private def hasNextInternal: Boolean = {
-      logRecordsIterator.hasNext && {
-        val avroRecordOpt = logRecordsIterator.next()
-        if (avroRecordOpt.isEmpty) {
-          // Record has been deleted, skipping
-          this.hasNextInternal
-        } else {
-          val projectedAvroRecord = requiredSchemaSafeAvroProjection(avroRecordOpt.get)
-          recordToLoad = deserialize(projectedAvroRecord)
-          true
-        }
-      }
-    }
-
-    override final def next(): InternalRow = recordToLoad
-
-    override def close(): Unit =
-      if (logScanner != null) {
-        try {
-          logScanner.close()
-        } finally {
-          logScanner = null
-        }
-      }
-  }
-
-  /**
-   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in
-   * Base file as well as all of the Delta Log files simply returning concatenation of these streams, while not
-   * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially)
-   */
-  private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
-                                  baseFileReader: BaseFileReader,
-                                  config: Configuration)
-    extends LogFileIterator(split, config) {
-
-    private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
-
-    private val baseFileIterator = baseFileReader(split.dataFile.get)
-
-    override def hasNext: Boolean = {
-      if (baseFileIterator.hasNext) {
-        // No merge is required, simply load current row and project into required schema
-        recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next())
-        true
-      } else {
-        super[LogFileIterator].hasNext
-      }
-    }
-  }
-
-  /**
-   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in
-   * a) Base file and all of the b) Delta Log files combining records with the same primary key from both of these
-   * streams
-   */
-  private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
-                                          baseFileReader: BaseFileReader,
-                                          config: Configuration)
-    extends LogFileIterator(split, config) {
-
-    // NOTE: Record-merging iterator supports 2 modes of operation merging records bearing either
-    //        - Full table's schema
-    //        - Projected schema
-    //       As such, no particular schema could be assumed, and therefore we rely on the caller
-    //       to correspondingly set the scheme of the expected output of base-file reader
-    private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable = false, "record")
-
-    private val serializer = sparkAdapter.createAvroSerializer(baseFileReader.schema, baseFileReaderAvroSchema, nullable = false)
-
-    private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-
-    private val recordKeyOrdinal = baseFileReader.schema.fieldIndex(tableState.recordKeyField)
-
-    private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
-
-    private val baseFileIterator = baseFileReader(split.dataFile.get)
-
-    override def hasNext: Boolean = hasNextInternal
-
-    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure
-    //       that recursion is unfolded into a loop to avoid stack overflows while
-    //       handling records
-    @tailrec private def hasNextInternal: Boolean = {
-      if (baseFileIterator.hasNext) {
-        val curRow = baseFileIterator.next()
-        val curKey = curRow.getString(recordKeyOrdinal)
-        val updatedRecordOpt = removeLogRecord(curKey)
-        if (updatedRecordOpt.isEmpty) {
-          // No merge is required, simply load current row and project into required schema
-          recordToLoad = requiredSchemaUnsafeProjection(curRow)
-          true
-        } else {
-          val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get)
-          if (mergedAvroRecordOpt.isEmpty) {
-            // Record has been deleted, skipping
-            this.hasNextInternal
-          } else {
-            val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
-              requiredAvroSchema, reusableRecordBuilder)
-            recordToLoad = deserialize(projectedAvroRecord)
-            true
-          }
-        }
-      } else {
-        super[LogFileIterator].hasNext
-      }
-    }
-
-    private def serialize(curRowRecord: InternalRow): GenericRecord =
-      serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
-
-    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
-      // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API
-      //       on the record from the Delta Log
-      toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps))
-    }
-  }
-}
-
-private object HoodieMergeOnReadRDD {
-
-  val CONFIG_INSTANTIATION_LOCK = new Object()
-
-  def scanLog(logFiles: List[HoodieLogFile],
-              partitionPath: Path,
-              logSchema: Schema,
-              tableState: HoodieTableState,
-              maxCompactionMemoryInBytes: Long,
-              hadoopConf: Configuration, internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): HoodieMergedLogRecordScanner = {
-    val tablePath = tableState.tablePath
-    val fs = FSUtils.getFs(tablePath, hadoopConf)
-
-    if (HoodieTableMetadata.isMetadataTable(tablePath)) {
-      val metadataConfig = HoodieMetadataConfig.newBuilder()
-        .fromProperties(tableState.metadataConfig.getProps).enable(true).build()
-      val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
-      val metadataTable = new HoodieBackedTableMetadata(
-        new HoodieLocalEngineContext(hadoopConf), metadataConfig,
-        dataTableBasePath,
-        hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-
-      // We have to force full-scan for the MT log record reader, to make sure
-      // we can iterate over all of the partitions, since by default some of the partitions (Column Stats,
-      // Bloom Filter) are in "point-lookup" mode
-      val forceFullScan = true
-
-      // NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level
-      //       of indirection among MT partitions)
-      val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath)
-      metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)))
-        .getLeft
-    } else {
-      val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
-        .withFileSystem(fs)
-        .withBasePath(tablePath)
-        .withLogFilePaths(logFiles.map(logFile => logFile.getPath.toString).asJava)
-        .withReaderSchema(logSchema)
-        .withLatestInstantTime(tableState.latestCommitTimestamp)
-        .withReadBlocksLazily(
-          Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-            HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-            .getOrElse(false))
-        .withReverseReader(false)
-        .withInternalSchema(internalSchema)
-        .withBufferSize(
-          hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-            HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-        .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
-        .withSpillableMapBasePath(
-          hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-            HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-        .withDiskMapType(
-          hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
-            HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
-        .withBitCaskDiskMapCompressionEnabled(
-          hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-          HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-      if (logFiles.nonEmpty) {
-        logRecordScannerBuilder.withPartition(
-          getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
-      }
-
-      logRecordScannerBuilder.build()
-    }
-  }
-
-  private def projectAvroUnsafe(record: GenericRecord, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = {
-    val fields = projectedSchema.getFields.asScala
-    fields.foreach(field => reusableRecordBuilder.set(field, record.get(field.name())))
-    reusableRecordBuilder.build()
-  }
-
-  private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {

Review Comment:
   can you clarify this big chunk of code removal? Is it a result of refactoring? If that's the case, let's keep the change minimal and CDC-related: try to achieve the same functionality without moving code around. Note that there can be other refactoring work going on around this, and conflicts can be tricky



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java:
##########
@@ -93,13 +96,19 @@ public void write(GenericRecord oldRecord) {
         throw new HoodieUpsertException("Insert/Update not in sorted order");
       }
       try {
+        Option<IndexedRecord> insertRecord;
         if (useWriterSchemaForCompaction) {
-          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
+          insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
         } else {
-          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
+          insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
         }
+        writeRecord(hoodieRecord, insertRecord);

Review Comment:
   yea saw that



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##########
@@ -236,6 +241,43 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except
     return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * parse the bytes of deltacommit, and get the base file and the log files belonging to this
+   * provided file group.
+   */
+  public static Option<Pair<String, List<String>>> getFileSliceForFileGroupFromDeltaCommit(

Review Comment:
   I'm ok with moving forward with a proper refactoring later, if you can annotate this method with a JIRA and take it up soon after the first version.



##########
hudi-common/src/main/java/org/apache/hudi/avro/SerializableRecord.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.generic.GenericData;
+
+import java.io.Serializable;
+
+/**
+ * In some cases like putting the [[GenericData.Record]] into [[ExternalSpillableMap]],
+ * objects is asked to extend [[Serializable]].
+ *
+ * This class wraps [[GenericData.Record]].
+ */
+public class SerializableRecord implements Serializable {

Review Comment:
   can we make use of `HoodieRecord`, which is the current common way to go with the spillable map?
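   
   A rough sketch of that direction, assuming the Avro record gets wrapped in a `HoodieAvroPayload`; the exact `HoodieRecord` subclass/constructor to build on top of it depends on the Hudi version:
   
   ```java
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericRecord;
   
   import org.apache.hudi.common.model.HoodieAvroPayload;
   import org.apache.hudi.common.model.HoodieKey;
   import org.apache.hudi.common.util.Option;
   
   // Hypothetical helper: instead of introducing SerializableRecord, reuse the
   // existing key/payload types that the spillable map already works with.
   public class CDCRecordWrapper {
   
     public static HoodieAvroPayload toPayload(GenericData.Record cdcRecord) {
       return new HoodieAvroPayload(Option.of((GenericRecord) cdcRecord));
     }
   
     public static HoodieKey toKey(String recordKey, String partitionPath) {
       return new HoodieKey(recordKey, partitionPath);
     }
   
     // Depending on the Hudi version, these two would then be combined into a
     // HoodieRecord (e.g. via the HoodieAvroRecord or legacy HoodieRecord
     // constructor) before being put into the spillable map.
   }
   ```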



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -354,6 +354,11 @@ public Stream<HoodieInstant> getInstants() {
     return instants.stream();
   }
 
+  @Override
+  public List<HoodieInstant> getInstantsAsList() {
+    return instants;

Review Comment:
   we should have a clean-up ticket to call this `getInstants` and the other `getInstantsAsStream`. Public API names should not cause confusion.
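   
   In other words, something like the following target shape (a sketch of the suggested end state, not the current interface):
   
   ```java
   import java.util.List;
   import java.util.stream.Stream;
   
   import org.apache.hudi.common.table.timeline.HoodieInstant;
   
   // Hypothetical naming on HoodieTimeline after the suggested clean-up:
   public interface TimelineNamingSketch {
     // the stream-returning accessor carries the explicit suffix...
     Stream<HoodieInstant> getInstantsAsStream();
   
     // ...so the plain name can return the materialized list.
     List<HoodieInstant> getInstants();
   }
   ```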


