Hi Jorge,

Thanks for writing up the KIP! I made a pass over it and have some comments
(some of them overlap with John's):

1. We need to have a clear protocol for how headers are inherited in our
topology:

1.a. In PAPI's context.forward() call, it should be straightforward.
1.b. In DSL stateless operators, it should be straightforward.
1.c. What about in stateful operators like aggregates and joins?
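
To make the question in 1.c concrete, here is a minimal sketch in plain Java.
It does not use the Streams API: `Rec`, `forward` and `join` are hypothetical
names, and headers are modeled as a simple String map purely for
illustration. With one input record the output can inherit the headers as-is;
with two inputs some merge rule has to be picked, and the rule below is only
one arbitrary choice.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderInheritanceSketch {

    // Hypothetical stand-in for a record with headers; not a Kafka class.
    static final class Rec {
        final String key;
        final String value;
        final Map<String, String> headers;

        Rec(String key, String value, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.headers = new LinkedHashMap<>(headers); // defensive copy
        }
    }

    // 1.a / 1.b: one record in, one record out, so the output can simply
    // inherit the input's headers.
    static Rec forward(Rec in, String newValue) {
        return new Rec(in.key, newValue, in.headers);
    }

    // 1.c: two records in. This sketch assumes "right side wins" on
    // conflicting header keys; that rule is an assumption, not a proposal.
    static Rec join(Rec left, Rec right) {
        Map<String, String> merged = new LinkedHashMap<>(left.headers);
        merged.putAll(right.headers);
        return new Rec(left.key, left.value + "+" + right.value, merged);
    }

    public static void main(String[] args) {
        Rec a = new Rec("k", "a", Map.of("trace-id", "1"));
        Rec b = new Rec("k", "b", Map.of("trace-id", "2", "source", "topic-b"));
        System.out.println(forward(a, "A").headers);
        System.out.println(join(a, b).headers);
    }
}
```

Note that join(a, b) and join(b, a) end up with different "trace-id" values
in this sketch, which is exactly the kind of ambiguity the protocol would
need to settle for joins and aggregates.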

2. In terms of internal implementation, should the state store cache then
include the headers so that they can be sent downstream?

3. The future work item "Adding DSL Processors to use Headers to
filter/map/branch" may well be covered by KIP-159; it is worth taking a look
at that KIP.

4. MINOR: "void process(K key, V value, Headers headers)", this should be
removed?

5. MINOR: it seems that the scope of this KIP is only to expose the headers
for reading, and not to allow users to add / modify headers, right? If yes,
we'd better state that clearly in the "Proposed Changes" section.


Guozhang


On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io> wrote:

> Hi Jorge,
>
> Thanks for the design work.
>
> I agree that de-scoping the work to just the Processor API will help
> contain the design and implementation complexity.
>
> The KIP mentions that the headers would be available in the
> ProcessorContext (like "context.headers()"). It also says that
> implementers would need to implement the method "void process(K key, V
> value, Headers headers);". I think maybe you meant to remove the proposal
> to modify "process", since it wouldn't be necessary in conjunction with the
> ProcessorContext change, and it's not represented in your PR.
>
> Also, despite the decreased scope in this KIP, I think it might be valuable
> to define what will happen to headers once this change is implemented. For
> example, I think a minimal groundwork-level change might be to make the API
> changes, while promising to drop all headers from input records.
>
> A maximal groundwork change would be to forward the headers through all
> operators in Streams. But I think there are some unresolved questions about
> forwarding, like "what happens to the headers in a join?"
>
> There's of course some middle ground, but instinctively, I think I'd prefer
> to have a clear definition that headers are currently *not* forwarded,
> rather than having a complex list of operators that do or don't forward
> them. Plus, I think it might be tricky to define this behavior while not
> allowing the scope to return to that of your original proposal!
>
> Thanks again for the KIP,
> -John
>
> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
> > Hi Matthias,
> >
> > I've created a new JIRA to track this, updated the KIP, and created a PR.
> >
> > Looking forward to your feedback,
> >
> > Jorge.
> >
> > On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matth...@confluent.io>)
> > wrote:
> >
> > > Hi Jorge,
> > >
> > > I would like to unblock this KIP to make some progress. The tricky
> > > question in this work seems to be how to expose headers at the DSL
> > > level. This is related to KIP-149 and KIP-159. However, for the
> > > Processor API, it seems to be rather straightforward to add headers.
> > >
> > > Thus, I would suggest de-scoping this KIP and adding header support for
> > > the Processor API only as a first step. Once this is done, we can see
> > > in a second step how to add headers at the DSL level.
> > >
> > > WDYT about this proposal?
> > >
> > > If you agree, please update the JIRA and KIP accordingly. Note that we
> > > have two JIRAs that are duplicates atm. We can scope them accordingly:
> > > one for PAPI only, and the second as a dependent JIRA for the DSL.
> > >
> > >
> > > -Matthias
> > >
> > > On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > Thanks for your feedback!
> > > >
> > > > > 1. I was adding headers to KeyValue to support groupBy, but I think
> > > > > it is not necessary. It should be enough to map headers to key/value
> > > > > and then group using the current KeyValue structure.
> > > >
> > > > > 2. Yes. IMO key/value stores, like RocksDB, rely on KV as their
> > > > > structure, hence considering headers as part of stateful operations
> > > > > will not fit this approach and will increase complexity (I cannot
> > > > > think of a use case that needs this).
> > > >
> > > > > 3. and 4. The changes in 1. will resolve these issues.
> > > >
> > > > > I probably rushed a bit in proposing this change; I was not aware of
> > > > > KIP-159 or KAFKA-5632.
> > > > > If KIP-159 is adopted and we reduce this KIP to adding Headers to
> > > > > RecordContext, that will be enough, but I'm not sure about the scope
> > > > > of KIP-159. If it includes stateful operations, it will be difficult
> > > > > to implement, as stated in 2.
> > > >
> > > > Cheers,
> > > > Jorge.
> > > >
> > > > > On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax
> > > > > (<matth...@confluent.io>) wrote:
> > > >
> > > >> Thanks for the KIP Jorge,
> > > >>
> > > >> As Bill pointed out already, we should be careful with adding new
> > > >> overloads as this contradicts the work done via KIP-182.
> > > >>
> > > > >> This KIP also seems to be related to KIP-149 and KIP-159. Are you
> > > > >> aware of them? Both have quite long DISCUSS threads, but it might
> > > > >> be worth browsing through them.
> > > >>
> > > >> A few further questions:
> > > >>
> > > > >>  - Why do you want to add the headers to `KeyValue`? I am wondering
> > > > >> whether we should consider headers as optional metadata and add
> > > > >> them only to `RecordContext`, similar to timestamp, offset, etc.
> > > >
> > > >
> > > > >>  - You only include stateless single-record transformations at the
> > > > >> DSL level. Do you suggest that all other operators just drop
> > > > >> headers on the floor?
> > > >>
> > > > >>  - Why do you only want to put headers into in-memory and cache but
> > > > >> not RocksDB store? What do you mean by "pass through"? IMHO, all
> > > > >> stores should behave the same at DSL level.
> > > > >>    -> if we store the headers in the state stores, what is the
> > > > >> upgrade path?
> > > >>
> > > > >>  - Why do we need to store record headers in state in the first
> > > > >> place, if we exclude stateful operators at the DSL level?
> > > >>
> > > >>
> > > > >> What is the motivation for the "border lines" you chose?
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> > > >>> Jorge,
> > > >>>
> > > > >>> Thanks for the KIP, I know this is a feature others in the
> > > > >>> community have been interested in getting into Kafka Streams.
> > > >>>
> > > >>> I took a quick pass over it, and I have one initial question.
> > > >>>
> > > >>> We recently reduced overloads with KIP-182, and in this KIP we are
> > > >>> increasing them again.
> > > >>>
> > > > >>> I can see from the KIP why they are necessary, but I'm wondering
> > > > >>> if there is something else we can do to cut down on the overloads
> > > > >>> introduced.  I don't have any sound suggestions ATM, so I'll have
> > > > >>> to think about it some more, but I wanted to put the thought out
> > > > >>> there.
> > > >>>
> > > >>> Thanks,
> > > >>> Bill
> > > >>>
> > > >>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <
> > > >>> quilcate.jo...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi all,
> > > >>>>
> > > > >>>> I have created a KIP to add Record Headers support to the Kafka
> > > > >>>> Streams API:
> > > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
> > > >>>>
> > > >>>>
> > > > >>>> The main goal is to be able to use headers to filter, map and
> > > > >>>> process records as streams. Stateful processing (joins, windows)
> > > > >>>> is not considered.
> > > >>>>
> > > >>>> Proposed changes/Draft:
> > > > >>>> https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
> > > >>>>
> > > >>>> Feedback and suggestions are more than welcome.
> > > >>>>
> > > >>>> Cheers,
> > > >>>>
> > > >>>> Jorge.
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>



-- 
-- Guozhang
