yihua commented on code in PR #13726:
URL: https://github.com/apache/hudi/pull/13726#discussion_r2283611647


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -477,21 +477,26 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
           writeSchemaWithMetaFields, config, recordMerger, keyGenerator, 
incomingRecordContext, existingRecordContext, orderingFieldNames);
     } else {
       // prepend the hoodie meta fields as the incoming record does not have 
them
-      HoodieRecord incomingPrepended = incoming
-          .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
 config.getProps());
-      BufferedRecord<R> incomingBufferedRecord = 
BufferedRecords.fromHoodieRecord(incomingPrepended, writeSchemaWithMetaFields, 
incomingRecordContext, config.getProps(), orderingFieldNames);
+      BufferedRecord<R> incomingBufferedRecord = 
BufferedRecords.fromHoodieRecord(incoming, writeSchemaWithMetaFields, 
incomingRecordContext, config.getProps(), orderingFieldNames);
       BufferedRecord<R> existingBufferedRecord = 
BufferedRecords.fromHoodieRecord(existing, writeSchemaWithMetaFields, 
existingRecordContext, config.getProps(), orderingFieldNames);
+      
existingBufferedRecord.project(existingRecordContext.projectRecord(writeSchemaWithMetaFields,
 writeSchema));

Review Comment:
   Now that `existingRecordContext.projectRecord(writeSchemaWithMetaFields, 
writeSchema)` is common across records, could this operator be generated once 
per iterator in `mergeForPartitionUpdatesIfNeeded` instead of being created per 
record here?
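   As a rough illustration of the suggestion above, here is a minimal, 
self-contained sketch of building a projection operator once per iterator 
rather than once per record. It does not use Hudi classes: 
`ProjectionHoistSketch`, its `projectRecord` and `projectAll` methods, and the 
map-based record type are hypothetical stand-ins, and the real 
`projectRecord` signature and the way `mergeForPartitionUpdatesIfNeeded` 
iterates may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class ProjectionHoistSketch {
  // Counts how many projection operators were built, to make the saving visible.
  static int operatorsBuilt = 0;

  // Hypothetical stand-in for a schema-to-schema projection factory.
  // Schemas are modeled as field-name lists; records as field-name -> value maps.
  // The source schema is unused here because map lookups need no field positions.
  static UnaryOperator<Map<String, Object>> projectRecord(List<String> from, List<String> to) {
    operatorsBuilt++;
    return record -> {
      Map<String, Object> projected = new LinkedHashMap<>();
      for (String field : to) {
        projected.put(field, record.get(field));
      }
      return projected;
    };
  }

  // Builds the operator once, then reuses it for every record in the iterator.
  static List<Map<String, Object>> projectAll(
      Iterator<Map<String, Object>> records, List<String> from, List<String> to) {
    UnaryOperator<Map<String, Object>> projection = projectRecord(from, to);
    List<Map<String, Object>> out = new ArrayList<>();
    while (records.hasNext()) {
      out.add(projection.apply(records.next()));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> withMeta = Arrays.asList("_hoodie_record_key", "id", "value");
    List<String> withoutMeta = Arrays.asList("id", "value");

    List<Map<String, Object>> input = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      Map<String, Object> record = new LinkedHashMap<>();
      record.put("_hoodie_record_key", "key-" + i);
      record.put("id", i);
      record.put("value", "v" + i);
      input.add(record);
    }

    List<Map<String, Object>> projected = projectAll(input.iterator(), withMeta, withoutMeta);
    System.out.println("records=" + projected.size() + " operatorsBuilt=" + operatorsBuilt);
  }
}
```

   In this sketch the operator depends only on the two schemas, which is why 
it can be created outside the per-record loop; whether the same holds in the 
actual method depends on how the schemas are passed down.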



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -477,21 +477,26 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
           writeSchemaWithMetaFields, config, recordMerger, keyGenerator, 
incomingRecordContext, existingRecordContext, orderingFieldNames);
     } else {
       // prepend the hoodie meta fields as the incoming record does not have 
them
-      HoodieRecord incomingPrepended = incoming
-          .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
 config.getProps());
-      BufferedRecord<R> incomingBufferedRecord = 
BufferedRecords.fromHoodieRecord(incomingPrepended, writeSchemaWithMetaFields, 
incomingRecordContext, config.getProps(), orderingFieldNames);
+      BufferedRecord<R> incomingBufferedRecord = 
BufferedRecords.fromHoodieRecord(incoming, writeSchemaWithMetaFields, 
incomingRecordContext, config.getProps(), orderingFieldNames);
       BufferedRecord<R> existingBufferedRecord = 
BufferedRecords.fromHoodieRecord(existing, writeSchemaWithMetaFields, 
existingRecordContext, config.getProps(), orderingFieldNames);
+      
existingBufferedRecord.project(existingRecordContext.projectRecord(writeSchemaWithMetaFields,
 writeSchema));
       BufferedRecord<R> mergeResult = 
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
 
       if (mergeResult.isDelete()) {
         // the record was deleted
         return Option.empty();
       }
       String partitionPath = inferPartitionPath(incoming, existing, 
writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
-      HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
-      // the merged record needs to be converted back to the original payload
-      return 
Option.of(result.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
 config.getProps(), Option.empty(),
-          config.allowOperationMetadataField(), Option.empty(), false, 
Option.of(writeSchema)));
+      if (config.isFileGroupReaderBasedMergedHandle() && 
HoodieRecordUtils.isPayloadClassDeprecated(ConfigUtils.getPayloadClass(config.getProps())))
 {
+        
mergeResult.project(existingRecordContext.projectRecord(writeSchemaWithMetaFields,
 writeSchema));

Review Comment:
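   Similar here: could the `projectRecord(writeSchemaWithMetaFields, 
writeSchema)` operator be created once per iterator instead of per record?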



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -477,21 +477,26 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
           writeSchemaWithMetaFields, config, recordMerger, keyGenerator, 
incomingRecordContext, existingRecordContext, orderingFieldNames);
     } else {
       // prepend the hoodie meta fields as the incoming record does not have 
them
-      HoodieRecord incomingPrepended = incoming
-          .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
 config.getProps());
-      BufferedRecord<R> incomingBufferedRecord = 
BufferedRecords.fromHoodieRecord(incomingPrepended, writeSchemaWithMetaFields, 
incomingRecordContext, config.getProps(), orderingFieldNames);
+      BufferedRecord<R> incomingBufferedRecord = 
BufferedRecords.fromHoodieRecord(incoming, writeSchemaWithMetaFields, 
incomingRecordContext, config.getProps(), orderingFieldNames);

Review Comment:
   To confirm, is my understanding of the expected behavior correct?
   - Before: the incoming record (no meta fields) is projected to add the meta 
fields; the existing record already has meta fields; the two records are merged 
and the result is returned (with intermediate processing to produce the correct 
record representation).
   - After: the incoming record (no meta fields) is used as is; the existing 
record (containing meta fields) is projected to remove the meta fields; the two 
records are merged, producing a result without meta fields; that result is 
projected again before being returned.
   
   If so, should `incomingBufferedRecord = BufferedRecords.fromHoodieRecord` 
use `writeSchema` instead of `writeSchemaWithMetaFields`?
   
   Also, what's the overhead of multiple projections?


