hudi-agent commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3237167433
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -135,19 +139,59 @@ protected BufferedRecord<T>
handleNonDeletes(BufferedRecord<T> previousRecord, B
if (previousRecord == null) {
// special case for payloads when there is no previous record
HoodieSchema recordSchema =
readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
- GenericRecord record =
readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(),
recordSchema);
- HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null,
HoodieRecordUtils.loadPayload(payloadClass, record,
mergedRecord.getOrderingValue()));
- try {
- if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
- return null;
- } else {
- HoodieSchema readerSchema =
readerContext.getSchemaHandler().getRequestedSchema();
- // If the record schema is different from the reader schema,
rewrite the record using the payload methods to ensure consistency with legacy
writer paths
- hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties,
readerSchema).toIndexedRecord(readerSchema, properties)
- .ifPresent(rewrittenRecord ->
mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+ GenericRecord originalAvro = mergedRecord.getOriginalAvroRecord();
Review Comment:
🤖 nit: `handleNonDeletes` has grown to ~50 lines with three levels of
nesting and two distinct branches (originalAvro fast-path vs. payload
evaluation). Could you extract the two branches into helpers (e.g.
`applyOriginalAvroRecord(...)` and `evaluatePayloadAndRewrite(...)`)? It would
make the high-level flow read at a glance and keep the long inline comments
next to the code they document.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -135,19 +139,59 @@ protected BufferedRecord<T>
handleNonDeletes(BufferedRecord<T> previousRecord, B
if (previousRecord == null) {
// special case for payloads when there is no previous record
HoodieSchema recordSchema =
readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
- GenericRecord record =
readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(),
recordSchema);
- HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null,
HoodieRecordUtils.loadPayload(payloadClass, record,
mergedRecord.getOrderingValue()));
- try {
- if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
- return null;
- } else {
- HoodieSchema readerSchema =
readerContext.getSchemaHandler().getRequestedSchema();
- // If the record schema is different from the reader schema,
rewrite the record using the payload methods to ensure consistency with legacy
writer paths
- hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties,
readerSchema).toIndexedRecord(readerSchema, properties)
- .ifPresent(rewrittenRecord ->
mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+ GenericRecord originalAvro = mergedRecord.getOriginalAvroRecord();
+ Schema recordAvroSchema = recordSchema.toAvroSchema();
+
+ // When the merged record carries an originalAvroRecord (populated by
extractDataFromRecord
+ // for ExpressionPayload in the COW write path via ExtractedData), the
record is already in
+ // write-schema format with correctly evaluated expressions. Convert
directly and skip the
+ // payload path.
+ //
+ // NOTE: this branch bypasses shouldIgnore. That is safe today because
the only payload that
+ // populates originalAvroRecord is ExpressionPayload, which never
returns shouldIgnore=true.
+ // If a future payload starts producing an originalAvroRecord, it must
add a shouldIgnore
+ // check here.
+ if (originalAvro != null) {
+ // After replaceRecord(), mergedRecord.getSchemaId() still
references the original schema.
+ // This is safe because the record is emitted immediately via
super.handleNonDeletes() below,
+ // which calls seal() and produces the output row — the record is
not spilled to disk
+ // (via toBinary()) between replaceRecord and emit in this
single-record path. If this
+ // assumption changes, the schemaId must be updated after
replaceRecord.
Review Comment:
🤖 When this fast-path fires,
`replaceRecord(convertAvroRecord(originalAvro))` produces an InternalRow shaped
by `originalAvro.getSchema()` (the payload's writerSchema, which by definition
differs from `recordSchema` here — otherwise `originalAvro` would be null), and
`replaceRecord` then clears `originalAvroRecord` to null. After
`super.handleNonDeletes` returns, `CallbackProcessor` calls
`callback.onInsert(recordKey, mergedRecord)` on enabled callbacks (e.g.
`CDCCallback`, `SecondaryIndexCallback`). `CDCCallback.convertOutput` then
calls `recordContext.convertToAvroRecord(record,
getSchemaFromBufferRecord(record))` — which sees `originalAvroRecord == null`
and serializes the new InternalRow using `schemaId`'s schema. Could this
produce a wrong-arity CDC record when CDC is enabled with a MERGE INTO insert?
The old code rewrote through `readerSchema`, which side-stepped the issue. Have
you confirmed this combo is covered by a test, or that it's safe by
construction?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]