Allen Wang created FLINK-11303:
----------------------------------

             Summary: Utilizing Kafka headers for serialization and deserialization
                 Key: FLINK-11303
                 URL: https://issues.apache.org/jira/browse/FLINK-11303
             Project: Flink
          Issue Type: Improvement
          Components: Kafka Connector
            Reporter: Allen Wang

Kafka has supported headers in producer and consumer records since version 0.11. The high-level description is in KIP-82:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers]

However, the current Flink Kafka connector simply ignores the headers. This makes it hard to integrate with the Kafka ecosystem, where other Kafka clients make use of headers.

I propose to support headers in Flink by modifying the following APIs:
 * In KeyedSerializationSchema, add
{code:java}
List<Tuple2<String, byte[]>> getHeaders(T element)
{code}
 * In KeyedDeserializationSchema, add
{code:java}
T deserialize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers, String topic, int partition, long offset) throws IOException
{code}

These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher in the serialization and deserialization process. If backward compatibility is a concern, we can add default implementations of these methods in which headers are ignored.
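
For illustration, the backward-compatible variant of the proposal could look roughly like the sketch below. It is not the actual connector code: the interface names HeaderAwareSerializationSchema and HeaderAwareDeserializationSchema, the serializeValue method, the "trace-id" header, and the two example schemas are hypothetical stand-ins, and Map.Entry<String, byte[]> replaces Flink's Tuple2 so that the example compiles without Flink on the classpath. The point it tries to show is that a default method lets existing schemas compile unchanged while new schemas can opt in to headers.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class KafkaHeaderSchemaSketch {

    /** Hypothetical stand-in for KeyedSerializationSchema plus the proposed method. */
    interface HeaderAwareSerializationSchema<T> {
        byte[] serializeValue(T element);

        // Proposed addition; the default emits no headers, so old schemas are unaffected.
        default List<Map.Entry<String, byte[]>> getHeaders(T element) {
            return Collections.emptyList();
        }
    }

    /** Hypothetical stand-in for KeyedDeserializationSchema plus the proposed overload. */
    interface HeaderAwareDeserializationSchema<T> {
        T deserialize(byte[] messageKey, byte[] message,
                      String topic, int partition, long offset) throws IOException;

        // Proposed addition; the default ignores the headers and delegates to the old method.
        default T deserialize(byte[] messageKey, byte[] message,
                              List<Map.Entry<String, byte[]>> headers,
                              String topic, int partition, long offset) throws IOException {
            return deserialize(messageKey, message, topic, partition, offset);
        }
    }

    /** Existing-style schema: implements only the header-less method. */
    static class StringSchema implements HeaderAwareDeserializationSchema<String>,
                                         HeaderAwareSerializationSchema<String> {
        @Override
        public String deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Header-aware schema: prefixes the value with the "trace-id" header if present. */
    static class TracedStringSchema extends StringSchema {
        @Override
        public String deserialize(byte[] messageKey, byte[] message,
                                  List<Map.Entry<String, byte[]>> headers,
                                  String topic, int partition, long offset) {
            String value = new String(message, StandardCharsets.UTF_8);
            for (Map.Entry<String, byte[]> header : headers) {
                if ("trace-id".equals(header.getKey())) {
                    return new String(header.getValue(), StandardCharsets.UTF_8) + ":" + value;
                }
            }
            return value;
        }
    }

    /** Small helper to build a header entry from two strings. */
    static Map.Entry<String, byte[]> header(String key, String value) {
        return new SimpleImmutableEntry<>(key, value.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        List<Map.Entry<String, byte[]>> headers =
                Collections.singletonList(header("trace-id", "abc123"));
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);

        // The old schema still works when the fetcher calls the new overload.
        System.out.println(new StringSchema()
                .deserialize(null, value, headers, "topic", 0, 0L));        // prints "hello"
        // The new schema sees the headers.
        System.out.println(new TracedStringSchema()
                .deserialize(null, value, headers, "topic", 0, 0L));        // prints "abc123:hello"
        // The default getHeaders yields an empty list on the producer side.
        System.out.println(new StringSchema().getHeaders("hello").size()); // prints "0"
    }
}
```

With this shape, FlinkKafkaProducer and KafkaFetcher would always call the header-aware methods, and schemas written against the current API would fall through to the defaults.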