[ https://issues.apache.org/jira/browse/KAFKA-10428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190370#comment-17190370 ]
Jennifer Thompson commented on KAFKA-10428:
-------------------------------------------

Setting {{"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"}} in the connector config fixes the issue.

> Mirror Maker connect applies base64 encoding to string headers
> --------------------------------------------------------------
>
>                 Key: KAFKA-10428
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10428
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.4.0, 2.5.0, 2.6.0
>            Reporter: Jennifer Thompson
>            Priority: Major
>
> MirrorSourceTask takes the header value as bytes from the ConsumerRecord,
> which does not have a header schema, and adds it to the SourceRecord headers
> using "addBytes". This uses Schema.BYTES as the schema for the header, and
> somehow base64 encoding gets applied when the record gets committed.
> This means that my original header value "with_headers" (created with a
> Python producer and stored as a 12-byte array) becomes the string value
> "d2l0aF9oZWFkZXJz", a 16-byte array, which is the base64-encoded version of
> the original. If I try to preempt this by starting with "d2l0aF9oZWFkZXJz"
> and base64 encoding the headers everywhere, the value just gets double
> encoded to "ZDJsMGFGOW9aV0ZrWlhKeg==" after passing through the
> MirrorSourceTask.
> I think the base64 encoding may be coming from Values#append
> (https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L674),
> but I'm not sure how. That is invoked by
> SimpleHeaderConverter#fromConnectHeader via Values#convertToString.
> SimpleHeaderConverter#toConnectHeader produces the correct schema in this
> case, and solves the problem for me, but it seems to guess at the schema, so
> I'm not sure if it is the right solution. Since schemas seem to be required
> for SourceRecord headers, but not available from ConsumerRecord headers, I'm
> not sure what other option we have. I will open a PR with this solution.
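
For anyone wanting to check whether they are hitting the same symptom, the header values quoted in the issue can be reproduced with plain base64. The sketch below is not Kafka code: {{simulate_mirrored_header}} is a hypothetical helper that only mimics the reported effect (header bytes coming out base64 encoded), assuming standard base64 with padding.

```python
import base64


def simulate_mirrored_header(value: bytes) -> bytes:
    """Hypothetical helper: mimics the reported symptom only.

    It does not call any Kafka or MirrorMaker code; it just base64-encodes
    the header bytes, which is the effect described in the issue.
    """
    return base64.b64encode(value)


# Header value as written by the original producer.
original = b"with_headers"

# What the header looks like after one pass through the mirror.
once = simulate_mirrored_header(original)

# What happens if the already-encoded value is mirrored again.
twice = simulate_mirrored_header(once)

print(len(original), original)
print(len(once), once)
print(len(twice), twice)

# A consumer that sees the encoded form can recover the original bytes.
assert base64.b64decode(once) == original
```

If a mirrored header decodes back to the original value like this, the connector is probably affected, and the {{header.converter}} setting mentioned in the comment above is the workaround to try.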