Thanks Guozhang and John for your feedback.

> 1. We need to have a clear inheritance protocol of headers in our topology:
> 1.a. In PAPI's context.forward() call, it should be straight-forward.
> 1.b. In DSL stateless operators, it should be straight-forward.
> 1.c. What about in stateful operators like aggregates and joins?

Agreed. Point 3 probably handles this. Since `Headers` is part of `RecordContext`, it would be handled the same way as the other attributes.

> 3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.

Yes, I will point to it.

> 2. In terms of internal implementations, should the state store cache include the headers then in order to be sent downstreams?

Good question. As `LRUCacheEntry` extends `RecordContext`, I think this is already supported. I will detail this in the KIP.

> 4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?

Fixed, thanks.

> 5. MINOR: it seems to be the case that in this KIP, our scope is only for
> exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly at the "Proposed Changes" section.

As headers are exposed in the `ProcessorContext` and are sent downstream, they can be mutated (headers can be added or removed).

> Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.

I would suggest passing headers to downstream nodes and not dropping them. Clients will have to drop `Headers` themselves once they have used them. Alternatively, this could be managed by something like a boolean config property. I would like to hear feedback here.

> A maximal groundwork change would be to forward the headers through all
> operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"

This would probably be solved once KIP-159 is implemented and supports Headers.
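The forwarding behaviour proposed above can be sketched as follows. This is a minimal, hypothetical model, not the Kafka Streams API: `SimpleHeaders`, `SimpleRecordContext`, `SimpleProcessorContext`, `Forwarded` and `HeaderTaggingProcessor` are illustrative names, header values are plain strings rather than `byte[]`, and the `forwardHeaders` flag only stands in for the boolean config property floated above (no such config exists). Headers travel as one more attribute of the record context, a processor reads and mutates them through the context, and `forward()` passes them downstream unless the flag says to drop them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical header container; method names loosely mirror
// org.apache.kafka.common.header.Headers, but this is NOT the Kafka class.
class SimpleHeaders {
    private final List<String[]> entries = new ArrayList<>();

    SimpleHeaders add(String key, String value) {
        entries.add(new String[] {key, value});
        return this;
    }

    SimpleHeaders remove(String key) {
        entries.removeIf(e -> e[0].equals(key));
        return this;
    }

    // Most recently added value for the key, or null if absent.
    String lastHeader(String key) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (entries.get(i)[0].equals(key)) return entries.get(i)[1];
        }
        return null;
    }

    int size() { return entries.size(); }
}

// Hypothetical record context: headers sit next to timestamp and offset.
class SimpleRecordContext {
    final long timestamp;
    final long offset;
    final SimpleHeaders headers;

    SimpleRecordContext(long timestamp, long offset, SimpleHeaders headers) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.headers = headers;
    }
}

// A record as received by a downstream node.
class Forwarded {
    final String key;
    final String value;
    final SimpleRecordContext context;

    Forwarded(String key, String value, SimpleRecordContext context) {
        this.key = key;
        this.value = value;
        this.context = context;
    }
}

// Hypothetical processor context: forward() hands the current record context
// downstream; forwardHeaders is a stand-in for a hypothetical config flag.
class SimpleProcessorContext {
    private final boolean forwardHeaders;
    private SimpleRecordContext current;
    final List<Forwarded> downstream = new ArrayList<>();

    SimpleProcessorContext(boolean forwardHeaders) { this.forwardHeaders = forwardHeaders; }

    void setRecordContext(SimpleRecordContext context) { current = context; }

    SimpleHeaders headers() { return current.headers; }

    void forward(String key, String value) {
        SimpleRecordContext out = forwardHeaders
                ? current
                : new SimpleRecordContext(current.timestamp, current.offset, new SimpleHeaders());
        downstream.add(new Forwarded(key, value, out));
    }
}

// User processor: reads one header, removes another, adds a third, forwards.
class HeaderTaggingProcessor {
    private final SimpleProcessorContext context;

    HeaderTaggingProcessor(SimpleProcessorContext context) { this.context = context; }

    void process(String key, String value) {
        SimpleHeaders headers = context.headers();
        String source = headers.lastHeader("source");
        headers.remove("trace-id");
        headers.add("processed-by", "tagger");
        context.forward(key, value + "@" + source);
    }
}
```

With `forwardHeaders = true`, a record arriving with `source=web` and `trace-id=abc` reaches the downstream node carrying `source=web` and `processed-by=tagger`; with `false`, it arrives with no headers. The sketch shares one mutable headers object between a processor and its downstream nodes, which is the simplest choice but means a mutation is visible to every node holding the same context; a real implementation might copy headers per child.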
> There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them. Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!

Agreed. But `Headers` were forwarded *explicitly* in the original proposal. The current one passes them as part of `RecordContext`, so whether they are forwarded or not is the same as for `RecordContext`. On top of this implementation, we can design how filter/map/join will be handled, probably following the KIP-159 approach.

Cheers,
Jorge.

On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangg...@gmail.com>) wrote:

> Hi Jorge,
>
> Thanks for the written KIP! Made a pass over it and left some comments
> (some of them overlapped with John's):
>
> 1. We need to have a clear inheritance protocol of headers in our topology:
>
> 1.a. In PAPI's context.forward() call, it should be straight-forward.
> 1.b. In DSL stateless operators, it should be straight-forward.
> 1.c. What about in stateful operators like aggregates and joins?
>
> 2. In terms of internal implementations, should the state store cache
> include the headers then in order to be sent downstreams?
>
> 3. In future work "Adding DSL Processors to use Headers to
> filter/map/branch", it may well be covered in KIP-159; worth taking a look
> at that KIP.
>
> 4. MINOR: "void process(K key, V value, Headers headers)", this should be
> removed?
>
> 5. MINOR: it seems to be the case that in this KIP, our scope is only for
> exposing the headers for reading, and not allowing users to add / modify
> headers, right? If yes, we'd better state it clearly at the "Proposed
> Changes" section.
>
> Guozhang
>
> On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io> wrote:
>
> > Hi Jorge,
> >
> > Thanks for the design work.
> >
> > I agree that de-scoping the work to just the Processor API will help
> > contain the design and implementation complexity.
> >
> > In the KIP, it mentions that the headers would be available in the
> > ProcessorContext, (like "context.headers()"). It also says that
> > implementers would need to implement the method "void process(K key, V
> > value, Headers headers);". I think maybe you meant to remove the proposal
> > to modify "process", since it wouldn't be necessary in conjunction with the
> > ProcessorContext change, and it's not represented in your PR.
> >
> > Also, despite the decreased scope in this KIP, I think it might be valuable
> > to define what will happen to headers once this change is implemented. For
> > example, I think a minimal groundwork-level change might be to make the API
> > changes, while promising to drop all headers from input records.
> >
> > A maximal groundwork change would be to forward the headers through all
> > operators in Streams. But I think there are some unresolved questions about
> > forwarding, like "what happens to the headers in a join?"
> >
> > There's of course some middle ground, but instinctively, I think I'd prefer
> > to have a clear definition that headers are currently *not* forwarded,
> > rather than having a complex list of operators that do or don't forward
> > them. Plus, I think it might be tricky to define this behavior while not
> > allowing the scope to return to that of your original proposal!
> >
> > Thanks again for the KIP,
> > -John
> >
> > On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > Hi Matthias,
> > >
> > > I've created a new JIRA to track this, updated the KIP and created a PR.
> > >
> > > Looking forward to your feedback,
> > >
> > > Jorge.
> > >
> > > On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matth...@confluent.io>)
> > > wrote:
> > >
> > > > Hi Jorge,
> > > >
> > > > I would like to unblock this KIP to make some progress. The tricky
> > > > question of this work seems to be how to expose headers at the DSL level.
> > > > This is related to KIP-149 and KIP-159. However, for the Processor API, it
> > > > seems to be rather straightforward to add headers to the API.
> > > >
> > > > Thus, I would suggest de-scoping this KIP and adding header support for
> > > > the Processor API only as a first step. Once this is done, we can see in a
> > > > second step how to add headers at the DSL level.
> > > >
> > > > WDYT about this proposal?
> > > >
> > > > If you agree, please update the JIRA and KIP accordingly. Note that we
> > > > have two JIRAs that are duplicates atm. We can scope them accordingly:
> > > > one for PAPI only, and a second as a dependent JIRA for DSL.
> > > >
> > > > -Matthias
> > > >
> > > > On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > > Thanks for your feedback!
> > > > >
> > > > > 1. I was adding headers to KeyValue to support groupBy, but I think it
> > > > > is not necessary. It should be enough to map headers to key/value and
> > > > > then group using the current KeyValue structure.
> > > > >
> > > > > 2. Yes. IMO key/value stores, like RocksDB, rely on KV as structure,
> > > > > hence considering headers as part of stateful operations will not fit
> > > > > in this approach and will increase complexity (I cannot think of a
> > > > > use case that needs this).
> > > > >
> > > > > 3. and 4. The changes in 1. will solve these issues.
> > > > >
> > > > > I probably rushed a bit in proposing this change; I was not aware of
> > > > > KIP-159 or KAFKA-5632.
> > > > > If KIP-159 is adopted, reducing this KIP to adding Headers to
> > > > > RecordContext will be enough, but I'm not sure about the scope of
> > > > > KIP-159.
> > > > > If it includes stateful operations, it will be difficult to implement,
> > > > > as stated in 2.
> > > > >
> > > > > Cheers,
> > > > > Jorge.
> > > > >
> > > > > On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax (<matth...@confluent.io>)
> > > > > wrote:
> > > > >
> > > > >> Thanks for the KIP Jorge,
> > > > >>
> > > > >> As Bill pointed out already, we should be careful with adding new
> > > > >> overloads as this contradicts the work done via KIP-182.
> > > > >>
> > > > >> This KIP also seems to be related to KIP-149 and KIP-159. Are you aware
> > > > >> of them? Both have quite long DISCUSS threads, but it might be worth
> > > > >> browsing through them.
> > > > >>
> > > > >> A few further questions:
> > > > >>
> > > > >> - Why do you want to add the headers to `KeyValue`? I am not sure;
> > > > >> should we consider headers as optional metadata and add them only to
> > > > >> `RecordContext`, similar to timestamp, offset, etc.?
> > > > >>
> > > > >> - You only include stateless single-record transformations at the DSL
> > > > >> level. Do you suggest that all other operators just drop headers on the
> > > > >> floor?
> > > > >>
> > > > >> - Why do you only want to put headers into in-memory and cache but not
> > > > >> RocksDB store? What do you mean by "pass through"? IMHO, all stores
> > > > >> should behave the same at DSL level.
> > > > >> -> if we store the headers in the state stores, what is the upgrade
> > > > >> path?
> > > > >>
> > > > >> - Why do we need to store record headers in state in the first place, if
> > > > >> we exclude stateful operators at DSL level?
> > > > >>
> > > > >> What is the motivation for the "border lines" you chose?
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> > > > >>> Jorge,
> > > > >>>
> > > > >>> Thanks for the KIP, I know this is a feature others in the community have
> > > > >>> been interested in getting into Kafka Streams.
> > > > >>>
> > > > >>> I took a quick pass over it, and I have one initial question.
> > > > >>>
> > > > >>> We recently reduced overloads with KIP-182, and in this KIP we are
> > > > >>> increasing them again.
> > > > >>>
> > > > >>> I can see from the KIP why they are necessary, but I'm wondering if there
> > > > >>> is something else we can do to cut down on the overloads introduced. I
> > > > >>> don't have any sound suggestions ATM, so I'll have to think about it some
> > > > >>> more, but I wanted to put the thought out there.
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Bill
> > > > >>>
> > > > >>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <
> > > > >>> quilcate.jo...@gmail.com> wrote:
> > > > >>>
> > > > >>>> Hi all,
> > > > >>>>
> > > > >>>> I have created a KIP to add Record Headers support to Kafka Streams API:
> > > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
> > > > >>>>
> > > > >>>> The main goal is to be able to use headers to filter, map and process
> > > > >>>> records as streams. Stateful processing (joins, windows) is not
> > > > >>>> considered.
> > > > >>>>
> > > > >>>> Proposed changes/Draft:
> > > > >>>> https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
> > > > >>>>
> > > > >>>> Feedback and suggestions are more than welcome.
> > > > >>>>
> > > > >>>> Cheers,
> > > > >>>>
> > > > >>>> Jorge.
>
> --
> -- Guozhang
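Jorge's 12/30/17 reply (point 1) suggests that grouping does not require headers inside `KeyValue`: a header can first be mapped into the key, and the existing key-based grouping does the rest, which also covers the original goal of filtering and mapping on headers. A minimal plain-Java sketch of that idea follows, using `java.util.stream` in place of a Streams topology; the `InputRecord`, `Pair` and `HeaderGrouping` types and the `tenant` header name are hypothetical and not part of the KIP.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical input record: key, value and headers (header values as
// strings for brevity; Kafka headers carry byte[] values).
class InputRecord {
    final String key;
    final String value;
    final Map<String, String> headers;

    InputRecord(String key, String value, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.headers = headers;
    }
}

// Plain (key, value) pair standing in for Kafka Streams' KeyValue.
class Pair {
    final String key;
    final String value;

    Pair(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

class HeaderGrouping {
    // filter + map: keep records carrying the header and move its value
    // into the key, so no header-aware KeyValue type is needed.
    static List<Pair> rekeyByHeader(List<InputRecord> records, String headerName) {
        return records.stream()
                .filter(r -> r.headers.containsKey(headerName))
                .map(r -> new Pair(r.headers.get(headerName), r.value))
                .collect(Collectors.toList());
    }

    // group: an ordinary key-based grouping, here a count per key
    // (TreeMap only to make iteration order deterministic).
    static Map<String, Long> countByKey(List<Pair> pairs) {
        return pairs.stream()
                .collect(Collectors.groupingBy(p -> p.key, TreeMap::new, Collectors.counting()));
    }
}
```

Here `rekeyByHeader` plays the role of a filter followed by a `map` that lifts the header into the key, and `countByKey` the role of an ordinary `groupByKey().count()`; nothing in the grouping step needs to know that the key came from a header.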