fqshopify commented on code in PR #174:
URL: https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2937422436
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java:
##########

Review Comment:

Thanks for sharing this! And apologies for the delayed response; I've been massively backlogged at work.

I thought about this and decided to remove projection pushdown support for the upsert-kafka-connector for now, in order to keep this PR slim and focused. Supporting it requires a few changes (see below) that would be better addressed in a separate PR with proper testing.

> A possible fix: make DecodingFormatWrapper also implement ProjectableDecodingFormat and delegate createRuntimeDecoder(context, producedDataType, projections) to the inner format when it's a ProjectableDecodingFormat.

This will create other issues. Rather than making `DecodingFormatWrapper` implement `ProjectableDecodingFormat`, I'd rather eliminate the wrapper entirely if possible. It exists only to override `getChangelogMode()`, but `KafkaDynamicSource` already has an `upsertMode` boolean, so we can override the changelog mode directly in `KafkaDynamicSource.getChangelogMode()` when `upsertMode` is true and pass the unwrapped format through. That way `Decoder.create()` sees the real format's capabilities (e.g. JSON's `ProjectableDecodingFormat`) without any delegation tricks.

We can do all of that in a separate PR once this one goes in.
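For illustration, here is a minimal, self-contained sketch of the refactor proposed in the review comment above. It does not use the real Flink classes: `RowKind`, `DecodingFormat`, `ProjectableDecodingFormat`, `JsonLikeFormat`, `DecodingFormatWrapper`, and `SourceSketch` are simplified, hypothetical stand-ins, and the upsert changelog mode (`UPDATE_AFTER` plus `DELETE`) is an assumption made for the example. It only shows why a wrapper hides the inner format's capabilities from an `instanceof` check, while a source that overrides the changelog mode itself does not.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-ins for illustration; NOT the real Flink types or signatures.
enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

interface DecodingFormat {
    Set<RowKind> getChangelogMode();
}

/** Capability interface: a format that can apply projections while decoding. */
interface ProjectableDecodingFormat extends DecodingFormat {}

/** Stand-in for an insert-only format that supports projections. */
final class JsonLikeFormat implements ProjectableDecodingFormat {
    @Override
    public Set<RowKind> getChangelogMode() {
        return EnumSet.of(RowKind.INSERT);
    }
}

/** Wrapper approach: overrides the changelog mode but hides the inner format's type. */
final class DecodingFormatWrapper implements DecodingFormat {
    private final DecodingFormat inner; // kept only to mirror the wrapping relationship

    DecodingFormatWrapper(DecodingFormat inner) {
        this.inner = inner;
    }

    @Override
    public Set<RowKind> getChangelogMode() {
        // Assumed upsert changelog mode for this sketch.
        return EnumSet.of(RowKind.UPDATE_AFTER, RowKind.DELETE);
    }
}

/** Proposed approach: the source keeps the unwrapped format and overrides the mode itself. */
final class SourceSketch {
    private final DecodingFormat valueFormat;
    private final boolean upsertMode;

    SourceSketch(DecodingFormat valueFormat, boolean upsertMode) {
        this.valueFormat = valueFormat;
        this.upsertMode = upsertMode;
    }

    Set<RowKind> getChangelogMode() {
        if (upsertMode) {
            // Same assumed upsert mode, now decided by the source instead of a wrapper.
            return EnumSet.of(RowKind.UPDATE_AFTER, RowKind.DELETE);
        }
        return valueFormat.getChangelogMode();
    }

    /** The capability check sees the real format because nothing wraps it. */
    boolean canPushDownProjection() {
        return valueFormat instanceof ProjectableDecodingFormat;
    }
}

class SketchDemo {
    public static void main(String[] args) {
        DecodingFormat wrapped = new DecodingFormatWrapper(new JsonLikeFormat());
        // The wrapper is not projectable, even though the inner format is.
        System.out.println(wrapped instanceof ProjectableDecodingFormat); // false

        SourceSketch source = new SourceSketch(new JsonLikeFormat(), true);
        System.out.println(source.getChangelogMode());      // [UPDATE_AFTER, DELETE]
        System.out.println(source.canPushDownProjection()); // true
    }
}
```

In the actual connector the capability check would live wherever the runtime decoder is created (the comment names `Decoder.create()`), and the real interfaces carry additional methods, so this should be read as a shape of the idea rather than a drop-in change.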
