wombatu-kun commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3179817185
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -135,19 +139,59 @@ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, B
     if (previousRecord == null) {
       // special case for payloads when there is no previous record
       HoodieSchema recordSchema = readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
-      GenericRecord record = readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), recordSchema);
-      HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue()));
-      try {
-        if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
-          return null;
-        } else {
-          HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();
-          // If the record schema is different from the reader schema, rewrite the record using the payload methods to ensure consistency with legacy writer paths
-          hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, readerSchema).toIndexedRecord(readerSchema, properties)
-              .ifPresent(rewrittenRecord -> mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+      GenericRecord originalAvro = mergedRecord.getOriginalAvroRecord();
+      Schema recordAvroSchema = recordSchema.toAvroSchema();
+
+      // When the merged record carries an originalAvroRecord (populated by extractDataFromRecord
+      // for ExpressionPayload in the COW write path via ExtractedData), the record is already in
+      // write-schema format with correctly evaluated expressions. Convert directly and skip the
+      // payload path.
+      //
+      // NOTE: this branch bypasses shouldIgnore. That is safe today because the only payload that
+      // populates originalAvroRecord is ExpressionPayload, which never returns shouldIgnore=true.
+      // If a future payload starts producing an originalAvroRecord, it must add a shouldIgnore
+      // check here.
+      if (originalAvro != null) {
+        // After replaceRecord(), mergedRecord.getSchemaId() still references the original schema.
+        // This is safe because the record is emitted immediately via super.handleNonDeletes() below,
+        // which calls seal() and produces the output row — the record is not spilled to disk
+        // (via toBinary()) between replaceRecord and emit in this single-record path. If this
+        // assumption changes, the schemaId must be updated after replaceRecord.
Review Comment:
Good catch, the chain you describe is real: `CallbackProcessor.processUpdate` calls `callback.onInsert` after the delegate returns, and `CDCCallback.convertOutput` resolves the schema via `getSchemaFromBufferRecord(record)`, which reads the stale `schemaId`. A data-only `InternalRow` would therefore be serialized against the writeSchema-with-meta-fields, causing an arity mismatch.
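To make the failure mode concrete, here is a minimal self-contained model of the hazard. This is not Hudi code: `BufferedRec`, `SCHEMA_ARITY`, and `convertOutput` are hypothetical stand-ins for `BufferedRecord`, the schema lookup, and `CDCCallback.convertOutput`, with schemas reduced to field counts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StaleSchemaIdDemo {
  // Stand-in schema registry: schemaId -> arity (field count).
  static final Map<Integer, Integer> SCHEMA_ARITY = Map.of(
      1, 8,  // writeSchema with meta fields
      2, 5); // data-only schema

  static class BufferedRec {
    List<Object> data;
    int schemaId;
    BufferedRec(List<Object> data, int schemaId) {
      this.data = data;
      this.schemaId = schemaId;
    }
    // Like the code under review: swaps the row, leaves schemaId untouched.
    void replaceRecord(List<Object> newData) {
      this.data = newData;
    }
  }

  // Stand-in for the CDC callback: resolves the schema by id and
  // serializes the record against it.
  static List<Object> convertOutput(BufferedRec rec) {
    int expectedArity = SCHEMA_ARITY.get(rec.schemaId);
    if (rec.data.size() != expectedArity) {
      throw new IllegalStateException("arity mismatch: schemaId " + rec.schemaId
          + " expects " + expectedArity + " fields, record has " + rec.data.size());
    }
    return rec.data;
  }

  // Returns true when the stale schemaId makes the callback fail.
  static boolean demonstratesMismatch() {
    BufferedRec rec = new BufferedRec(
        new ArrayList<>(List.of(1, 2, 3, 4, 5, 6, 7, 8)), 1);
    // The rewrite produced a data-only row; schemaId still says "8 fields".
    rec.replaceRecord(new ArrayList<>(List.of(1, 2, 3, 4, 5)));
    try {
      convertOutput(rec);
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(demonstratesMismatch() ? "mismatch" : "ok");
  }
}
```

The point of the toy is only that the record and its schema id become inconsistent the moment any consumer fires after `replaceRecord` but resolves the schema by id.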
Confirming on coverage: I don't have a test that exercises the SQL `MERGE
INTO` + `ExpressionPayload` + COW + `hoodie.table.cdc.enabled=true` combination
in this PR's matrix — Lance is COW-only and has no CDC coverage yet.
The NOTE block in this method (lines 155-159) deliberately scopes the safety
claim to "the record is not spilled to disk between replaceRecord and emit in
this single-record path" — the CDC callback fires *after* `processUpdate`
returns, so it's exactly the "if this assumption changes" case. Since this PR's
scope is Lance + Spark-SQL parity, I'd prefer to track this as a follow-up:
file a separate issue to (a) update `schemaId` after `replaceRecord` to the
data-only schema id and (b) add a CDC + MERGE INTO regression test. Happy to
open that issue and link it here if you'd like.
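For the record, follow-up fix (a) would amount to keeping the record and its schema id in sync at replace time. A sketch using the same kind of hypothetical stand-ins (the real `BufferedRecord` API may differ):

```java
import java.util.List;
import java.util.Map;

public class SchemaIdFixSketch {
  // Stand-in schema registry: schemaId -> arity (field count).
  static final Map<Integer, Integer> SCHEMA_ARITY = Map.of(
      1, 8,  // writeSchema with meta fields
      2, 5); // data-only schema

  static class BufferedRec {
    List<Object> data;
    int schemaId;
    BufferedRec(List<Object> data, int schemaId) {
      this.data = data;
      this.schemaId = schemaId;
    }
  }

  // Fix (a): replace the record AND repoint schemaId in the same step, so a
  // consumer that resolves the schema by id after processUpdate returns
  // (e.g. a CDC callback) sees a consistent pair.
  static void replaceRecordAndSchema(BufferedRec rec, List<Object> newData, int newSchemaId) {
    rec.data = newData;
    rec.schemaId = newSchemaId;
  }

  static boolean consistentAfterFix() {
    BufferedRec rec = new BufferedRec(List.<Object>of(1, 2, 3, 4, 5, 6, 7, 8), 1);
    // Rewrite to the data-only shape, updating the id alongside the data.
    replaceRecordAndSchema(rec, List.<Object>of(1, 2, 3, 4, 5), 2);
    return rec.data.size() == SCHEMA_ARITY.get(rec.schemaId);
  }

  public static void main(String[] args) {
    System.out.println(consistentAfterFix() ? "consistent" : "stale");
  }
}
```

The actual patch would of course use the data-only schema's real id from the schema handler rather than a hard-coded constant.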
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]