Jennifer Thompson created KAFKA-10428:
-----------------------------------------

             Summary: Mirror Maker connect applies base64 encoding to string headers
                 Key: KAFKA-10428
                 URL: https://issues.apache.org/jira/browse/KAFKA-10428
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 2.6.0, 2.5.0, 2.4.0
            Reporter: Jennifer Thompson


MirrorSourceTask takes the header value as bytes from the ConsumerRecord, which does not have a header schema, and adds it to the SourceRecord headers using "addBytes". This uses Schema.BYTES as the schema for the header, and somehow base64 encoding gets applied when the record gets committed.

This means that my original header value "with_headers" (created with a Python producer, and stored as a 12-character byte array) becomes the string value "d2l0aF9oZWFkZXJz", a 16-character byte array, which is the base64-encoded version of the original.

If I try to preempt this by using "d2l0aF9oZWFkZXJz" to start with, and base64 encoding the headers everywhere, it just gets double encoded to "ZDJsMGFGOW9aV0ZrWlhKeg==" after passing through the MirrorSourceTask.

I think the base64 encoding may be coming from Values#append (https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L674), but I'm not sure how. That is invoked by SimpleHeaderConverter#fromConnectHeader via Values#convertToString.

SimpleHeaderConverter#toConnectHeader produces the correct schema in this case, and solves the problem for me, but it seems to guess at the schema, so I'm not sure if it is the right solution. Since schemas seem to be required for SourceRecord headers, but not available from ConsumerRecord headers, I'm not sure what other option we have.

I will open a PR with this solution.
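
For illustration, the header values quoted above can be reproduced with Python's standard base64 module. This is only a sketch of the observed symptom, not of the MirrorMaker code path. The function name mirrored_header_value is hypothetical and simply models "bytes in, base64 text out" as described in this report.

```python
import base64


def mirrored_header_value(value: bytes) -> bytes:
    """Hypothetical model of the symptom: the header bytes come out
    base64 encoded after one pass through MirrorSourceTask."""
    return base64.b64encode(value)


# Header value as written by the original (Python) producer.
original = b"with_headers"

# One pass through the mirror: 12 bytes become 16 base64 characters.
once = mirrored_header_value(original)

# Pre-encoding the header does not help: the mirror encodes it again.
twice = mirrored_header_value(once)

print(len(original), original.decode("ascii"))
print(len(once), once.decode("ascii"))
print(len(twice), twice.decode("ascii"))

# A consumer on the target cluster would need one decode per hop
# to recover the original value.
recovered = base64.b64decode(once)
```

With these inputs, once is "d2l0aF9oZWFkZXJz" and twice is "ZDJsMGFGOW9aV0ZrWlhKeg==", matching the values seen on the target cluster.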