Yeah, I'm only talking about the DSL part (i.e., the default inheritance protocol we would promise for stateful / stateless operators) being managed in KIP-159.
Allowing users to override the default behavior in PAPI would be a different KIP.

Guozhang

On Fri, May 11, 2018 at 10:41 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> I am actually not sure about this. Because it's about the semantics at PAPI level, but KIP-159 targets the DSL, it might actually be better to have a separate KIP?
>
> -Matthias
>
> On 5/11/18 9:26 AM, Guozhang Wang wrote:
>> That's a good question. I think we can manage this in KIP-159. I will go ahead and try to augment that KIP together with its original author, Jeyhun.
>>
>> Guozhang
>>
>> On Fri, May 11, 2018 at 12:45 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>
>>> Thanks Guozhang and Matthias! I also agree with this way of handling headers inheritance. I will add them to the KIP doc.
>>>
>>>> We can discuss extending the current protocol, how to enable users to override those rules, and how to expose them in the DSL layer in a future KIP.
>>>
>>> About this, should this be managed in KIP-159 or in a new one?
>>>
>>> On Thu, May 10, 2018 at 17:46, Matthias J. Sax (<matth...@confluent.io>) wrote:
>>>
>>>> Thanks Guozhang! Sounds good to me!
>>>>
>>>> -Matthias
>>>>
>>>> On 5/10/18 7:55 AM, Guozhang Wang wrote:
>>>>> Thanks for your thoughts, Matthias. I think if we do want to bring KIP-244 into 2.0, then we need to keep its scope small and well defined. For that I'm proposing:
>>>>>
>>>>> 1. Make the inheritance implementation of headers consistent with what we have for the other record context fields, i.e., pass the record context through in `context.forward()`. Note that within a processor node, users can already manipulate the headers with the given APIs, so at the time of forwarding, the library can just copy whatever is left / updated to the next processor node.
>>>>>
>>>>> 2. In the sink node, where a record is being sent to a Kafka topic, we should consider the following:
>>>>>
>>>>> a. For sink topics, we will set the headers into the producer record.
>>>>> b. For repartition topics, we will set the headers into the producer record.
>>>>> c. For changelog topics, we will drop the headers from the producer record, since they will not be used in restoration and are not stored in the state store either.
>>>>>
>>>>> We can discuss extending the current protocol, how to enable users to override those rules, and how to expose them in the DSL layer in a future KIP.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Mon, May 7, 2018 at 5:49 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>>> Guozhang,
>>>>>>
>>>>>> if you advocate forwarding headers by default, it might be a better default strategy to forward the headers for all operators (similar to topic/partition/offset metadata). It's usually harder for users to reason about different cases, and thus I would prefer consistent behavior, i.e., only one default strategy instead of introducing different cases.
>>>>>>
>>>>>> Btw: my argument about dropping headers by default only implies that users need to copy the headers explicitly to the output records in their code if they want to inspect them later; it does not imply that headers cannot be forwarded downstream. (Not sure if this was clear.)
>>>>>>
>>>>>> I am also OK with copying by default, though (for me, it's only a 51/49 preference for dropping by default).
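Guozhang's May 10 proposal quoted above (point 1 plus rules 2.a to 2.c) can be modeled in a few lines of Java. This is a minimal, hypothetical sketch. `SketchHeader`, `SinkKind`, and `HeaderInheritanceSketch` are illustrative stand-ins, not Kafka Streams classes. The sketch captures only the decision logic, not how the library wires it into `context.forward()` or the sink node.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a record header (key plus raw bytes).
// This is NOT org.apache.kafka.common.header.Header.
final class SketchHeader {
    final String key;
    final byte[] value;

    SketchHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// The three kinds of topics a sink node may write to, per point 2 of the proposal.
enum SinkKind { SINK_TOPIC, REPARTITION_TOPIC, CHANGELOG_TOPIC }

// Hypothetical model of the proposed default inheritance rules.
final class HeaderInheritanceSketch {

    // Point 1: on forward, copy whatever headers are left after the processor
    // node added/removed some, so the next node sees the updated set.
    static List<SketchHeader> onForward(List<SketchHeader> currentHeaders) {
        return new ArrayList<>(currentHeaders);
    }

    // Point 2: decide which headers go into the produced record.
    static List<SketchHeader> forProducedRecord(SinkKind kind, List<SketchHeader> headers) {
        switch (kind) {
            case SINK_TOPIC:        // 2.a: keep the headers
            case REPARTITION_TOPIC: // 2.b: keep the headers
                return new ArrayList<>(headers);
            case CHANGELOG_TOPIC:   // 2.c: drop them; not used for restoration or stored
            default:
                return Collections.emptyList();
        }
    }
}

class HeaderInheritanceDemo {
    public static void main(String[] args) {
        List<SketchHeader> headers = new ArrayList<>();
        headers.add(new SketchHeader("trace-id", new byte[] {1, 2, 3}));

        // A processor node mutates its headers, then forwards the record.
        headers.add(new SketchHeader("stage", new byte[] {4}));
        List<SketchHeader> forwarded = HeaderInheritanceSketch.onForward(headers);

        for (SinkKind kind : SinkKind.values()) {
            int n = HeaderInheritanceSketch.forProducedRecord(kind, forwarded).size();
            System.out.println(kind + " -> " + n + " header(s)");
        }
    }
}
```

The copy on forward is a design assumption of this sketch. It keeps one downstream node's changes from leaking into an upstream or sibling node's view. The thread does not specify whether the library copies or shares the headers instance.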
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 5/7/18 4:52 PM, Guozhang Wang wrote:
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>> My concern with setting `null` in all cases is that it would make headers not very useful in KIP-244, because headers would only be available at the source stream / table, but not in any of the following instances. In practice, users are more likely to look into the headers later in the pipeline. Personally, I'd suggest we pass the headers along for all stateless operators in the DSL and everywhere in PAPI's `context.forward()`. For repartition topics and sink topics, we also set them in the produced records accordingly; for changelog topics, we do not set them, since they are not going to be used anywhere in the store.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Sun, May 6, 2018 at 9:03 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> I agree that we should not block this KIP if possible. Nevertheless, we should try to get a reasonable default strategy for inheriting the headers, so we don't need to change it later on.
>>>>>>>>
>>>>>>>> Let's see what others think. I still slightly tend toward setting headers to `null` by default for all cases. If the default strategy differs between operators, as you suggest, it might be confusing to users. IMHO, the default behavior should be as simple as possible.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 5/6/18 8:53 PM, Guozhang Wang wrote:
>>>>>>>>> Matthias, thanks for sharing your opinions on the inheritance protocol of the record context. I'm thinking maybe we should make this discussion a separate KIP by itself?
>>>>>>>>> If yes, then KIP-244's scope would be smaller, and within KIP-244 we can have a simple inheritance rule of setting headers to `null` when 1) going through stateful operators and 2) sending to any topic.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Making the inheritance protocol a public contract seems reasonable to me.
>>>>>>>>>>
>>>>>>>>>> In the current implementation, all output records inherit the offset, timestamp, topic, and partition metadata from the input record. We already added an API to change the timestamp explicitly for the output record, though.
>>>>>>>>>>
>>>>>>>>>> I think it makes sense to keep the inheritance of offset, topic, and partition. For headers, it's worth discussing. I see arguments for two strategies: (1) inherit by default, (2) set `null` by default. Independent of the default behavior, we should add an API to set headers for output records explicitly, though (similar to the "set timestamp" API).
>>>>>>>>>>
>>>>>>>>>> From my point of view, timestamp/headers are a different "class/category" of data/metadata than topic/partition/offset. For the first category, it makes sense to manipulate them, and they are more than "plain metadata"; especially the timestamp.
>>>>>>>>>> For the second category, it does not make sense to manipulate it, and to me topic/partition/offset is pure metadata only. Strictly speaking, it's even questionable whether output records should have any value for topic/partition/offset in the first place, or whether they should be `null`, because those attributes only make sense for source records that are consumed directly from a topic. On the other hand, if we make this difference explicit, it might be useful information for the user to track the topic/partition/offset of the original source record.
>>>>>>>>>>
>>>>>>>>>> Furthermore, to me, timestamps and headers are somewhat different, too. For stream processing, every record is required to have a timestamp; thus, it makes sense to inherit the input record's timestamp by default (from my point of view, a timestamp is not really metadata but actually as important as key and value). Headers, however, are optional, and thus inheriting them is not strictly required. It might be convenient, though: for example, imagine a simple "filter-only" application. It would be cumbersome for users to explicitly copy the headers from the input records to the output records; that seems like unnecessary boilerplate code. On the other hand, for any more complex use case, it's questionable whether headers should be inherited. Note that headers would be written to the output topics, increasing the size of the messages. Overall, I am not sure which default strategy is the better one for headers. Is there a convincing argument for either one of them?
>>>>>>>>>> I slightly tend to think that using `null` as the default might be better.
>>>>>>>>>>
>>>>>>>>>> Last, we could also make the default behavior configurable, with something like `inherit.record.headers=true/false` and a default value of "false". This would allow people to opt in to automatic header inheritance. Just an idea I wanted to add to the discussion; not sure if it's a good one.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 5/4/18 3:13 PM, Guozhang Wang wrote:
>>>>>>>>>>> Hello Jorge,
>>>>>>>>>>>
>>>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` being part of `RecordContext`, they would be handled the same way as the other attributes.
>>>>>>>>>>>
>>>>>>>>>>> Today we do not yet have a clear inheritance protocol for the other fields of `RecordContext`: although internally we do have some criteria for topic/partition/offset and timestamp, they are not explicitly exposed to users.
>>>>>>>>>>>
>>>>>>>>>>> I think we still need a defined protocol for the headers themselves, but I agree that it is better scoped outside of this KIP, since the inheritance protocol for all the fields of `RecordContext` would be better as a separate KIP. We can document this clearly on the wiki page.
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <garcia.florian.pe...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> For me, this is a great first step toward having headers in streaming.
>>>>>>>>>>>> My current use case is distributed tracing (Zipkin), and with the headers in the processorContext() I'll be able to handle that in most cases.
>>>>>>>>>>>> KIP-159 should follow after this, but that is where all the major questions will arise for stateful operations (as Guozhang said).
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the work on this, Jorge.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Guozhang and John for your feedback.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in our topology:
>>>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be straightforward.
>>>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be straightforward.
>>>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and joins?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` being part of `RecordContext`, they would be handled the same way as the other attributes.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, I will point to it.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. In terms of internal implementations, should the state store cache include the headers then, in order for them to be sent downstream?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Good question. As `LRUCacheEntry` extends `RecordContext`, I think this is already supported. I will detail this in the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Fixed, thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly in the "Proposed Changes" section.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As the headers are exposed in the `ProcessorContext` and will be sent downstream, they can be mutated (headers can be added/removed).
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would suggest passing headers to downstream nodes and not dropping them. Clients will have to drop `Headers` if they have used them. Or it could be something like a boolean config property that manages this. I would like to hear feedback here.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> A maximal groundwork change would be to forward the headers through all operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"
>>>>>>>>>>>>>
>>>>>>>>>>>>> Probably this would be solved once KIP-159 is implemented and supports headers.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them. Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Agree. But `Headers` were forwarded *explicitly* in the original proposal. The current one passes them as part of `RecordContext`, so whether they are forwarded or not is the same as for `RecordContext`. On top of this implementation, we can design how filter/map/join will be handled, probably following the KIP-159 approach.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangg...@gmail.com>) wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jorge,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for writing the KIP! I made a pass over it and left some comments (some of them overlap with John's):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in our topology:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be straightforward.
>>>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be straightforward.
>>>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and joins?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. In terms of internal implementations, should the state store cache include the headers then, in order for them to be sent downstream?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly in the "Proposed Changes" section.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jorge,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the design work.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I agree that de-scoping the work to just the Processor API will help contain the design and implementation complexity.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The KIP mentions that the headers would be available in the ProcessorContext (like "context.headers()"). It also says that implementers would need to implement the method "void process(K key, V value, Headers headers);".
>>>>>>>>>>>>>>> I think maybe you meant to remove the proposal to modify "process", since it wouldn't be necessary in conjunction with the ProcessorContext change, and it's not represented in your PR.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> A maximal groundwork change would be to forward the headers through all operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them. Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks again for the KIP,
>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I've created a new JIRA to track this, updated the KIP, and created a PR.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Looking forward to your feedback,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matth...@confluent.io>) wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Jorge,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would like to unblock this KIP to make some progress. The tricky question in this work seems to be how to expose headers at the DSL level. This relates to KIP-149 and KIP-159. However, for the Processor API, it seems to be rather straightforward to add headers to the API.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thus, I would suggest de-scoping this KIP and adding header support for the Processor API only as a first step. Once this is done, we can see in a second step how to add headers at the DSL level.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> WDYT about this proposal?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If you agree, please update the JIRA and KIP accordingly. Note that we have two JIRAs that are duplicates atm. We can scope them accordingly: one for PAPI only, and a second, dependent JIRA for the DSL.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>>>>>>>>>>>> Thanks for your feedback!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. I was adding headers to KeyValue to support groupBy, but I think it is not necessary. It should be enough to map headers to the key/value and then group using the current KeyValue structure.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV as their structure, hence considering headers as part of stateful operations would not fit this approach and would increase complexity (I cannot think of a use case that needs this).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 3. and 4. The changes in 1. will solve this issue.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Probably I rushed a bit in proposing this change; I was not aware of KIP-159 or KAFKA-5632.
>>>>>>>>>>>>>>>>>> If KIP-159 is adopted, reducing this KIP to adding Headers to RecordContext will be enough, but I'm not sure about the scope of KIP-159. If it includes stateful operations, it will be difficult to implement, as stated in 2.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax (<matth...@confluent.io>) wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the KIP, Jorge.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> As Bill pointed out already, we should be careful with adding new overloads, as this contradicts the work done via KIP-182.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This KIP also seems to be related to KIP-149 and KIP-159. Are you aware of them? Both have quite long DISCUSS threads, but it might be worth browsing through them.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> A few further questions:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - Why do you want to add the headers to `KeyValue`? I am wondering if we should rather consider headers as optional metadata and add them only to `RecordContext`, similar to timestamp, offset, etc.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - You only include stateless single-record transformations at the DSL level. Do you suggest that all other operators just drop headers on the floor?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - Why do you only want to put headers into the in-memory store and the cache, but not the RocksDB store? What do you mean by "pass through"? IMHO, all stores should behave the same at the DSL level.
>>>>>>>>>>>>>>>>>>>   -> If we store the headers in the state stores, what is the upgrade path?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - Why do we need to store record headers in state stores in the first place, if we exclude stateful operators at the DSL level?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What is the motivation for the "border lines" you chose?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
>>>>>>>>>>>>>>>>>>>> Jorge,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I know this is a feature others in the community have been interested in getting into Kafka Streams.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I took a quick pass over it, and I have one initial question.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> We recently reduced overloads with KIP-182, and in this KIP we are increasing them again.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I can see from the KIP why they are necessary, but I'm wondering if there is something else we can do to cut down on the overloads introduced. I don't have any sound suggestions ATM, so I'll have to think about it some more, but I wanted to put the thought out there.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I have created a KIP to add Record Headers support to the Kafka Streams API:
>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The main goal is to be able to use headers to filter, map, and process records as streams. Stateful processing (joins, windows) is not considered.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Proposed changes/Draft:
>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Feedback and suggestions are more than welcome.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang

--
-- Guozhang
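Matthias's May 6 idea of making the default configurable (`inherit.record.headers=true/false`, default `false`) can be sketched in the same spirit. This is a hypothetical model only, built on a few assumptions:

- The property name comes from the thread's proposal, not from a released Kafka Streams config.
- `HeaderInheritanceConfigSketch` is an illustrative class.
- Headers are simplified to `key=value` strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical model of the opt-in setting floated in the thread;
// this is NOT an existing Kafka Streams configuration.
final class HeaderInheritanceConfigSketch {
    // Property name as proposed in the discussion.
    static final String INHERIT_RECORD_HEADERS = "inherit.record.headers";

    private final boolean inheritHeaders;

    HeaderInheritanceConfigSketch(Properties props) {
        // Proposed default is "false": output records carry no headers unless users opt in.
        this.inheritHeaders =
            Boolean.parseBoolean(props.getProperty(INHERIT_RECORD_HEADERS, "false"));
    }

    // Headers (simplified to "key=value" strings) attached to an output record
    // that was produced from an input record carrying inputHeaders.
    List<String> outputHeaders(List<String> inputHeaders) {
        return inheritHeaders
            ? new ArrayList<>(inputHeaders)
            : Collections.<String>emptyList();
    }
}

class HeaderInheritanceConfigDemo {
    public static void main(String[] args) {
        List<String> input = Arrays.asList("trace-id=abc");

        Properties defaults = new Properties();
        System.out.println(new HeaderInheritanceConfigSketch(defaults).outputHeaders(input)); // prints []

        Properties optIn = new Properties();
        optIn.setProperty(HeaderInheritanceConfigSketch.INHERIT_RECORD_HEADERS, "true");
        System.out.println(new HeaderInheritanceConfigSketch(optIn).outputHeaders(input)); // prints [trace-id=abc]
    }
}
```

With the proposed default, a "filter-only" application would have to copy headers itself, which is the boilerplate Matthias mentions. Setting the property to `true` opts in to inheritance for every output record.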