Hi Matthias,
Sorry for the late reply.
I like the proposal. Just to check if I got it right:
We can extend the `KStream#to()` method to support setting headers, for example:
```
void to(final String topic,
        final Produced<K, V> produced,
        final HeadersExtractor<K, V> headersExtractor);
```
where `HeadersExtractor`:
```
public interface HeadersExtractor<K, V> {
    Headers extract(final K key, final V value, final RecordContext recordContext);
}
```
This would require changing `Topology#addSink()` to support this
extractor as well.
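To make the idea a bit more concrete, here is a rough, self-contained sketch of how such an extractor could look from the user's side. Everything in it is an assumption for illustration only: `Header` and `RecordContext` are simplified stand-ins rather than the real Kafka classes, a plain `List<Header>` stands in for `Headers`, and the `PROVENANCE` extractor and its header names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch only: all types below are simplified stand-ins, not the real Kafka API.
final class HeadersExtractorSketch {

    // Stand-in for org.apache.kafka.common.header.Header.
    static final class Header {
        final String key;
        final byte[] value;

        Header(final String key, final byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in exposing only the RecordContext fields used in this sketch.
    static final class RecordContext {
        final String topic;
        final int partition;
        final long offset;

        RecordContext(final String topic, final int partition, final long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // The proposed interface; List<Header> replaces Kafka's Headers type here.
    interface HeadersExtractor<K, V> {
        List<Header> extract(K key, V value, RecordContext recordContext);
    }

    // Hypothetical extractor: tags each output record with its source topic and offset.
    static final HeadersExtractor<String, String> PROVENANCE = (key, value, ctx) -> {
        final List<Header> headers = new ArrayList<>();
        headers.add(new Header("source-topic",
            ctx.topic.getBytes(StandardCharsets.UTF_8)));
        headers.add(new Header("source-offset",
            Long.toString(ctx.offset).getBytes(StandardCharsets.UTF_8)));
        return headers;
    };

    public static void main(final String[] args) {
        final RecordContext ctx = new RecordContext("orders", 0, 42L);
        // Print each extracted header as key=value.
        for (final Header h : PROVENANCE.extract("order-1", "payload", ctx)) {
            System.out.println(h.key + "=" + new String(h.value, StandardCharsets.UTF_8));
        }
    }
}
```

In this sketch the extractor builds a fresh list for every record, so no header object is shared between output records. Whether the real interface should return a new `Headers` instance or mutate an existing one is still open.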
If this is aligned with your proposal, I'm happy to add it to this KIP.
Cheers,
Jorge.
On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <[email protected]> wrote:
> Jorge,
>
> thanks a lot for this KIP. Being able to modify headers is a very
> valuable feature.
>
> However, before we actually expose them in the DSL, I am wondering if we
> should improve how headers can be modified in the PAPI? Currently, it is
> possible but very clumsy to work with headers in the Processor API,
> for two reasons:
>
> (1) There is no default `Headers` implementation in the public API
> (2) There is no explicit way to set headers for output records
>
> Currently, the input record headers are copied into the output records
> when `forward()` is called. However, it's not really a deep copy; we
> just copy the reference. This implies that one needs to work with a
> single mutable object that flows through multiple processors, making it
> very error-prone.
>
> Furthermore, if you want to emit multiple output records and, for
> example, add a different header to each output record (based on
> the same input headers), you would need to do something like this:
>
> Headers h = context.headers();
> h.add(...);
> context.forward(...);
> // remove the header you added for the first output record
> h.remove(...);
> h.add(...);
> context.forward(...);
>
>
> Maybe we could extend `To` to allow passing in a new `Headers` object
> (or an `Iterable<Header>` similar to `ProducerRecord`)? We could either
> add it to your KIP or do a new KIP just for the PAPI.
>
> Thoughts?
>
>
> -Matthias
>
> On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> > Hi everyone,
> >
> > Bumping this thread to check if there's any feedback.
> >
> > Cheers,
> > Jorge.
> >
> > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
> > [email protected]> wrote:
> >
> >> Hi everyone,
> >>
> >> I would like to start the discussion for KIP-634:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> >>
> >> Looking forward to your feedback.
> >>
> >> Thanks!
> >> Jorge.
> >>
> >>
> >>
> >>
> >
>
>