azagrebin commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#discussion_r253055987
########## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java ##########

```diff
@@ -55,4 +57,13 @@
	 * @return null or the target topic
	 */
	String getTargetTopic(T element);
+
+	/**
+	 *
+	 * @param element The incoming element to be serialized
+	 * @return collection of headers (maybe empty)
+	 */
+	default Iterable<Map.Entry<String, byte[]>> headers(T element) {
```

Review comment:

@alexeyt820 Ideally we could also deprecate the partitioner in the producer constructor, because `ProducerRecord` already contains a partition that the user can assign to the record.

Instead of deprecating individual methods in the (de)serialization schema interfaces, we could deprecate those interfaces entirely and introduce new `Kafka(De)SerializationSchema` interfaces that work with the Kafka `Consumer/ProducerRecord` classes. We would also introduce adaptors from the older schemas to the newer ones. The producer/consumer constructors that currently accept the older schemas would then use the adaptors to create the newer schemas, and the actual code would work only with the newer schemas. The serialization schema adaptor could optionally take a `FlinkKafkaPartitioner` to populate the partition, similarly to how this currently happens directly in the producer.

What do you think?
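The adaptor approach proposed in the comment could be sketched roughly as follows. This is only an illustration of the idea, not the actual Flink API: all types below (`ProducerRecord`, `KafkaSerializationSchema`, the partitioner represented as a plain `Function`) are simplified stand-ins for the real classes in `kafka-clients` and `flink-connector-kafka-base`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Simplified stand-in for the existing old-style schema under discussion.
interface KeyedSerializationSchema<T> {
	byte[] serializeKey(T element);
	byte[] serializeValue(T element);
	String getTargetTopic(T element);
	// The default method added in this PR: headers for the element.
	default Iterable<Map.Entry<String, byte[]>> headers(T element) {
		return Collections.emptyList();
	}
}

// Simplified stand-in for Kafka's ProducerRecord (headers omitted for brevity).
class ProducerRecord<K, V> {
	final String topic;
	final Integer partition; // nullable: null lets Kafka pick the partition
	final K key;
	final V value;

	ProducerRecord(String topic, Integer partition, K key, V value) {
		this.topic = topic;
		this.partition = partition;
		this.key = key;
		this.value = value;
	}
}

// The proposed new-style schema: serialize straight into a ProducerRecord,
// so topic, partition, key, value (and headers) all live in one place.
interface KafkaSerializationSchema<T> {
	ProducerRecord<byte[], byte[]> serialize(T element);
}

// Adaptor from the old schema to the new one. It optionally takes a
// partitioner (stand-in for FlinkKafkaPartitioner) and uses it to populate
// the partition, mirroring what the producer currently does internally.
class KeyedSchemaAdaptor<T> implements KafkaSerializationSchema<T> {
	private final KeyedSerializationSchema<T> oldSchema;
	private final Function<T, Integer> partitioner; // may be null

	KeyedSchemaAdaptor(KeyedSerializationSchema<T> oldSchema, Function<T, Integer> partitioner) {
		this.oldSchema = oldSchema;
		this.partitioner = partitioner;
	}

	@Override
	public ProducerRecord<byte[], byte[]> serialize(T element) {
		Integer partition = (partitioner == null) ? null : partitioner.apply(element);
		return new ProducerRecord<>(
			oldSchema.getTargetTopic(element),
			partition,
			oldSchema.serializeKey(element),
			oldSchema.serializeValue(element));
	}
}
```

With this shape, the producer constructors that take the old schema would simply wrap it in `KeyedSchemaAdaptor` and run everything through the new interface internally.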