Matthias, thanks for sharing your opinions on the inheritance protocol of the record context. I'm thinking maybe we should make this discussion a separate KIP by itself? If so, KIP-244's scope would be smaller, and within KIP-244 we can have a simple inheritance rule that sets the headers to null when 1) going through stateful operators and 2) sending to any topic.

Guozhang

On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> Making the inheritance protocol a public contract seems reasonable to me.
>
> In the current implementation, all output records inherit the offset,
> timestamp, topic, and partition metadata from the input record. We
> already added an API to change the timestamp explicitly for the output
> record, though.
>
> I think it makes sense to keep the inheritance of offset, topic, and
> partition. For headers, it's worth discussing. I see arguments for two
> strategies: (1) inherit by default, (2) set `null` by default.
> Independent of the default behavior, we should add an API to set headers
> for output records explicitly though (similar to the "set timestamp API").
>
> From my point of view, timestamp/headers are a different
> "class/category" of data/metadata than topic/partition/offset. For the
> first category, it makes sense to manipulate them and it's more than
> "plain metadata"; especially the timestamp. For the second category it
> does not make sense to manipulate it, and to me topic/partition/offset
> is pure metadata only. Strictly speaking, it's even questionable if
> output records should have any value for topic/partition/offset in the
> first place, or if they should be `null`, because those attributes
> only make sense for source records that are consumed from a topic
> directly. On the other hand, if we make this difference explicit,
> it might be useful information for the user to track the current
> topic/partition/offset of the original source record.
>
> Furthermore, to me, timestamps and headers are somewhat different, too.
> For stream processing it's required that every record has a timestamp;
> thus, it makes sense to inherit the input record timestamp by default (a
> timestamp is not really metadata but actually equally important to key
> and value from my point of view).
> Headers, however, are optional, and thus inheriting them is not really
> required. It might be convenient though: for example, imagine a simple
> "filter-only" application -- it would be cumbersome for users to
> explicitly copy the headers from the input records to the output
> records -- it seems to be unnecessary boilerplate code. On the other
> hand, for any other more complex use case, it's questionable to inherit
> headers. Note that headers would be written to the output topics,
> increasing the size of the messages. Overall, I am not sure which
> default strategy might be the better one for headers. Is there a
> convincing argument for either one of them? I slightly tend to think
> that using `null` as default might be better.
>
> Last, we could also make the default behavior configurable. Something
> like `inherit.record.headers=true/false` with default value "false".
> This would allow people to opt in to auto-header-inheritance. Just an
> idea I wanted to add to the discussion; not sure if it's a good one.
>
>
> -Matthias
>
> On 5/4/18 3:13 PM, Guozhang Wang wrote:
> > Hello Jorge,
> >
> >> Agree. Probably point 3 handles this. `Headers` being part of
> >> `RecordContext` would be handled the same way as other attributes.
> >
> > Today we do not have a clear inheritance protocol for other fields of
> > RecordContext yet: although internally we do have some criteria for
> > topic/partition/offset and timestamp, they are not explicitly exposed
> > to users.
> >
> >
> > I think we still need to have a defined protocol for headers itself,
> > but I agree that it is better scoped outside of this KIP, since this
> > inheritance protocol itself for all the fields of RecordContext would
> > better be a separate KIP. We can document this clearly in the wiki page.
> >
> > Guozhang
> >
> >
> > On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
> > garcia.florian.pe...@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> For me this is a great first step to have Headers in streaming.
> >> My current use case is about distributed tracing (Zipkin) and with the
> >> headers in the processorContext() I'll be able to manage that for
> >> most cases.
> >> KIP-159 should follow after this, but this is where all the major
> >> questions will arise for stateful operations (as Guozhang said).
> >>
> >> Thanks for the work on this Jorge.
> >>
> >> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <
> >> quilcate.jo...@gmail.com> wrote:
> >>
> >>> Thanks Guozhang and John for your feedback.
> >>>
> >>>> 1. We need to have a clear inheritance protocol of headers in our
> >>>> topology:
> >>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
> >>>> 1.b. In DSL stateless operators, it should be straight-forward.
> >>>> 1.c. What about in stateful operators like aggregates and joins?
> >>>
> >>> Agree. Probably point 3 handles this. `Headers` being part of
> >>> `RecordContext` would be handled the same way as other attributes.
> >>>
> >>>> 3. In future work "Adding DSL Processors to use Headers to
> >>>> filter/map/branch", it may well be covered in KIP-159; worth taking
> >>>> a look at that KIP.
> >>>
> >>> Yes, I will point to it.
> >>>
> >>>> 2. In terms of internal implementations, should the state store
> >>>> cache include the headers then in order to be sent downstreams?
> >>>
> >>> Good question. As `LRUCacheEntry` extends `RecordContext`, I think
> >>> this is already supported. I will detail this on the KIP.
> >>>
> >>>> 4. MINOR: "void process(K key, V value, Headers headers)", this
> >>>> should be removed?
> >>>
> >>> Fixed, thanks.
> >>>
> >>>> 5. MINOR: it seems to be the case that in this KIP, our scope is
> >>>> only for exposing the headers for reading, and not allowing users to
> >>>> add / modify headers, right? If yes, we'd better state it clearly at
> >>>> the "Proposed Changes" section.
> >>>
> >>> As headers are exposed in the `ProcessorContext`, and headers will be
> >>> sent downstream, they can be mutated (add/remove headers).
> >>>
> >>>> Also, despite the decreased scope in this KIP, I think it might be
> >>>> valuable to define what will happen to headers once this change is
> >>>> implemented. For example, I think a minimal groundwork-level change
> >>>> might be to make the API changes, while promising to drop all
> >>>> headers from input records.
> >>>
> >>> I would suggest passing headers to downstream nodes, and not dropping
> >>> them. Clients will have to drop `Headers` if they have used them.
> >>> Or it could be something like a boolean config property that manages
> >>> this. I would like to hear feedback here.
> >>>
> >>>> A maximal groundwork change would be to forward the headers through
> >>>> all operators in Streams. But I think there are some unresolved
> >>>> questions about forwarding, like "what happens to the headers in a
> >>>> join?"
> >>>
> >>> Probably this would be solved once KIP-159 is implemented and
> >>> supports Headers.
> >>>
> >>>> There's of course some middle ground, but instinctively, I think I'd
> >>>> prefer to have a clear definition that headers are currently *not*
> >>>> forwarded, rather than having a complex list of operators that do or
> >>>> don't forward them. Plus, I think it might be tricky to define this
> >>>> behavior while not allowing the scope to return to that of your
> >>>> original proposal!
> >>>
> >>> Agree. But `Headers` were forwarded *explicitly* in the original
> >>> proposal. The current one passes them as part of `RecordContext`, so
> >>> whether they are forwarded or not is the same as for `RecordContext`.
> >>> On top of this implementation, we can design how filter/map/join
> >>> will be handled. Probably following KIP-159 approach.
> >>>
> >>> Cheers,
> >>> Jorge.
> >>>
> >>> On Wed, May 2, 2018 at 22:56, Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Jorge,
> >>>>
> >>>> Thanks for the written KIP! Made a pass over it and left some
> >>>> comments (some of them overlapped with John's):
> >>>>
> >>>> 1. We need to have a clear inheritance protocol of headers in our
> >>>> topology:
> >>>>
> >>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
> >>>> 1.b. In DSL stateless operators, it should be straight-forward.
> >>>> 1.c. What about in stateful operators like aggregates and joins?
> >>>>
> >>>> 2. In terms of internal implementations, should the state store cache
> >>>> include the headers then in order to be sent downstreams?
> >>>>
> >>>> 3. In future work "Adding DSL Processors to use Headers to
> >>>> filter/map/branch", it may well be covered in KIP-159; worth taking
> >>>> a look at that KIP.
> >>>>
> >>>> 4. MINOR: "void process(K key, V value, Headers headers)", this
> >>>> should be removed?
> >>>>
> >>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only
> >>>> for exposing the headers for reading, and not allowing users to add /
> >>>> modify headers, right? If yes, we'd better state it clearly at the
> >>>> "Proposed Changes" section.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Hi Jorge,
> >>>>>
> >>>>> Thanks for the design work.
> >>>>>
> >>>>> I agree that de-scoping the work to just the Processor API will help
> >>>>> contain the design and implementation complexity.
> >>>>>
> >>>>> In the KIP, it mentions that the headers would be available in the
> >>>>> ProcessorContext, (like "context.headers()").
> >>>>> It also says that implementers would need to implement the method
> >>>>> "void process(K key, V value, Headers headers);". I think maybe you
> >>>>> meant to remove the proposal to modify "process", since it wouldn't
> >>>>> be necessary in conjunction with the ProcessorContext change, and
> >>>>> it's not represented in your PR.
> >>>>>
> >>>>> Also, despite the decreased scope in this KIP, I think it might be
> >>>>> valuable to define what will happen to headers once this change is
> >>>>> implemented. For example, I think a minimal groundwork-level change
> >>>>> might be to make the API changes, while promising to drop all
> >>>>> headers from input records.
> >>>>>
> >>>>> A maximal groundwork change would be to forward the headers through
> >>>>> all operators in Streams. But I think there are some unresolved
> >>>>> questions about forwarding, like "what happens to the headers in a
> >>>>> join?"
> >>>>>
> >>>>> There's of course some middle ground, but instinctively, I think I'd
> >>>>> prefer to have a clear definition that headers are currently *not*
> >>>>> forwarded, rather than having a complex list of operators that do or
> >>>>> don't forward them. Plus, I think it might be tricky to define this
> >>>>> behavior while not allowing the scope to return to that of your
> >>>>> original proposal!
> >>>>>
> >>>>> Thanks again for the KIP,
> >>>>> -John
> >>>>>
> >>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <
> >>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Matthias,
> >>>>>>
> >>>>>> I've created a new JIRA to track this, updated the KIP and created
> >>>>>> a PR.
> >>>>>>
> >>>>>> Looking forward to your feedback,
> >>>>>>
> >>>>>> Jorge.
> >>>>>>
> >>>>>> On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax
> >>>>>> <matth...@confluent.io> wrote:
> >>>>>>
> >>>>>>> Hi Jorge,
> >>>>>>>
> >>>>>>> I would like to unblock this KIP to make some progress. The tricky
> >>>>>>> question of this work seems to be how to expose headers at DSL
> >>>>>>> level. This is related to KIP-149 and KIP-159. However, for
> >>>>>>> Processor API, it seems to be rather straightforward to add
> >>>>>>> headers to the API.
> >>>>>>>
> >>>>>>> Thus, I would suggest de-scoping this KIP and adding header
> >>>>>>> support for Processor API only as a first step. If this is done,
> >>>>>>> we can see in a second step how to add headers at DSL level.
> >>>>>>>
> >>>>>>> WDYT about this proposal?
> >>>>>>>
> >>>>>>> If you agree, please update the JIRA and KIP accordingly. Note
> >>>>>>> that we have two JIRAs that are duplicates atm. We can scope them
> >>>>>>> accordingly: one for PAPI only, and a second as a dependent JIRA
> >>>>>>> for DSL.
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> >>>>>>>> Thanks for your feedback!
> >>>>>>>>
> >>>>>>>> 1. I was adding headers to KeyValue to support groupBy, but I
> >>>>>>>> think it is not necessary. It should be enough to map headers to
> >>>>>>>> key/value and then group using the current KeyValue structure.
> >>>>>>>>
> >>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV as
> >>>>>>>> structure, hence considering headers as part of stateful
> >>>>>>>> operations will not fit in this approach and would increase
> >>>>>>>> complexity (I cannot think of a use case that needs this).
> >>>>>>>>
> >>>>>>>> 3. and 4. Changes on 1. will solve this issue.
> >>>>>>>>
> >>>>>>>> Probably I rushed a bit proposing this change; I was not aware of
> >>>>>>>> KIP-159 or KAFKA-5632.
> >>>>>>>> If KIP-159 is adopted and we reduce this KIP to adding Headers to
> >>>>>>>> RecordContext, that will be enough, but I'm not sure about the
> >>>>>>>> scope of KIP-159. If it includes stateful operations, it will be
> >>>>>>>> difficult to implement, as stated in 2.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Jorge.
> >>>>>>>>
> >>>>>>>> On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax
> >>>>>>>> <matth...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks for the KIP Jorge,
> >>>>>>>>>
> >>>>>>>>> As Bill pointed out already, we should be careful with adding
> >>>>>>>>> new overloads as this contradicts the work done via KIP-182.
> >>>>>>>>>
> >>>>>>>>> This KIP also seems to be related to KIP-149 and KIP-159. Are
> >>>>>>>>> you aware of them? Both have quite long DISCUSS threads, but it
> >>>>>>>>> might be worth browsing through them.
> >>>>>>>>>
> >>>>>>>>> A few further questions:
> >>>>>>>>>
> >>>>>>>>> - why do you want to add the headers to `KeyValue`? I am not
> >>>>>>>>> sure if we should consider headers as optional metadata and add
> >>>>>>>>> it to `RecordContext` similar to timestamp, offset, etc. only
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> - You only include stateless single-record transformations at
> >>>>>>>>> the DSL level. Do you suggest that all other operators just drop
> >>>>>>>>> headers on the floor?
> >>>>>>>>>
> >>>>>>>>> - Why do you only want to put headers into in-memory and cache
> >>>>>>>>> but not RocksDB store? What do you mean by "pass through"? IMHO,
> >>>>>>>>> all stores should behave the same at DSL level.
> >>>>>>>>> -> if we store the headers in the state stores, what is the
> >>>>>>>>> upgrade path?
> >>>>>>>>>
> >>>>>>>>> - Why do we need to store record headers in state in the first
> >>>>>>>>> place, if we exclude stateful operators at DSL level?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> What is the motivation for the "border lines" you choose?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> >>>>>>>>>> Jorge,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the KIP, I know this is a feature others in the
> >>>>>>>>>> community have been interested in getting into Kafka Streams.
> >>>>>>>>>>
> >>>>>>>>>> I took a quick pass over it, and I have one initial question.
> >>>>>>>>>>
> >>>>>>>>>> We recently reduced overloads with KIP-182, and in this KIP we
> >>>>>>>>>> are increasing them again.
> >>>>>>>>>>
> >>>>>>>>>> I can see from the KIP why they are necessary, but I'm
> >>>>>>>>>> wondering if there is something else we can do to cut down on
> >>>>>>>>>> the overloads introduced. I don't have any sound suggestions
> >>>>>>>>>> ATM, so I'll have to think about it some more, but I wanted to
> >>>>>>>>>> put the thought out there.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Bill
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <
> >>>>>>>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> I have created a KIP to add Record Headers support to Kafka
> >>>>>>>>>>> Streams API:
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> The main goal is to be able to use headers to filter, map and
> >>>>>>>>>>> process records as streams. Stateful processing (joins,
> >>>>>>>>>>> windows) is not considered.
> >>>>>>>>>>>
> >>>>>>>>>>> Proposed changes/Draft:
> >>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
> >>>>>>>>>>>
> >>>>>>>>>>> Feedback and suggestions are more than welcome.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>>
> >>>>>>>>>>> Jorge.
> >>>>>>>>>>>
> >>>>
> >>>> --
> >>>> -- Guozhang

--
-- Guozhang
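
For reference, the simple rule proposed at the top of this thread (headers set to null after stateful operators and when sending to a topic) could be sketched in plain Java roughly as follows. This is only an illustrative model, not Kafka Streams code: the `Step` enum, the `inherit` method, and the `Map<String, String>` standing in for Kafka's `Headers` are all hypothetical names, and passing headers through unchanged on stateless steps is an assumption that the thread implies but does not spell out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderInheritanceSketch {

    /** Hypothetical step kinds; these are not Kafka Streams types. */
    enum Step { STATELESS, STATEFUL, SINK }

    /**
     * Returns the headers an output record would carry after one step.
     * Rule from the thread: headers become null after a stateful operator
     * or when the record is sent to a topic. Assumption: stateless steps
     * inherit the input headers unchanged.
     */
    static Map<String, String> inherit(Map<String, String> input, Step step) {
        if (input == null || step == Step.STATEFUL || step == Step.SINK) {
            return null;
        }
        // Copy so that a downstream mutation does not affect the input record.
        return new LinkedHashMap<>(input);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("trace-id", "abc123");

        System.out.println(inherit(headers, Step.STATELESS)); // {trace-id=abc123}
        System.out.println(inherit(headers, Step.STATEFUL));  // null
        System.out.println(inherit(headers, Step.SINK));      // null
    }
}
```

A configurable default such as the `inherit.record.headers` idea quoted above would amount to one more boolean input to `inherit`; it is left out here because the thread has not settled on it.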