yihua commented on code in PR #13726:
URL: https://github.com/apache/hudi/pull/13726#discussion_r2283611647
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -477,21 +477,26 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
           writeSchemaWithMetaFields, config, recordMerger, keyGenerator, incomingRecordContext, existingRecordContext, orderingFieldNames);
     } else {
       // prepend the hoodie meta fields as the incoming record does not have them
-      HoodieRecord incomingPrepended = incoming
-          .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
-      BufferedRecord<R> incomingBufferedRecord = BufferedRecords.fromHoodieRecord(incomingPrepended, writeSchemaWithMetaFields, incomingRecordContext, config.getProps(), orderingFieldNames);
+      BufferedRecord<R> incomingBufferedRecord = BufferedRecords.fromHoodieRecord(incoming, writeSchemaWithMetaFields, incomingRecordContext, config.getProps(), orderingFieldNames);
       BufferedRecord<R> existingBufferedRecord = BufferedRecords.fromHoodieRecord(existing, writeSchemaWithMetaFields, existingRecordContext, config.getProps(), orderingFieldNames);
+      existingBufferedRecord.project(existingRecordContext.projectRecord(writeSchemaWithMetaFields, writeSchema));
Review Comment:
Now that `existingRecordContext.projectRecord(writeSchemaWithMetaFields,
writeSchema)` is common across records, could this operator be generated once
per iterator in `mergeForPartitionUpdatesIfNeeded` instead of being created per
record here?
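The shape of this suggestion can be sketched generically: build the projection operator once where the iterator is set up, then apply the same instance to every record, instead of recreating it inside the per-record path. All names below are illustrative stand-ins, not Hudi APIs; the projection is modeled as a wide-to-narrow column trim.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class ProjectionHoistSketch {

  // Stand-in for a schema-to-schema projection (e.g. meta-field schema down to
  // the write schema), modeled here as "keep the first keepFields columns".
  static UnaryOperator<List<String>> projectRecord(int keepFields) {
    return fields -> fields.subList(0, Math.min(keepFields, fields.size()));
  }

  // Current shape of the diff: the projection operator is rebuilt per record.
  static List<List<String>> projectPerRecord(List<List<String>> records, int keepFields) {
    return records.stream()
        .map(r -> projectRecord(keepFields).apply(r)) // new operator each record
        .collect(Collectors.toList());
  }

  // Suggested shape: build the operator once per iterator and reuse it.
  static List<List<String>> projectHoisted(List<List<String>> records, int keepFields) {
    UnaryOperator<List<String>> projection = projectRecord(keepFields); // built once
    return records.stream()
        .map(projection) // same instance reused for every record
        .collect(Collectors.toList());
  }
}
```

Both shapes produce identical results; hoisting just avoids reconstructing the operator (and any schema resolution it does) once per record.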
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -477,21 +477,26 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
           writeSchemaWithMetaFields, config, recordMerger, keyGenerator, incomingRecordContext, existingRecordContext, orderingFieldNames);
     } else {
       // prepend the hoodie meta fields as the incoming record does not have them
-      HoodieRecord incomingPrepended = incoming
-          .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
-      BufferedRecord<R> incomingBufferedRecord = BufferedRecords.fromHoodieRecord(incomingPrepended, writeSchemaWithMetaFields, incomingRecordContext, config.getProps(), orderingFieldNames);
+      BufferedRecord<R> incomingBufferedRecord = BufferedRecords.fromHoodieRecord(incoming, writeSchemaWithMetaFields, incomingRecordContext, config.getProps(), orderingFieldNames);
       BufferedRecord<R> existingBufferedRecord = BufferedRecords.fromHoodieRecord(existing, writeSchemaWithMetaFields, existingRecordContext, config.getProps(), orderingFieldNames);
+      existingBufferedRecord.project(existingRecordContext.projectRecord(writeSchemaWithMetaFields, writeSchema));
       BufferedRecord<R> mergeResult = recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
       if (mergeResult.isDelete()) {
         // the record was deleted
         return Option.empty();
       }
       String partitionPath = inferPartitionPath(incoming, existing, writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
-      HoodieRecord<R> result = existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
-      // the merged record needs to be converted back to the original payload
-      return Option.of(result.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, config.getProps(), Option.empty(),
-          config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema)));
+      if (config.isFileGroupReaderBasedMergedHandle() && HoodieRecordUtils.isPayloadClassDeprecated(ConfigUtils.getPayloadClass(config.getProps()))) {
+        mergeResult.project(existingRecordContext.projectRecord(writeSchemaWithMetaFields, writeSchema));
Review Comment:
Similar here
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -477,21 +477,26 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
           writeSchemaWithMetaFields, config, recordMerger, keyGenerator, incomingRecordContext, existingRecordContext, orderingFieldNames);
     } else {
       // prepend the hoodie meta fields as the incoming record does not have them
-      HoodieRecord incomingPrepended = incoming
-          .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
-      BufferedRecord<R> incomingBufferedRecord = BufferedRecords.fromHoodieRecord(incomingPrepended, writeSchemaWithMetaFields, incomingRecordContext, config.getProps(), orderingFieldNames);
+      BufferedRecord<R> incomingBufferedRecord = BufferedRecords.fromHoodieRecord(incoming, writeSchemaWithMetaFields, incomingRecordContext, config.getProps(), orderingFieldNames);
Review Comment:
   To confirm, is my understanding of the expected behavior correct?
   - Before: the incoming record (no meta fields) is projected by prepending meta fields; the existing record already has meta fields; the two records are merged and the result is returned (with intermediate processing for the correct record representation).
   - After: the incoming record (no meta fields) is used as is; the existing record (containing meta fields) is projected to remove meta fields; the two records are merged, so the result does not contain meta fields. Such a result record is projected again before returning.
   If so, should `incomingBufferedRecord = BufferedRecords.fromHoodieRecord` use `writeSchema` instead of `writeSchemaWithMetaFields`?
   Also, what's the overhead of multiple projections?
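To make the overhead question concrete: in the new flow each key can pay for two projections, one on the existing record before the merge and one on the merge result before returning. A toy sketch that counts the materialized copies under the assumption that every projection allocates a new record representation (all names are illustrative stand-ins, not Hudi APIs):

```java
import java.util.ArrayList;
import java.util.List;

public class ProjectionOverheadSketch {

  static int projections = 0;

  // Stand-in projection: each call materializes a new copy of the record.
  static List<String> project(List<String> record) {
    projections++;
    return new ArrayList<>(record);
  }

  // Toy last-writer-wins merge standing in for the record merger.
  static List<String> merge(List<String> existing, List<String> incoming) {
    return incoming.isEmpty() ? existing : incoming;
  }

  // Mirrors the "after" flow per record: project the existing record before
  // the merge, then project the merge result again before returning it.
  static int projectionsFor(int numRecords) {
    projections = 0;
    for (int i = 0; i < numRecords; i++) {
      List<String> existing = project(List.of("key", "v1")); // projection #1
      List<String> merged = merge(existing, List.of("key", "v2"));
      project(merged);                                       // projection #2
    }
    return projections;
  }
}
```

Under this model the cost is two record copies per key; whether that matters in practice depends on how expensive the real projection is for the schemas involved.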
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]