Allen Wang created FLINK-11303:
----------------------------------

             Summary: Utilizing Kafka headers for serialization and 
deserialization
                 Key: FLINK-11303
                 URL: https://issues.apache.org/jira/browse/FLINK-11303
             Project: Flink
          Issue Type: Improvement
          Components: Kafka Connector
            Reporter: Allen Wang


Kafka introduces headers in producer and consumer record since version 0.11. 
This is the high level description: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers]

However, current Flink Kafka connector simply ignores the headers. This will 
make it hard to integrate with the Kafka ecosystem where other Kafka clients 
make use of the headers.

 

I propose to support headers in Flink by modifying the following API:
 * In KeyedSerializationSchema, add
{code:java}
List<Tuple2<String, byte[]>> getHeaders(T element)
{code}

 * In KeyedDeserializationSchema, add
{code:java}
T deserailize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> 
headers, String topic, int partition, long offset) throws IOException{code}

 

These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher in the 
serialization and deserialization process. If backward compatibility is a 
concern, we can add default implementation to these methods where headers are 
ignored.

 

If backward compatiblity 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to