That's a good question. I think we can manage this in KIP-159. I will go ahead and try to augment that KIP together with the original author Jeyhun.
Guozhang

On Fri, May 11, 2018 at 12:45 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

Thanks Guozhang and Matthias! I also agree with this way of handling headers inheritance. I will add them to the KIP doc.

> We can discuss extending the current protocol, how to enable users to override those rules, and how to expose them in the DSL layer in a future KIP.

About this, should it be handled in KIP-159 or in a new KIP?

On Thu, May 10, 2018 at 17:46, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks Guozhang! Sounds good to me!

-Matthias

On 5/10/18 7:55 AM, Guozhang Wang wrote:

Thanks for your thoughts Matthias. I think if we do want to bring KIP-244 into 2.0, then we need to keep its scope small and well defined. For that I'm proposing:

1. Make the inheritance implementation of headers consistent with what we have for the other record context fields, i.e. pass the record context through in `context.forward()`. Note that within a processor node, users can already manipulate the Headers with the given APIs, so at the time of forwarding, the library can just copy whatever is left or updated to the next processor node.

2. In the sink node, where a record is being sent to a Kafka topic, we should consider the following:

a. For sink topics, we will set the headers in the producer record.
b. For repartition topics, we will set the headers in the producer record.
c. For changelog topics, we will drop the headers from the producer record, since they will not be used in restoration and are not stored in the state store either.

We can discuss extending the current protocol, how to enable users to override those rules, and how to expose them in the DSL layer in a future KIP.
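A minimal sketch of the sink-node rule in point 2 (sink and repartition topics keep the headers, changelog topics drop them), written as plain Java so it runs without Kafka on the classpath. `SinkHeaderPolicy`, `TopicType` and `headersFor` are hypothetical names, not part of the Kafka Streams API, and headers are simplified to a `Map<String, byte[]>` (real Kafka `Headers` allow repeated keys).

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the proposed sink-node rule; not Kafka Streams code.
// Headers are simplified to a String -> byte[] map (real Kafka Headers allow repeated keys).
final class SinkHeaderPolicy {

    /** The three kinds of topics a sink node can write to. */
    enum TopicType { SINK, REPARTITION, CHANGELOG }

    private SinkHeaderPolicy() {}

    /** Returns the headers to attach to the outgoing producer record for the given topic type. */
    static Map<String, byte[]> headersFor(TopicType type, Map<String, byte[]> current) {
        switch (type) {
            case SINK:
            case REPARTITION:
                // a. and b.: copy whatever headers are left after processing
                return Collections.unmodifiableMap(new LinkedHashMap<>(current));
            case CHANGELOG:
                // c.: drop headers; they are not used in restoration or kept in the store
                return Collections.emptyMap();
            default:
                throw new IllegalArgumentException("Unknown topic type: " + type);
        }
    }
}
```

Keeping the decision in a single place at the sink would, under these assumptions, also make it easy to swap in user-defined rules if a later KIP allows overriding them.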
Guozhang

On Mon, May 7, 2018 at 5:49 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Guozhang,

if you advocate forwarding headers by default, it might be a better default strategy to forward the headers for all operators (similar to the topic/partition/offset metadata). It's usually harder for users to reason about different cases, and thus I would prefer consistent behavior, i.e., only one default strategy instead of introducing different cases.

Btw: my argument about dropping headers by default only implies that users need to copy the headers explicitly to the output records in their code if they want to inspect them later; it does not imply that headers cannot be forwarded downstream. (Not sure if this was clear.)

I am also OK with copying by default, though (for me, it's only a 51/49 preference for dropping by default).

-Matthias

On 5/7/18 4:52 PM, Guozhang Wang wrote:

Hi Matthias,

My concern with setting `null` in all cases is that it would make headers not very useful in KIP-244, because headers would only be available at the source stream / table, but not in any of the following instances. In practice, users may be more likely to look into the headers later in the pipeline. Personally, I'd suggest we pass the headers for all stateless operators in the DSL and everywhere in PAPI's context.forward(). For repartition topics and sink topics, we also set them in the produced records accordingly; for changelog topics, we do not set them, since they are not going to be used anywhere in the store.

Guozhang

On Sun, May 6, 2018 at 9:03 PM, Matthias J. Sax <matth...@confluent.io> wrote:

I agree that we should not block this KIP if possible. Nevertheless, we should try to get a reasonable default strategy for inheriting the headers so we don't need to change it later on.

Let's see what others think. I still slightly tend toward setting them to `null` by default for all cases. If the default strategy is different for different operators, as you suggest, it might be confusing to users. IMHO, the default behavior should be as simple as possible.

-Matthias

On 5/6/18 8:53 PM, Guozhang Wang wrote:

Matthias, thanks for sharing your opinions on the inheritance protocol of the record context. I'm thinking maybe we should make this discussion a separate KIP by itself? If so, then KIP-244's scope would be smaller, and within KIP-244 we can have a simple inheritance rule: set the headers to null when 1) going through stateful operators and 2) sending to any topic.

Guozhang

On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <matth...@confluent.io> wrote:

Making the inheritance protocol a public contract seems reasonable to me.

In the current implementation, all output records inherit the offset, timestamp, topic, and partition metadata from the input record. We already added an API to change the timestamp explicitly for the output record, though.

I think it makes sense to keep the inheritance of offset, topic, and partition. For headers, it's worth discussing. I see arguments for two strategies: (1) inherit by default, (2) set `null` by default.
Independent of the default behavior, we should add an API to set headers for output records explicitly, though (similar to the "set timestamp" API).

From my point of view, timestamp/headers are a different "class/category" of data/metadata than topic/partition/offset. For the first category, it makes sense to manipulate them, and they are more than "plain metadata", especially the timestamp. For the second category, it does not make sense to manipulate them, and to me topic/partition/offset is pure metadata. Strictly speaking, it's even questionable whether output records should have any value for topic/partition/offset in the first place, or whether they should be `null`, because those attributes only make sense for source records that are consumed directly from a topic. On the other hand, if we make this difference explicit, it might be useful information for the user to track the topic/partition/offset of the original source record.

Furthermore, to me, timestamps and headers are somewhat different, too. For stream processing, every record is required to have a timestamp; thus, it makes sense to inherit the input record timestamp by default (from my point of view, a timestamp is not really metadata but actually as important as key and value). Headers, however, are optional, and thus inheriting them is not really required. It might be convenient, though: for example, imagine a simple "filter-only" application. It would be cumbersome for users to explicitly copy the headers from the input records to the output records; it seems to be unnecessary boilerplate code. On the other hand, for any more complex use case, it's questionable to inherit headers; note that headers would be written to the output topics, increasing the size of the messages. Overall, I am not sure which default strategy is the better one for headers. Is there a convincing argument for either of them? I slightly tend to think that using `null` as the default might be better.

Last, we could also make the default behavior configurable: something like `inherit.record.headers=true/false` with default value "false". This would allow people to opt in to automatic header inheritance. Just an idea I wanted to add to the discussion; not sure if it's a good one.

-Matthias

On 5/4/18 3:13 PM, Guozhang Wang wrote:

Hello Jorge,

> Agree. Probably point 3 handles this. `Headers` being part of `RecordContext` would be handled the same way as other attributes.

Today we do not yet have a clear inheritance protocol for the other fields of RecordContext: although internally we do have some criteria for topic/partition/offset and timestamp, they are not explicitly exposed to users.

I think we still need a defined protocol for headers itself, but I agree that it is better scoped outside of this KIP, since the inheritance protocol for all the fields of RecordContext would better be a separate KIP. We can document this clearly in the wiki page.
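The two default strategies Matthias describes, together with his opt-in `inherit.record.headers` idea, can be modelled in a few lines. This is a minimal sketch, assuming headers are a plain `Map<String, byte[]>` and that "`null` headers" is represented as an empty map; `HeaderInheritance`, `Rec` and `output` are hypothetical names, and `inherit.record.headers` was only floated in this thread, not an existing Kafka Streams config.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical model of the two default strategies; not Kafka Streams code.
final class HeaderInheritance {

    // Config key floated in the thread; it is a proposal, not an existing StreamsConfig entry.
    static final String INHERIT_RECORD_HEADERS = "inherit.record.headers";

    /** Minimal record: the timestamp is always inherited, headers depend on the strategy. */
    static final class Rec {
        final String key;
        final String value;
        final long timestamp;
        final Map<String, byte[]> headers;

        Rec(String key, String value, long timestamp, Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        }
    }

    private HeaderInheritance() {}

    /** Strategy (1) when true, strategy (2) when false; "false" is the suggested default. */
    static boolean inheritHeaders(Properties config) {
        return Boolean.parseBoolean(config.getProperty(INHERIT_RECORD_HEADERS, "false"));
    }

    /** Builds the output record for an operator that turned {@code input} into (newKey, newValue). */
    static Rec output(Rec input, String newKey, String newValue, Properties config) {
        Map<String, byte[]> headers = inheritHeaders(config)
                ? input.headers
                : Collections.<String, byte[]>emptyMap();
        // The timestamp is inherited unconditionally, as it is today.
        return new Rec(newKey, newValue, input.timestamp, headers);
    }
}
```

With the default (`false`), a filter-only application that wants to keep headers would have to copy `input.headers` into each output record itself, which is the boilerplate mentioned above; with `true`, a pass-through operator keeps them without extra code.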
Guozhang

On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <garcia.florian.pe...@gmail.com> wrote:

Hi,

For me this is a great first step toward having Headers in streaming. My current use case is distributed tracing (Zipkin), and with the headers in the processorContext() I'll be able to handle that in most cases. KIP-159 should follow after this, but that is where all the major questions will arise for stateful operations (as Guozhang said).

Thanks for the work on this, Jorge.

On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

Thanks Guozhang and John for your feedback.

> 1. We need to have a clear inheritance protocol of headers in our topology:
> 1.a. In PAPI's context.forward() call, it should be straightforward.
> 1.b. In DSL stateless operators, it should be straightforward.
> 1.c. What about in stateful operators like aggregates and joins?

Agree. Probably point 3 handles this. `Headers`, being part of `RecordContext`, would be handled the same way as the other attributes.

> 3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.

Yes, I will point to it.

> 2. In terms of internal implementations, should the state store cache include the headers then in order to be sent downstream?

Good question. As `LRUCacheEntry` extends `RecordContext`, I think this is already supported. I will detail this in the KIP.

> 4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?

Fixed, thanks.

> 5. MINOR: it seems to be the case that in this KIP, our scope is only for exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly in the "Proposed Changes" section.

As the headers are exposed in the `ProcessorContext`, and headers will be sent downstream, they can be mutated (headers can be added or removed).

> Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.

I would suggest passing headers to downstream nodes and not dropping them. Clients will have to drop `Headers` if they have used them. Or it could be something like a boolean config property that manages this. I would like to hear feedback here.

> A maximal groundwork change would be to forward the headers through all operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"
Probably this would be solved once KIP-159 is implemented and supports Headers.

> There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them. Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!

Agree. But `Headers` were forwarded *explicitly* in the original proposal. The current one passes them as part of `RecordContext`, so whether they are forwarded or not is the same as for `RecordContext`. On top of this implementation, we can design how filter/map/join will be handled, probably following the KIP-159 approach.

Cheers,
Jorge.

On Wed, May 2, 2018 at 22:56, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Jorge,

Thanks for the written KIP! I made a pass over it and left some comments (some of them overlap with John's):

1. We need to have a clear inheritance protocol of headers in our topology:

1.a. In PAPI's context.forward() call, it should be straightforward.
1.b. In DSL stateless operators, it should be straightforward.
1.c. What about in stateful operators like aggregates and joins?

2. In terms of internal implementations, should the state store cache include the headers then in order to be sent downstream?

3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.

4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?

5. MINOR: it seems to be the case that in this KIP, our scope is only for exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly in the "Proposed Changes" section.

Guozhang

On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io> wrote:

Hi Jorge,

Thanks for the design work.

I agree that de-scoping the work to just the Processor API will help contain the design and implementation complexity.

The KIP mentions that the headers would be available in the ProcessorContext (like "context.headers()"). It also says that implementers would need to implement the method "void process(K key, V value, Headers headers);". I think maybe you meant to remove the proposal to modify "process", since it wouldn't be necessary in conjunction with the ProcessorContext change, and it's not represented in your PR.
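The point here is that once headers are reachable through the context (as in "context.headers()"), the two-argument `process(key, value)` is enough. Below is a hypothetical sketch of that shape, applied to the Zipkin-style tracing case Florian mentions. `MiniContext`, `TraceTaggingProcessor` and `RecordingContext` are stand-ins invented for illustration, not Kafka's `ProcessorContext`/`Processor` interfaces, and the `trace-id` header name is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stripped-down stand-in for a processor context; not the Kafka API.
interface MiniContext {
    Map<String, byte[]> headers();           // headers of the record currently being processed
    void forward(String key, String value);  // headers travel implicitly with the record context
}

/** Keeps the two-argument process() signature and reads headers from the context instead. */
final class TraceTaggingProcessor {
    private MiniContext context;

    void init(MiniContext context) {
        this.context = context;
    }

    void process(String key, String value) {
        byte[] raw = context.headers().get("trace-id");
        String traceId = raw == null ? "none" : new String(raw, StandardCharsets.UTF_8);
        context.forward(key, value + " [trace=" + traceId + "]");
    }
}

/** Test double that exposes settable headers and records forwarded key=value strings. */
final class RecordingContext implements MiniContext {
    final Map<String, byte[]> current = new LinkedHashMap<>();
    final List<String> forwarded = new ArrayList<>();

    @Override
    public Map<String, byte[]> headers() {
        return current;
    }

    @Override
    public void forward(String key, String value) {
        forwarded.add(key + "=" + value);
    }
}
```

Because the headers come from the context, the processor interface itself does not need a third parameter, which is the simplification suggested above.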
Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.

A maximal groundwork change would be to forward the headers through all operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"

There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them. Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!

Thanks again for the KIP,
-John

On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

Hi Matthias,

I've created a new JIRA to track this, updated the KIP, and created a PR.

Looking forward to your feedback,

Jorge.

On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax <matth...@confluent.io> wrote:

Hi Jorge,

I would like to unblock this KIP to make some progress. The tricky question in this work seems to be how to expose headers at the DSL level. This is related to KIP-149 and KIP-159. However, for the Processor API, it seems to be rather straightforward to add headers to the API.

Thus, I would suggest de-scoping this KIP and adding header support for the Processor API only as a first step. Once this is done, we can see in a second step how to add headers at the DSL level.

WDYT about this proposal?

If you agree, please update the JIRA and KIP accordingly. Note that we have two JIRAs that are duplicates at the moment. We can scope them accordingly: one for PAPI only, and a second, dependent JIRA for the DSL.

-Matthias

On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:

Thanks for your feedback!

1. I was adding headers to KeyValue to support groupBy, but I think it is not necessary. It should be enough to map headers to the key/value and then group using the current KeyValue structure.

2. Yes. IMO key/value stores, like RocksDB, rely on KV as their structure, hence considering headers as part of stateful operations will not fit this approach and will increase complexity (I cannot think of a use case that needs this).

3. and 4. The changes in 1. will solve this issue.

Probably I rushed a bit in proposing this change; I was not aware of KIP-159 or KAFKA-5632. If KIP-159 is adopted, reducing this KIP to adding Headers to RecordContext will be enough, but I'm not sure about the scope of KIP-159. If it includes stateful operations, it will be difficult to implement, as stated in 2.

Cheers,
Jorge.

On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the KIP Jorge,

As Bill pointed out already, we should be careful with adding new overloads, as this contradicts the work done via KIP-182.

This KIP also seems to be related to KIP-149 and KIP-159. Are you aware of them? Both have quite long DISCUSS threads, but it might be worth browsing through them.

A few further questions:

- Why do you want to add the headers to `KeyValue`? I am wondering whether we should consider headers as optional metadata and add them only to `RecordContext`, similar to timestamp, offset, etc.

- You only include stateless single-record transformations at the DSL level. Do you suggest that all other operators just drop headers on the floor?

- Why do you only want to put headers into the in-memory store and the cache but not into the RocksDB store? What do you mean by "pass through"? IMHO, all stores should behave the same at the DSL level.
-> If we store the headers in the state stores, what is the upgrade path?

- Why do we need to store record headers in state in the first place, if we exclude stateful operators at the DSL level?

What is the motivation for the "border lines" you chose?

-Matthias

On 12/21/17 8:18 AM, Bill Bejeck wrote:

Jorge,

Thanks for the KIP. I know this is a feature others in the community have been interested in getting into Kafka Streams.

I took a quick pass over it, and I have one initial question.
We recently reduced overloads with KIP-182, and in this KIP we are increasing them again.

I can see from the KIP why they are necessary, but I'm wondering if there is something else we can do to cut down on the overloads introduced. I don't have any sound suggestions at the moment, so I'll have to think about it some more, but I wanted to put the thought out there.

Thanks,
Bill

On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

Hi all,

I have created a KIP to add Record Headers support to the Kafka Streams API:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams

The main goal is to be able to use headers to filter, map, and process records as streams. Stateful processing (joins, windows) is not considered.

Proposed changes/Draft:
https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers

Feedback and suggestions are more than welcome.

Cheers,

Jorge.
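The stated goal of the original KIP is to use headers to filter, map and process records. A minimal sketch of the filtering part, kept free of Kafka dependencies: `HeaderFilter`, `Rec` and `filterByHeader` are hypothetical names, headers are simplified to a `Map<String, byte[]>`, and header values are assumed to be UTF-8 strings. This does not reflect an existing Kafka Streams operator.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of header-based filtering; not an existing Kafka Streams operator.
final class HeaderFilter {

    /** Key/value pair plus headers, standing in for a consumed record. */
    static final class Rec {
        final String key;
        final String value;
        final Map<String, byte[]> headers;

        Rec(String key, String value, Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    private HeaderFilter() {}

    /** Keeps only the records whose header {@code name} decodes (as UTF-8) to {@code expected}. */
    static List<Rec> filterByHeader(List<Rec> records, String name, String expected) {
        return records.stream()
                .filter(r -> {
                    byte[] raw = r.headers.get(name);
                    return raw != null && expected.equals(new String(raw, StandardCharsets.UTF_8));
                })
                .collect(Collectors.toList());
    }
}
```

A "branch" operation would follow the same pattern with one predicate per output; how such header-aware operators would surface in the DSL is the part the thread defers to KIP-159.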