Arvid Heise created FLINK-39792:
-----------------------------------

             Summary: Add multi-headers metadata key to Kafka connector
                 Key: FLINK-39792
                 URL: https://issues.apache.org/jira/browse/FLINK-39792
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
            Reporter: Arvid Heise


Problem:
The Kafka wire format models headers as an ordered iterable that allows
duplicate keys ({{Headers extends Iterable<Header>}}), but the existing
{{headers}} metadata key uses {{MAP<STRING, BYTES>}}, which cannot faithfully
represent this: when multiple headers share a key, copying them into a
{{HashMap}} silently discards all but the last value, losing both duplicates
and insertion order. This makes the current metadata key unsuitable for use
cases that rely on repeated header keys or order-sensitive header processing.
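
The loss described above can be reproduced without Kafka. The following is a minimal sketch, assuming a plain list of (name, value) pairs as a stand-in for the wire-order headers; the class {{HeaderLossDemo}} and its methods are hypothetical and are not connector code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderLossDemo {

    // Hypothetical stand-in for wire-order headers: (name, value) pairs, duplicates allowed.
    static List<Map.Entry<String, byte[]>> wireHeaders() {
        List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
        headers.add(Map.entry("trace", bytes("a")));
        headers.add(Map.entry("tenant", bytes("x")));
        headers.add(Map.entry("trace", bytes("b")));
        return headers;
    }

    // Mimics collecting headers into a MAP<STRING, BYTES>: a later value overwrites an earlier one.
    static Map<String, byte[]> toMap(List<Map.Entry<String, byte[]>> headers) {
        Map<String, byte[]> map = new HashMap<>();
        for (Map.Entry<String, byte[]> header : headers) {
            map.put(header.getKey(), header.getValue());
        }
        return map;
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> headers = wireHeaders();
        Map<String, byte[]> map = toMap(headers);
        System.out.println("wire headers: " + headers.size()); // 3
        System.out.println("map entries: " + map.size());      // 2
        // The first "trace" value ("a") is gone; only the last one survives.
        System.out.println("trace = " + new String(map.get("trace"), StandardCharsets.UTF_8)); // b
    }
}
```

Three headers go in, two map entries come out, and the relative position of the two {{trace}} headers around {{tenant}} is no longer recoverable.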

Solution:

Add a new metadata key {{multi-headers}} with type {{ARRAY<ROW<name STRING,
value BYTES>>}} to {{KafkaDynamicSource}} and {{KafkaDynamicSink}}. On the
source side, {{record.headers()}} is iterated in wire order into the array,
preserving duplicates and position.

On the sink side, the array is iterated and each entry is appended via
{{record.headers().add(name, value)}}. The existing {{headers}} key
({{MAP<STRING, BYTES>}}) is kept unchanged and soft-deprecated, so there is no
breaking change; users opt in to the new key at their own pace.
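
The intended source and sink behavior can be sketched as a round trip. This is an illustration only: {{HeaderRow}} and {{WireHeaders}} are hypothetical stand-ins for a {{ROW<name STRING, value BYTES>}} element and for the record's header collection, so the sketch runs without Kafka or Flink on the classpath. The actual conversion code in the connector may look different.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class MultiHeadersRoundTrip {

    /** Hypothetical stand-in for one ROW<name STRING, value BYTES> element. */
    static final class HeaderRow {
        final String name;
        final byte[] value;

        HeaderRow(String name, byte[] value) {
            this.name = name;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a record's append-only header collection. */
    static final class WireHeaders {
        private final List<HeaderRow> entries = new ArrayList<>();

        WireHeaders add(String name, byte[] value) {
            entries.add(new HeaderRow(name, value));
            return this;
        }

        List<HeaderRow> inWireOrder() {
            return entries;
        }
    }

    // Source side: copy headers in wire order into an array, keeping duplicates.
    static HeaderRow[] readMultiHeaders(WireHeaders headers) {
        return headers.inWireOrder().toArray(new HeaderRow[0]);
    }

    // Sink side: append each array element, in array order, to the outgoing headers.
    static WireHeaders writeMultiHeaders(HeaderRow[] rows) {
        WireHeaders out = new WireHeaders();
        for (HeaderRow row : rows) {
            out.add(row.name, row.value);
        }
        return out;
    }

    // Renders headers as "name=value, ..." for inspection (values decoded as UTF-8).
    static String describe(WireHeaders headers) {
        StringBuilder sb = new StringBuilder();
        for (HeaderRow row : headers.inWireOrder()) {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            sb.append(row.name).append('=').append(new String(row.value, StandardCharsets.UTF_8));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        WireHeaders in = new WireHeaders()
                .add("trace", "a".getBytes(StandardCharsets.UTF_8))
                .add("tenant", "x".getBytes(StandardCharsets.UTF_8))
                .add("trace", "b".getBytes(StandardCharsets.UTF_8));
        WireHeaders out = writeMultiHeaders(readMultiHeaders(in));
        System.out.println(describe(out)); // trace=a, tenant=x, trace=b
    }
}
```

Because both directions use an ordered array rather than a map, the duplicate {{trace}} headers and their positions survive the round trip.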

If both the new and the old header keys are used together, an error is thrown.
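
A minimal sketch of such a check, assuming the requested metadata keys are available as a list of strings. The class and method names are hypothetical, and {{IllegalArgumentException}} is used only to keep the example self-contained; the connector would more likely raise one of Flink's own validation exceptions.

```java
import java.util.List;

public class HeaderKeyValidation {

    static final String HEADERS = "headers";
    static final String MULTI_HEADERS = "multi-headers";

    // Rejects a table definition that requests both header metadata keys at once.
    static void validateMetadataKeys(List<String> requestedKeys) {
        if (requestedKeys.contains(HEADERS) && requestedKeys.contains(MULTI_HEADERS)) {
            throw new IllegalArgumentException(
                    "Metadata keys '" + HEADERS + "' and '" + MULTI_HEADERS
                            + "' cannot be used together; choose one of them.");
        }
    }

    public static void main(String[] args) {
        validateMetadataKeys(List.of("timestamp", "multi-headers")); // accepted
        try {
            validateMetadataKeys(List.of("headers", "multi-headers"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast at validation time avoids having to define how the two representations would be merged on the sink side.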



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
