prasannarajaperumal commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r969357231


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String 
recordKey, String partitionPath,
+                                         GenericRecord oldRecord, 
GenericRecord newRecord) {
+    GenericData.Record record;
+    if 
(cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER))
 {
+      record = CDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if 
(cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE))
 {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey, oldRecord);
+    } else {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String 
recordKey, String partitionPath) {
+    if (record != null && config.populateMetaFields()) {
+      GenericRecord rewriteRecord = rewriteRecord(record);
+      String seqId = HoodieRecord.generateSequenceId(instantTime, 
getPartitionId(), writtenRecordCount.get());
+      HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, 
seqId);
+      HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, 
partitionPath, newFilePath.getName());
+      return rewriteRecord;
+    }
+    return record;
+  }
+
+  protected Option<AppendResult> writeCDCData() {
+    if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || 
(recordsWritten == insertRecordsWritten)) {
+      // the following cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice are deleted. and no 
new data is inserted;
+      // case 2: all the data are new-coming,
+      return Option.empty();
+    }

Review Comment:
   If inheritance hierarchy is complicated, lets us not do that. Instead can we 
create a HoodieCDCLogger as a helper class and move all code related to writing 
CDC data  into this abstraction. You can just initialize a HoodieCDCLogger from 
HoodieMergeHandle?
   
   Would that work?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##########
@@ -236,6 +241,43 @@ public static <T> T fromJsonString(String jsonStr, 
Class<T> clazz) throws Except
     return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * parse the bytes of deltacommit, and get the base file and the log files 
belonging to this
+   * provided file group.
+   */
+  public static Option<Pair<String, List<String>>> 
getFileSliceForFileGroupFromDeltaCommit(

Review Comment:
   Can you a leave a TODO in code for the refactor and link jira?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to