yihua commented on code in PR #9894:
URL: https://github.com/apache/hudi/pull/9894#discussion_r1372349415


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -119,21 +124,36 @@ protected Option<T> doProcessNextDataRecord(T record,
                                               Map<String, Object> metadata,
                                               Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
     if (existingRecordMetadataPair != null) {
-      // Merge and store the combined record
-      // Note that the incoming `record` is from an older commit, so it should be put as
-      // the `older` in the merge API
-      HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
-          readerSchema,
-          readerContext.constructHoodieRecord(
-              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema),
-          readerSchema,
-          payloadProps).get().getLeft();
-      // If pre-combine returns existing record, no need to update it
-      if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
-        return Option.of(combinedRecord.getData());
+      switch (recordMergeMode) {
+        case OVERWRITE_WITH_LATEST:
+          return Option.empty();
+        case EVENT_TIME_ORDERING:
+          Comparable incomingOrderingValue = readerContext.getOrderingValue(
+              Option.of(record), metadata, readerSchema, payloadProps);
+          Comparable existingOrderingValue = readerContext.getOrderingValue(
+              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema, payloadProps);
+          if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
+            return Option.of(record);
+          }
+          return Option.empty();

Review Comment:
   Yes, `existingRecordMetadataPair` should already be in the log record
mapping. The convention here is that if this method returns `Option.empty()`,
the log record with the same record key in the mapping is left unchanged,
which avoids an unnecessary `readerContext.seal` call:
   ```
   @Override
   public void processNextDataRecord(T record, Map<String, Object> metadata, Object recordKey) throws IOException {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordKey);
     Option<T> mergedRecord = doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
     if (mergedRecord.isPresent()) {
       records.put(recordKey, Pair.of(Option.ofNullable(readerContext.seal(mergedRecord.get())), metadata));
     }
   }
   ```
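
To make the convention concrete, here is a minimal, self-contained sketch of the same control flow. It is not Hudi code: `MergeConventionSketch`, `Rec`, `MergeMode`, and the `seal` counter are hypothetical stand-ins, `java.util.Optional` replaces Hudi's `Option`, and the behavior when no existing record is present (store the incoming one) is an assumption, since the diff above does not show that branch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class MergeConventionSketch {

  /** Hypothetical simplified record: an ordering value plus a payload. */
  static final class Rec {
    final long orderingValue;
    final String payload;

    Rec(long orderingValue, String payload) {
      this.orderingValue = orderingValue;
      this.payload = payload;
    }
  }

  /** Mirrors the two merge modes handled in the diff. */
  enum MergeMode { OVERWRITE_WITH_LATEST, EVENT_TIME_ORDERING }

  // Stand-in for the buffer's record-key -> log-record mapping.
  private final Map<String, Rec> records = new HashMap<>();
  private final MergeMode mode;
  // Counts calls to seal() so the saved work is observable.
  int sealCalls = 0;

  MergeConventionSketch(MergeMode mode) {
    this.mode = mode;
  }

  // Stand-in for readerContext.seal; here it only counts invocations.
  private Rec seal(Rec rec) {
    sealCalls++;
    return rec;
  }

  // An empty result means "keep the record already in the mapping".
  Optional<Rec> doProcessNext(Rec incoming, Rec existing) {
    if (existing == null) {
      // Assumption: with no existing record, the incoming one is stored.
      return Optional.of(incoming);
    }
    switch (mode) {
      case OVERWRITE_WITH_LATEST:
        // The incoming record is from an older commit, so the existing one wins.
        return Optional.empty();
      case EVENT_TIME_ORDERING:
        // Replace only when the incoming ordering value is strictly greater.
        return incoming.orderingValue > existing.orderingValue
            ? Optional.of(incoming)
            : Optional.empty();
      default:
        throw new IllegalStateException("Unsupported merge mode: " + mode);
    }
  }

  // Updates the mapping, and therefore calls seal(), only when a record is returned.
  void processNext(String key, Rec incoming) {
    Rec existing = records.get(key);
    Optional<Rec> merged = doProcessNext(incoming, existing);
    if (merged.isPresent()) {
      records.put(key, seal(merged.get()));
    }
  }

  Rec get(String key) {
    return records.get(key);
  }
}
```

In this sketch, processing a record whose ordering value is not greater than the stored one leaves both the mapping and the `sealCalls` counter unchanged, which is the saving the convention is meant to provide.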



