Thanks Guozhang and John for your feedback.

> 1. We need to have a clear inheritance protocol of headers in our
> topology:
> 1.a. In PAPI's context.forward() call, it should be straight-forward.
> 1.b. In DSL stateless operators, it should be straight-forward.
> 1.c. What about in stateful operators like aggregates and joins?

Agree. Probably point 3 handles this. Since `Headers` will be part of
`RecordContext`, they would be handled the same way as its other attributes.

> 3. In future work "Adding DSL Processors to use Headers to
> filter/map/branch", it may well be covered in KIP-159; worth taking a look
> at that KIP.

Yes, I will point to it.

> 2. In terms of internal implementations, should the state store cache
> include the headers then in order to be sent downstreams?

Good question. As `LRUCacheEntry` extends `RecordContext`, I think this is
already supported. I will detail this in the KIP.

> 4. MINOR: "void process(K key, V value, Headers headers)", this should be
> removed?

Fixed, thanks.

> 5. MINOR: it seems to be the case that in this KIP, our scope is only for
> exposing the headers for reading, and not allowing users to add / modify
> headers, right? If yes, we'd better state it clearly at the "Proposed
> Changes" section.

As headers are exposed in the `ProcessorContext` and will be sent
downstream, they can be mutated (headers can be added or removed).
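
A minimal sketch of what that mutation could look like from inside a
processor. The `Header`, `Headers` and `Context` types below are hypothetical
stand-ins written for this example, not the actual Kafka classes, and the
header names are made up:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class HeaderMutationSketch {

    // Hypothetical stand-in for a single record header (key plus raw bytes).
    static final class Header {
        final String key;
        final byte[] value;
        Header(String key, byte[] value) { this.key = key; this.value = value; }
    }

    // Hypothetical stand-in for a mutable header collection.
    static final class Headers {
        private final List<Header> entries = new ArrayList<>();

        Headers add(String key, byte[] value) {
            entries.add(new Header(key, value));
            return this;
        }

        Headers remove(String key) {
            entries.removeIf(h -> h.key.equals(key));
            return this;
        }

        List<String> keys() {
            List<String> keys = new ArrayList<>();
            for (Header h : entries) {
                keys.add(h.key);
            }
            return keys;
        }
    }

    // Hypothetical stand-in for the processor context that exposes the headers.
    static final class Context {
        private final Headers headers;
        Context(Headers headers) { this.headers = headers; }
        Headers headers() { return headers; }
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Processor body: because the same Headers instance travels downstream,
    // whatever is added or removed here is what downstream nodes will see.
    static void process(Context context) {
        context.headers().remove("trace-id");
        context.headers().add("processed-by", bytes("my-processor"));
    }

    static List<String> demo() {
        Headers incoming = new Headers()
                .add("trace-id", bytes("abc"))
                .add("source", bytes("topic-a"));
        Context context = new Context(incoming);
        process(context);
        return context.headers().keys();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Here `demo()` removes one header and appends another, so the remaining keys
are `source` followed by `processed-by`.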

> Also, despite the decreased scope in this KIP, I think it might be
> valuable to define what will happen to headers once this change is
> implemented. For example, I think a minimal groundwork-level change might
> be to make the API changes, while promising to drop all headers from input
> records.

I would suggest passing headers to downstream nodes and not dropping them.
Clients will have to drop `Headers` themselves once they have used them.
Or it could be something like a boolean config property that manages this.
I would like to hear feedback here.

> A maximal groundwork change would be to forward the headers through all
> operators in Streams. But I think there are some unresolved questions
> about forwarding, like "what happens to the headers in a join?"

Probably this would be solved once KIP-159 is implemented and supports
Headers.

> There's of course some middle ground, but instinctively, I think I'd
> prefer to have a clear definition that headers are currently *not*
> forwarded, rather than having a complex list of operators that do or don't
> forward them. Plus, I think it might be tricky to define this behavior
> while not allowing the scope to return to that of your original proposal!

Agree. But `Headers` were forwarded *explicitly* in the original proposal.
The current one passes them as part of `RecordContext`, so whether they are
forwarded or not is the same as for the rest of `RecordContext`.
On top of this implementation, we can design how filter/map/join will be
handled, probably following the KIP-159 approach.
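
To illustrate the idea of headers travelling implicitly with the record
context, here is a rough sketch. All types (`RecordContext`, `Node`,
`PassThrough`, `Sink`) are hypothetical simplifications written for this
example and do not mirror the real Streams internals; header values are
plain strings here only to keep it short:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ContextForwardSketch {

    // Hypothetical stand-in for the record context: metadata that
    // accompanies a record, with headers as one more attribute.
    static final class RecordContext {
        final long offset;
        final long timestamp;
        final Map<String, String> headers;

        RecordContext(long offset, long timestamp, Map<String, String> headers) {
            this.offset = offset;
            this.timestamp = timestamp;
            this.headers = headers;
        }
    }

    // Hypothetical processing node: receives a record together with its context.
    interface Node {
        void process(String key, String value, RecordContext context);
    }

    // Transforms the value and forwards the record with the same context,
    // so headers are forwarded exactly like offset and timestamp.
    static final class PassThrough implements Node {
        private final Node child;
        PassThrough(Node child) { this.child = child; }

        @Override
        public void process(String key, String value, RecordContext context) {
            child.process(key, value.toUpperCase(), context);
        }
    }

    // Terminal node that records what it observed, including the headers.
    static final class Sink implements Node {
        final List<String> seen = new ArrayList<>();

        @Override
        public void process(String key, String value, RecordContext context) {
            seen.add(key + "=" + value + " headers=" + context.headers);
        }
    }

    static List<String> demo() {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("trace-id", "abc");
        RecordContext context = new RecordContext(42L, 1000L, headers);

        Sink sink = new Sink();
        new PassThrough(sink).process("k", "v", context);
        return sink.seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The upstream node never mentions headers, yet the sink still sees them; that
is the difference from the original proposal, where they were passed
explicitly.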

Cheers,
Jorge.

On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangg...@gmail.com>)
wrote:

> Hi Jorge,
>
> Thanks for the written KIP! Made a pass over it and left some comments
> (some of them overlapped with John's):
>
> 1. We need to have a clear inheritance protocol of headers in our topology:
>
> 1.a. In PAPI's context.forward() call, it should be straight-forward.
> 1.b. In DSL stateless operators, it should be straight-forward.
> 1.c. What about in stateful operators like aggregates and joins?
>
> 2. In terms of internal implementations, should the state store cache
> include the headers then in order to be sent downstreams?
>
> 3. In future work "Adding DSL Processors to use Headers to
> filter/map/branch", it may well be covered in KIP-159; worth taking a look
> at that KIP.
>
> 4. MINOR: "void process(K key, V value, Headers headers)", this should be
> removed?
>
> 5. MINOR: it seems to be the case that in this KIP, our scope is only for
> exposing the headers for reading, and not allowing users to add / modify
> headers, right? If yes, we'd better state it clearly at the "Proposed
> Changes" section.
>
>
> Guozhang
>
>
> On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io> wrote:
>
> > Hi Jorge,
> >
> > Thanks for the design work.
> >
> > I agree that de-scoping the work to just the Processor API will help
> > contain the design and implementation complexity.
> >
> > In the KIP, it mentions that the headers would be available in the
> > ProcessorContext, (like "context.headers()"). It also says that
> > implementers would need to implement the method "void process(K key, V
> > value, Headers headers);". I think maybe you meant to remove the proposal
> > to modify "process", since it wouldn't be necessary in conjunction with
> > the ProcessorContext change, and it's not represented in your PR.
> >
> > Also, despite the decreased scope in this KIP, I think it might be
> > valuable to define what will happen to headers once this change is
> > implemented. For example, I think a minimal groundwork-level change
> > might be to make the API changes, while promising to drop all headers
> > from input records.
> >
> > A maximal groundwork change would be to forward the headers through all
> > operators in Streams. But I think there are some unresolved questions
> > about forwarding, like "what happens to the headers in a join?"
> >
> > There's of course some middle ground, but instinctively, I think I'd
> > prefer to have a clear definition that headers are currently *not*
> > forwarded, rather than having a complex list of operators that do or
> > don't forward them. Plus, I think it might be tricky to define this
> > behavior while not allowing the scope to return to that of your original
> > proposal!
> >
> > Thanks again for the KIP,
> > -John
> >
> > On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > Hi Matthias,
> > >
> > > I've created a new JIRA to track this, updated the KIP and create a PR.
> > >
> > > Looking forward to your feedback,
> > >
> > > Jorge.
> > >
> > > On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matth...@confluent.io>)
> > > wrote:
> > >
> > > > Hi Jorge,
> > > >
> > > > I would like to unblock this KIP to make some progress. The tricky
> > > > question of this work, seems to be how to expose headers at DSL
> > > > level. This related to KIP-149 and KIP-159. However, for Processor
> > > > API, it seems to be rather straight forward to add headers to the API.
> > > >
> > > > Thus, I would suggest to de-scope this KIP and add header support for
> > > > Processor API only as a first step. If this is done, we can see in a
> > > > second step, how to add headers at DSL level.
> > > >
> > > > WDYT about this proposal?
> > > >
> > > > If you agree, please update the JIRA and KIP accordingly. Note, that
> > > > we have two JIRA that are duplicates atm. We can scope them
> > > > accordingly: one for PAPI only, and second as a dependent JIRA for DSL.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > > Thanks for your feedback!
> > > > >
> > > > > 1. I was adding headers to KeyValue to support groupBy, but I think
> > > > > it is not necessary. It should be enough with mapping headers to
> > > > > key/value and then group using current KeyValue structure.
> > > > >
> > > > > 2. Yes. IMO key/value stores, like RocksDB, rely on KV as structure,
> > > > > hence considering headers as part of stateful operations will not fit
> > > > > in this approach and increase complexity (I cannot think in a use-case
> > > > > that need this).
> > > > >
> > > > > 3. and 4. Changes on 1. will solve this issue.
> > > > >
> > > > > Probably I rush a bit proposing this change, I was not aware of
> > > > > KIP-159 or KAFKA-5632.
> > > > > If KIP-159 is adopted and we reduce this KIP to add Headers to
> > > > > RecordContext will be enough, but I'm not sure about the scope of
> > > > > KIP-159. If it includes stateful operations will be difficult to
> > > > > implemented as stated in 2.
> > > > >
> > > > > Cheers,
> > > > > Jorge.
> > > > >
> > > > > On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax (<matth...@confluent.io>)
> > > > > wrote:
> > > > >
> > > > >> Thanks for the KIP Jorge,
> > > > >>
> > > > >> As Bill pointed out already, we should be careful with adding new
> > > > >> overloads as this contradicts the work done via KIP-182.
> > > > >>
> > > > > >> This KIP also seems to be related to KIP-149 and KIP-159. Are you
> > > > > >> aware of them? Both have quite long DISCUSS threads, but it might be
> > > > > >> worth browsing through them.
> > > > >>
> > > > >> A few further questions:
> > > > >>
> > > > > >>  - why do you want to add the headers to `KeyValue`? I am not sure
> > > > > >> if we should consider headers as optional metadata and add it to
> > > > > >> `RecordContext` similar to timestamp, offset, etc. only
> > > > > >>
> > > > > >>  - You only include stateless single-record transformations at the
> > > > > >> DSL level. Do you suggest that all other operator just drop headers
> > > > > >> on the floor?
> > > > > >>
> > > > > >>  - Why do you only want to put headers into in-memory and cache but
> > > > > >> not RocksDB store? What do you mean by "pass through"? IMHO, all
> > > > > >> stores should behave the same at DSL level.
> > > > > >>    -> if we store the headers in the state stores, what is the
> > > > > >> upgrade path?
> > > > > >>
> > > > > >>  - Why do we need to store record header in state in the first
> > > > > >> place, if we exclude stateful operator at DSL level?
> > > > >>
> > > > >>
> > > > >> What is the motivation for the "border lines" you choose?
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> > > > >>> Jorge,
> > > > >>>
> > > > > >>> Thanks for the KIP, I know this is a feature others in the
> > > > > >>> community have been interested in getting into Kafka Streams.
> > > > > >>>
> > > > > >>> I took a quick pass over it, and I have one initial question.
> > > > > >>>
> > > > > >>> We recently reduced overloads with KIP-182, and in this KIP we are
> > > > > >>> increasing them again.
> > > > > >>>
> > > > > >>> I can see from the KIP why they are necessary, but I'm wondering if
> > > > > >>> there is something else we can do to cut down on the overloads
> > > > > >>> introduced.  I don't have any sound suggestions ATM, so I'll have to
> > > > > >>> think about it some more, but I wanted to put the thought out there.
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Bill
> > > > >>>
> > > > >>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <
> > > > >>> quilcate.jo...@gmail.com> wrote:
> > > > >>>
> > > > >>>> Hi all,
> > > > >>>>
> > > > > >>>> I have created a KIP to add Record Headers support to Kafka
> > > > > >>>> Streams API:
> > > > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> The main goal is to be able to use headers to filter, map and
> > > > > >>>> process records as streams. Stateful processing (joins, windows) are
> > > > > >>>> not considered.
> > > > > >>>>
> > > > > >>>> Proposed changes/Draft:
> > > > > >>>> https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
> > > > >>>> Feedback and suggestions are more than welcome.
> > > > >>>>
> > > > >>>> Cheers,
> > > > >>>>
> > > > >>>> Jorge.
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
