Arvid Heise created FLINK-39792:
-----------------------------------
Summary: Add multi-headers metadata key to Kafka connector
Key: FLINK-39792
URL: https://issues.apache.org/jira/browse/FLINK-39792
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Reporter: Arvid Heise
Problem:
The Kafka record format models headers as an ordered list that allows duplicate
keys, and the client API exposes them as an iterable ({{Headers extends Iterable<Header>}}).
The existing {{headers}} metadata key, however, uses {{MAP<STRING, BYTES>}}, which
cannot represent this faithfully: when several headers share a key, iterating them
into a {{HashMap}} silently keeps only the last value for that key, and the map does
not preserve insertion order either. Both duplicates and ordering are lost.
This makes the current metadata key unsuitable for use cases that rely on
repeated header keys or on order-sensitive header processing.
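To make the loss concrete, the following minimal Java sketch collapses a header list into a {{HashMap}} with last-write-wins semantics, as described above. It uses plain JDK types only; {{DemoHeader}} and {{HeaderMapLossDemo}} are hypothetical names standing in for Kafka's {{Header}} and for the connector's map conversion, not the connector's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka's Header: a key/value pair whose key may repeat.
final class DemoHeader {
    final String key;
    final byte[] value;

    DemoHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

final class HeaderMapLossDemo {
    // Collapses headers into a map with last-write-wins semantics:
    // a later header with the same key overwrites the earlier one.
    static Map<String, byte[]> toMap(Iterable<DemoHeader> headers) {
        Map<String, byte[]> map = new HashMap<>();
        for (DemoHeader h : headers) {
            map.put(h.key, h.value);
        }
        return map;
    }

    public static void main(String[] args) {
        List<DemoHeader> wire = new ArrayList<>();
        wire.add(new DemoHeader("trace", "a".getBytes(StandardCharsets.UTF_8)));
        wire.add(new DemoHeader("tenant", "x".getBytes(StandardCharsets.UTF_8)));
        wire.add(new DemoHeader("trace", "b".getBytes(StandardCharsets.UTF_8)));

        Map<String, byte[]> map = toMap(wire);
        // Three headers arrive, but only two map entries remain; "trace=a" is gone.
        System.out.println(wire.size() + " headers, " + map.size() + " map entries");
        System.out.println("trace=" + new String(map.get("trace"), StandardCharsets.UTF_8));
    }
}
```

Running {{main}} prints {{3 headers, 2 map entries}} followed by {{trace=b}}: the first {{trace}} value is dropped, and nothing in the map records that {{tenant}} sat between the two {{trace}} headers.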
Solution:
Add a new metadata key {{multi-headers}} with type {{ARRAY<ROW<name STRING, value BYTES>>}}
to {{KafkaDynamicSource}} and {{KafkaDynamicSink}}. On the source side,
{{record.headers()}} is iterated in wire order into the array, preserving
duplicates and their positions.
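The source-side conversion described above can be sketched in a few lines of Java. This is an illustration under stated assumptions, not the connector's implementation: {{KafkaHeader}} is a hypothetical stand-in for Kafka's {{Header}}, {{HeaderRow}} is a hypothetical stand-in for one {{ROW<name STRING, value BYTES>}} element, and a {{java.util.List}} stands in for Flink's internal array type.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's Header.
final class KafkaHeader {
    final String key;
    final byte[] value;

    KafkaHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for one ROW<name STRING, value BYTES> array element.
final class HeaderRow {
    final String name;
    final byte[] value;

    HeaderRow(String name, byte[] value) {
        this.name = name;
        this.value = value;
    }
}

final class MultiHeadersSourceSketch {
    // Copies headers in iteration (wire) order; each duplicate keeps its own slot.
    static List<HeaderRow> readMultiHeaders(Iterable<KafkaHeader> headers) {
        List<HeaderRow> rows = new ArrayList<>();
        for (KafkaHeader h : headers) {
            rows.add(new HeaderRow(h.key, h.value));
        }
        return rows;
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<KafkaHeader> wire = List.of(
                new KafkaHeader("trace", bytes("a")),
                new KafkaHeader("tenant", bytes("x")),
                new KafkaHeader("trace", bytes("b")));
        // Prints one line per header, in the original order.
        for (HeaderRow row : readMultiHeaders(wire)) {
            System.out.println(row.name + "=" + new String(row.value, StandardCharsets.UTF_8));
        }
    }
}
```

Because the array element is a (name, value) pair rather than a map entry, no key collision can occur; {{main}} prints {{trace=a}}, {{tenant=x}}, {{trace=b}} in that order.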
On the sink side, the array is iterated in order and each entry is appended via
{{record.headers().add(name, value)}}. The existing {{headers}} key
({{MAP<STRING, BYTES>}}) is kept unchanged and soft-deprecated. This is not a
breaking change; users can opt in to the new key at their own pace.
If a table uses both the new and the old header keys, an error is thrown.
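The sink-side behavior and the mutual-exclusion rule can be sketched as follows. The issue does not say where the check lives or which exception type it uses, so {{validateMetadataKeys}} and its {{IllegalArgumentException}} are assumptions. {{NamedValue}} (one array element) and {{OutHeaders}} (a stand-in for Kafka's mutable {{Headers}}, whose {{add}} appends rather than replaces) are likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for one ROW<name STRING, value BYTES> array element.
final class NamedValue {
    final String name;
    final byte[] value;

    NamedValue(String name, byte[] value) {
        this.name = name;
        this.value = value;
    }
}

// Hypothetical stand-in for Kafka's mutable Headers: add() appends, never replaces.
final class OutHeaders {
    final List<NamedValue> entries = new ArrayList<>();

    OutHeaders add(String key, byte[] value) {
        entries.add(new NamedValue(key, value));
        return this;
    }
}

final class MultiHeadersSinkSketch {
    static final String LEGACY_KEY = "headers";
    static final String MULTI_KEY = "multi-headers";

    // Rejects a table that requests both metadata keys (exception type is an assumption).
    static void validateMetadataKeys(Set<String> requested) {
        if (requested.contains(LEGACY_KEY) && requested.contains(MULTI_KEY)) {
            throw new IllegalArgumentException(
                    "Metadata keys '" + LEGACY_KEY + "' and '" + MULTI_KEY + "' cannot be used together.");
        }
    }

    // Appends each array element in order, so duplicates and positions survive.
    static void writeMultiHeaders(List<NamedValue> rows, OutHeaders target) {
        for (NamedValue row : rows) {
            target.add(row.name, row.value);
        }
    }

    public static void main(String[] args) {
        validateMetadataKeys(Set.of(MULTI_KEY)); // a single key is accepted

        OutHeaders headers = new OutHeaders();
        writeMultiHeaders(List.of(
                new NamedValue("trace", new byte[] {1}),
                new NamedValue("trace", new byte[] {2})), headers);
        System.out.println(headers.entries.size() + " headers written");

        try {
            validateMetadataKeys(Set.of(LEGACY_KEY, MULTI_KEY));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast when both keys are present avoids having to define a merge rule between a lossy map and an ordered list, which matches the issue's choice to throw instead of silently preferring one key.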