Savonitar commented on code in PR #174:
URL: https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2848960874


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java:
##########


Review Comment:
   Thanks for adding the pushdown options to `optionalOptions()`. 
   
   However, it looks like format-level projection pushdown doesn't work for 
**upsert-kafka**. Regardless of the configured pushdown level, all value fields 
are always fully deserialized, and projection happens only afterwards via the 
Projector. 
   
   The root cause is `DecodingFormatWrapper`: it implements only
`DecodingFormat`, not `ProjectableDecodingFormat`. When `Decoder.create()`
checks `decodingFormat instanceof ProjectableDecodingFormat`, the wrapper
always fails the check, so the `projectInsideDeserializer` path is never taken
for **upsert-kafka**.
   
   I implemented a **failing** test to verify this hypothesis and to provide a 
basis for the fix: 
https://github.com/Savonitar/flink-connector-kafka/commit/8e2c1aed 
   
   The test writes data with a breaking schema change on a non-projected value
field (`name` changes from INT to STRING), then reads selecting only `user_id`
and `payload`. With working format-level pushdown, `name` would be skipped
during deserialization and the query would succeed. Instead, it fails with a
`JsonParseException` because all fields are deserialized.
   
   I followed the style of the existing tests in `UpsertKafkaTableITCase` and
your
`KafkaTableITCase#testProjectionPushdownWithJsonFormatAndBreakingSchemaChange`.
I kept the commit in my fork rather than pushing to your branch, so feel free
to cherry-pick it as a regression test, or reimplement it entirely if you
prefer. The test should pass once the fix is implemented.
   
   A possible fix: make `DecodingFormatWrapper` also implement
`ProjectableDecodingFormat` and delegate `createRuntimeDecoder(context,
producedDataType, projections)` to the inner format when it's a
`ProjectableDecodingFormat`.
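   
   To make the idea concrete, here is a minimal, self-contained sketch. It does
not use Flink's real interfaces: `DecodingFormat` and `ProjectableDecodingFormat`
below are simplified stand-ins (the real ones take a context and a `DataType`),
and `ToyJsonFormat`, `Wrapper`, `ProjectableWrapper`, `wrap` and `create` are
hypothetical names used only for illustration. It also shows one variation on
the suggestion above: the projectable wrapper is chosen only when the inner
format is itself projectable, so the `instanceof` check stays truthful for
formats that cannot project.

```java
import java.util.Arrays;

public class ProjectableWrapperSketch {

    /** Stand-in for Flink's DecodingFormat; the "decoder" here is just a description string. */
    interface DecodingFormat {
        String createRuntimeDecoder(String[] fields);
    }

    /** Stand-in for ProjectableDecodingFormat: can skip non-projected fields while decoding. */
    interface ProjectableDecodingFormat extends DecodingFormat {
        String createRuntimeDecoder(String[] fields, int[] projection);
    }

    /** Plain wrapper, mirroring the current behaviour: it hides the inner format's projection support. */
    static class Wrapper implements DecodingFormat {
        protected final DecodingFormat inner;

        Wrapper(DecodingFormat inner) {
            this.inner = inner;
        }

        @Override
        public String createRuntimeDecoder(String[] fields) {
            return inner.createRuntimeDecoder(fields);
        }
    }

    /** Wrapper variant that forwards the projection to a projectable inner format. */
    static class ProjectableWrapper extends Wrapper implements ProjectableDecodingFormat {
        private final ProjectableDecodingFormat projectableInner;

        ProjectableWrapper(ProjectableDecodingFormat inner) {
            super(inner);
            this.projectableInner = inner;
        }

        @Override
        public String createRuntimeDecoder(String[] fields, int[] projection) {
            return projectableInner.createRuntimeDecoder(fields, projection);
        }
    }

    /** Picks the wrapper type so that instanceof reflects what the inner format can do. */
    static DecodingFormat wrap(DecodingFormat inner) {
        return inner instanceof ProjectableDecodingFormat
                ? new ProjectableWrapper((ProjectableDecodingFormat) inner)
                : new Wrapper(inner);
    }

    /** Toy projectable format that reports which fields it would decode. */
    static class ToyJsonFormat implements ProjectableDecodingFormat {
        @Override
        public String createRuntimeDecoder(String[] fields) {
            return "decodes " + Arrays.toString(fields);
        }

        @Override
        public String createRuntimeDecoder(String[] fields, int[] projection) {
            String[] selected = new String[projection.length];
            for (int i = 0; i < projection.length; i++) {
                selected[i] = fields[projection[i]];
            }
            return "decodes " + Arrays.toString(selected);
        }
    }

    /** Mimics the instanceof check described above for Decoder.create(). */
    static String create(DecodingFormat format, String[] fields, int[] projection) {
        if (format instanceof ProjectableDecodingFormat) {
            return ((ProjectableDecodingFormat) format).createRuntimeDecoder(fields, projection);
        }
        // Fallback: decode every field; projection would have to happen afterwards.
        return format.createRuntimeDecoder(fields);
    }

    public static void main(String[] args) {
        String[] fields = {"user_id", "name", "payload"};
        int[] projection = {0, 2};
        // Plain wrapper: the check fails, so `name` is still decoded.
        System.out.println(create(new Wrapper(new ToyJsonFormat()), fields, projection));
        // Projectable wrapper: the projection reaches the inner format.
        System.out.println(create(wrap(new ToyJsonFormat()), fields, projection));
    }
}
```

   In this toy setup the plain wrapper decodes all three fields, while the
wrapper returned by `wrap` decodes only `user_id` and `payload`, which is the
behaviour the failing test expects from upsert-kafka.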
   
   Could you please take a look and implement the fix if you agree with the 
analysis? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
