Jennifer Thompson created KAFKA-10428:
-----------------------------------------

             Summary: Mirror Maker connect applies base64 encoding to string headers
                 Key: KAFKA-10428
                 URL: https://issues.apache.org/jira/browse/KAFKA-10428
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 2.6.0, 2.5.0, 2.4.0
            Reporter: Jennifer Thompson


MirrorSourceTask takes the header value as raw bytes from the ConsumerRecord,
which has no header schema, and adds it to the SourceRecord headers using
"addBytes". This assigns Schema.BYTES as the header's schema, and somehow
base64 encoding gets applied when the record is committed.

This means that my original header value "with_headers" (created with a Python
producer and stored as a 12-byte array) becomes the string value
"d2l0aF9oZWFkZXJz", a 16-byte array, which is the base64-encoded version of
the original. If I try to preempt this by base64 encoding the headers
everywhere, starting with "d2l0aF9oZWFkZXJz", the value just gets double
encoded to "ZDJsMGFGOW9aV0ZrWlhKeg==" after passing through the
MirrorSourceTask.
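The values above can be reproduced with the JDK alone, which supports the
reading that plain Base64 is applied once per pass. This is a sketch that only
models the symptom; the class and method names are illustrative, not
MirrorMaker code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustrative reproduction of the header values reported in KAFKA-10428.
// Models the symptom only (Base64 applied to raw header bytes); not Kafka code.
public class HeaderBase64Demo {

    // Base64-encode the UTF-8 bytes of a header value, mirroring what the
    // reporter observed after one pass through MirrorSourceTask.
    static String encodeOnce(String headerValue) {
        byte[] raw = headerValue.getBytes(StandardCharsets.UTF_8);
        return Base64.getEncoder().encodeToString(raw);
    }

    public static void main(String[] args) {
        String original = "with_headers";   // 12 bytes
        String once = encodeOnce(original);  // 16 characters, no padding
        String twice = encodeOnce(once);     // 24 characters, padded with "=="
        System.out.println(original + " (" + original.length() + ") -> "
                + once + " (" + once.length() + ")");
        System.out.println(once + " -> " + twice);
    }
}
```

Running it prints "with_headers (12) -> d2l0aF9oZWFkZXJz (16)" followed by
"d2l0aF9oZWFkZXJz -> ZDJsMGFGOW9aV0ZrWlhKeg==", matching the values above.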

I think the base64 encoding may be coming from Values#append
(https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L674),
but I'm not sure how. Values#append is invoked by
SimpleHeaderConverter#fromConnectHeader via Values#convertToString.

SimpleHeaderConverter#toConnectHeader produces the correct schema in this case
and solves the problem for me, but it seems to guess at the schema, so I'm not
sure it is the right solution. Since SourceRecord headers seem to require
schemas, and ConsumerRecord headers don't provide them, I'm not sure what
other option we have. I will open a PR with this solution.
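The difference between the two paths can be sketched as a simplified model.
Everything here is hypothetical: the HeaderSchema enum and serialize method
are illustrative stand-ins, not Kafka's API. The model assumes, as suspected
above, that a BYTES-schema value is rendered as Base64 text on the way out,
and that toConnectHeader yields a string schema for a value like
"with_headers".

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical, simplified model of the header path; not Kafka's actual code.
public class HeaderPathModel {

    // Stand-in for the two Connect schema types relevant to this issue.
    enum HeaderSchema { BYTES, STRING }

    // Models the "connect header -> string -> bytes" step: a BYTES value is
    // rendered as Base64 text (the suspected Values#append behavior), while a
    // STRING value is written back unchanged.
    static byte[] serialize(HeaderSchema schema, Object value) {
        String text;
        if (schema == HeaderSchema.BYTES) {
            text = Base64.getEncoder().encodeToString((byte[]) value);
        } else {
            text = (String) value;
        }
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] consumed = "with_headers".getBytes(StandardCharsets.UTF_8);

        // Current path: raw bytes attached with a BYTES schema (addBytes).
        byte[] viaBytes = serialize(HeaderSchema.BYTES, consumed);

        // Proposed path (assumed): bytes interpreted first, giving a STRING
        // schema for this value before the header is written back.
        String inferred = new String(consumed, StandardCharsets.UTF_8);
        byte[] viaString = serialize(HeaderSchema.STRING, inferred);

        System.out.println(new String(viaBytes, StandardCharsets.UTF_8));
        System.out.println(new String(viaString, StandardCharsets.UTF_8));
    }
}
```

Under these assumptions the first line printed is "d2l0aF9oZWFkZXJz" and the
second is "with_headers", which is why inferring a schema avoids the extra
encoding, at the cost of guessing the type from the bytes.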



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
