Savonitar commented on code in PR #174: URL: https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2848960874
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java:
##########

Review Comment:

Thanks for adding the pushdown options to `optionalOptions()`. However, it looks like format-level projection pushdown doesn't work for **upsert-kafka**. Regardless of the configured pushdown level, all value fields are fully deserialized, and projection happens only afterwards via the `Projector`.

The root cause is `DecodingFormatWrapper`: it implements only `DecodingFormat`, not `ProjectableDecodingFormat`. When `Decoder.create()` checks `decodingFormat instanceof ProjectableDecodingFormat`, the wrapper always fails the check, so the `projectInsideDeserializer` path is never taken for **upsert-kafka**.

I wrote a **failing** test to verify this hypothesis and to provide a basis for the fix: https://github.com/Savonitar/flink-connector-kafka/commit/8e2c1aed

The test writes data with a breaking schema change on a non-projected value field (`name` changes from INT to STRING), then runs a query that selects only `user_id` and `payload`. With working format-level pushdown, `name` would be skipped during deserialization and the query would succeed. Instead, the query fails with a `JsonParseException` because all fields are deserialized.

I followed the style of the existing tests in `UpsertKafkaTableITCase` and of your `KafkaTableITCase#testProjectionPushdownWithJsonFormatAndBreakingSchemaChange`. I kept the commit in my fork rather than pushing it to your branch, so feel free to cherry-pick it as a regression test or to reimplement it entirely if you prefer. The test should pass once the fix is implemented.

A possible fix: make `DecodingFormatWrapper` also implement `ProjectableDecodingFormat` and delegate `createRuntimeDecoder(context, producedDataType, projections)` to the inner format when it is a `ProjectableDecodingFormat`.

Could you please take a look and implement the fix if you agree with the analysis?
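
To illustrate the idea, here is a minimal, self-contained sketch of the delegation pattern. It does not use the real Flink classes: the `DecodingFormat` and `ProjectableDecodingFormat` interfaces below are simplified stand-ins, and `IntFormat`, `ProjectableDecodingFormatWrapper`, `wrap(...)`, and `decode(...)` are hypothetical names introduced only for this example. It also makes one assumption beyond the suggestion above: the wrapper type is chosen at wrap time, so the `instanceof` check stays truthful when the inner format is not projectable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ProjectableWrapperSketch {

    // Simplified stand-in (NOT the Flink interface): decodes every field of a raw record.
    interface DecodingFormat {
        Function<String[], List<Integer>> createRuntimeDecoder();
    }

    // Simplified stand-in (NOT the Flink interface): can decode only the projected fields.
    interface ProjectableDecodingFormat extends DecodingFormat {
        Function<String[], List<Integer>> createRuntimeDecoder(int[] projections);
    }

    // Hypothetical format: parses each field as an int and fails on anything else.
    static class IntFormat implements ProjectableDecodingFormat {
        @Override
        public Function<String[], List<Integer>> createRuntimeDecoder() {
            return raw -> {
                List<Integer> out = new ArrayList<>();
                for (String field : raw) {
                    out.add(Integer.parseInt(field));
                }
                return out;
            };
        }

        @Override
        public Function<String[], List<Integer>> createRuntimeDecoder(int[] projections) {
            return raw -> {
                List<Integer> out = new ArrayList<>();
                for (int index : projections) {
                    out.add(Integer.parseInt(raw[index])); // non-projected fields are never parsed
                }
                return out;
            };
        }
    }

    // Plain wrapper: hides the projection capability of the inner format.
    static class DecodingFormatWrapper implements DecodingFormat {
        protected final DecodingFormat inner;

        DecodingFormatWrapper(DecodingFormat inner) {
            this.inner = inner;
        }

        @Override
        public Function<String[], List<Integer>> createRuntimeDecoder() {
            return inner.createRuntimeDecoder();
        }

        // Hypothetical factory: returns a projectable wrapper only if the inner format is projectable.
        static DecodingFormat wrap(DecodingFormat inner) {
            return inner instanceof ProjectableDecodingFormat
                    ? new ProjectableDecodingFormatWrapper((ProjectableDecodingFormat) inner)
                    : new DecodingFormatWrapper(inner);
        }
    }

    // Projectable wrapper: forwards the projections to the inner format.
    static class ProjectableDecodingFormatWrapper extends DecodingFormatWrapper
            implements ProjectableDecodingFormat {
        private final ProjectableDecodingFormat projectableInner;

        ProjectableDecodingFormatWrapper(ProjectableDecodingFormat inner) {
            super(inner);
            this.projectableInner = inner;
        }

        @Override
        public Function<String[], List<Integer>> createRuntimeDecoder(int[] projections) {
            return projectableInner.createRuntimeDecoder(projections);
        }
    }

    // Mirrors the instanceof check described above: project inside the decoder if possible,
    // otherwise decode everything and project afterwards.
    static List<Integer> decode(DecodingFormat format, int[] projections, String[] raw) {
        if (format instanceof ProjectableDecodingFormat) {
            return ((ProjectableDecodingFormat) format).createRuntimeDecoder(projections).apply(raw);
        }
        List<Integer> all = format.createRuntimeDecoder().apply(raw);
        List<Integer> out = new ArrayList<>();
        for (int index : projections) {
            out.add(all.get(index));
        }
        return out;
    }

    public static void main(String[] args) {
        String[] raw = {"1", "not-an-int", "7"}; // the middle field has a "breaking" type
        int[] projections = {0, 2};

        // Projectable wrapper: the broken field is skipped.
        System.out.println(decode(DecodingFormatWrapper.wrap(new IntFormat()), projections, raw));

        // Plain wrapper: every field is parsed, so decoding fails.
        try {
            decode(new DecodingFormatWrapper(new IntFormat()), projections, raw);
        } catch (NumberFormatException e) {
            System.out.println("full decode failed: " + e.getMessage());
        }
    }
}
```

The second call in `main` reproduces the current behaviour in miniature: the plain wrapper hides the projection capability, every field is parsed, and the record with the incompatible field fails. The real fix would additionally have to handle the `context` and `producedDataType` arguments and any other methods that the actual `ProjectableDecodingFormat` interface requires.
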
