Guozhang, if you advocate forwarding headers by default, it might be a better default strategy to forward the headers for all operators (similar to topic/partition/offset metadata). It's usually harder for users to reason about different cases, and thus I would prefer to have consistent behavior, i.e., only one default strategy instead of introducing different cases.
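To make "only one default strategy" concrete, here is a minimal sketch of the two candidate defaults applied uniformly by every operator. It is a toy model under stated assumptions, not the Kafka Streams API: `Rec`, `DefaultStrategy`, `forward` and `forwardWithHeaders` are hypothetical names invented purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: none of these types or methods exist in Kafka Streams.
final class HeaderInheritanceSketch {

    /** The two candidate defaults discussed in this thread. */
    enum DefaultStrategy { INHERIT, DROP }

    /** Hypothetical stand-in for a record; headers == null means "no headers". */
    static final class Rec {
        final String value;
        final Map<String, String> headers;

        Rec(String value, Map<String, String> headers) {
            this.value = value;
            this.headers = headers;
        }
    }

    /** One rule applied by every operator, so users reason about a single case. */
    static Rec forward(Rec input, String newValue, DefaultStrategy strategy) {
        boolean inherit = strategy == DefaultStrategy.INHERIT && input.headers != null;
        // Copy rather than share, so downstream changes do not leak upstream.
        Map<String, String> out = inherit ? new LinkedHashMap<>(input.headers) : null;
        return new Rec(newValue, out);
    }

    /** Explicit override, in the spirit of a "set headers" API: the caller decides. */
    static Rec forwardWithHeaders(String newValue, Map<String, String> headers) {
        return new Rec(newValue, headers == null ? null : new LinkedHashMap<>(headers));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("trace-id", "abc");
        Rec input = new Rec("v", headers);

        System.out.println(forward(input, "V", DefaultStrategy.INHERIT).headers); // {trace-id=abc}
        System.out.println(forward(input, "V", DefaultStrategy.DROP).headers);    // null
        // With DROP as the default, users copy headers explicitly when they need them.
        System.out.println(forwardWithHeaders("V", input.headers).headers);       // {trace-id=abc}
    }
}
```

Either default works with the explicit override; the only difference is whether a "filter-only" application has to write the copy itself.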
Btw: My argument about dropping headers by default only implies that users need to copy the headers explicitly to the output records in their code if they want to inspect them later -- it does not imply that headers cannot be forwarded downstream. (Not sure if this was clear.) I am also ok with copying by default though (for me, it's only a 51/49 preference for dropping by default).

-Matthias

On 5/7/18 4:52 PM, Guozhang Wang wrote:
> Hi Matthias,
>
> My concern of setting `null` in all cases is that it would make headers not
> very useful in KIP-244 then, because headers will only be available at the
> source stream / table, but not in any of the following instances. In
> practice users may be more likely to look into the headers later in the
> pipeline. Personally I'd suggest we pass the headers for all stateless
> operators in DSL and everywhere in PAPI's context.forward(). For
> repartition topics and sink topics, we also set them in the produced
> records accordingly; for changelog topics, we do not set them since they
> are not going to be used anywhere in the store.
>
>
> Guozhang
>
>
> On Sun, May 6, 2018 at 9:03 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> I agree that we should not block this KIP if possible. Nevertheless, we
>> should try to get a reasonable default strategy for inheriting the
>> headers so we don't need to change it later on.
>>
>> Let's see what others think. I still tend slightly to set to `null` by
>> default for all cases. If the default strategy is different for
>> different operators as you suggest, it might be confusing to users.
>> IMHO, the default behavior should be as simple as possible.
>>
>>
>> -Matthias
>>
>>
>> On 5/6/18 8:53 PM, Guozhang Wang wrote:
>>> Matthias, thanks for sharing your opinions in the inheritance protocol of
>>> the record context. I'm thinking maybe we should make this discussion as a
>>> separate KIP by itself?
>>> If yes, then KIP-244's scope would be smaller, and
>>> within KIP-244 we can have a simple inheritance rule of setting it to
>>> null when 1) going through stateful operators and 2) sending to any topics.
>>>
>>>
>>> Guozhang
>>>
>>> On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Making the inheritance protocol a public contract seems reasonable to me.
>>>>
>>>> In the current implementation, all output records inherit the offset,
>>>> timestamp, topic, and partition metadata from the input record. We
>>>> already added an API to change the timestamp explicitly for the output
>>>> record though.
>>>>
>>>> I think it makes sense to keep the inheritance of offset, topic, and
>>>> partition. For headers, it's worth discussing. I see arguments for two
>>>> strategies: (1) inherit by default, (2) set `null` by default.
>>>> Independent of the default behavior, we should add an API to set headers
>>>> for output records explicitly though (similar to the "set timestamp API").
>>>>
>>>> From my point of view, timestamp/headers are a different
>>>> "class/category" of data/metadata than topic/partition/offset. For the
>>>> first category, it makes sense to manipulate them and it's more than
>>>> "plain metadata"; especially the timestamp. For the second category it
>>>> does not make sense to manipulate it, and to me topic/partition/offset
>>>> is pure metadata only---strictly speaking, it's even questionable if
>>>> output records should have any value for topic/partition/offset in the
>>>> first place, or if they should be `null`, because those attributes
>>>> only make sense for source records that are consumed from a topic
>>>> directly. On the other hand, if we make this difference explicit,
>>>> it might be useful information for the user to track the current
>>>> topic/partition/offset of the original source record.
>>>>
>>>> Furthermore, to me, timestamps and headers are somewhat different, too.
>>>> For stream processing it's required that every record has a timestamp;
>>>> thus, it makes sense to inherit the input record timestamp by default (a
>>>> timestamp is not really metadata but actually equally important to key
>>>> and value from my point of view). Headers, however, are optional, and thus
>>>> inheriting them is not really required. It might be convenient though:
>>>> for example, imagine a simple "filter-only" application -- it would be
>>>> cumbersome for users to explicitly copy the headers from the input
>>>> records to the output records -- it seems to be unnecessary boilerplate
>>>> code. On the other hand, for any other more complex use case, it's
>>>> questionable to inherit headers---note that headers would be written to
>>>> the output topics, increasing the size of the messages. Overall, I am not
>>>> sure which default strategy might be the better one for headers. Is
>>>> there a convincing argument for either one of them? I slightly tend to
>>>> think that using `null` as default might be better.
>>>>
>>>> Last, we could also make the default behavior configurable. Something
>>>> like `inherit.record.headers=true/false` with default value "false".
>>>> This would allow people to opt in to auto-header-inheritance. Just an
>>>> idea I wanted to add to the discussion---not sure if it's a good one.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 5/4/18 3:13 PM, Guozhang Wang wrote:
>>>>> Hello Jorge,
>>>>>
>>>>>> Agree. Probably point 3 handles this. `Headers` being part of
>>>>>> `RecordContext` would be handled the same way as other attributes.
>>>>>
>>>>> Today we do not have a clear inheritance protocol for other fields of
>>>>> RecordContext yet: although internally we do have some criterion on
>>>>> topic/partition/offset and timestamp, they are not explicitly exposed to
>>>>> users.
>>>>>
>>>>>
>>>>> I think we still need to have a defined protocol for headers itself, but I
>>>>> agree that it is better scoped outside of this KIP, since this
>>>>> inheritance protocol itself for all the fields of RecordContext would
>>>>> better be a separate KIP. We can document this clearly in the wiki page.
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
>>>>> garcia.florian.pe...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> For me this is a great first step to have Headers in streaming.
>>>>>> My current use case is about distributed tracing (Zipkin) and with the
>>>>>> headers in the processorContext() I'll be able to manage that for
>>>>>> most cases.
>>>>>> KIP-159 should follow after this, but this is where all the major
>>>>>> questions will arise for stateful operations (as Guozhang said).
>>>>>>
>>>>>> Thanks for the work on this Jorge.
>>>>>>
>>>>>> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <
>>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Guozhang and John for your feedback.
>>>>>>>
>>>>>>>> 1. We need to have a clear inheritance protocol of headers in our
>>>>>>>> topology:
>>>>>>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
>>>>>>>> 1.b. In DSL stateless operators, it should be straight-forward.
>>>>>>>> 1.c. What about in stateful operators like aggregates and joins?
>>>>>>>
>>>>>>> Agree. Probably point 3 handles this. `Headers` being part of
>>>>>>> `RecordContext` would be handled the same way as other attributes.
>>>>>>>
>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to
>>>>>>>> filter/map/branch",
>>>>>>>> it may well be covered in KIP-159; worth taking a look at that KIP.
>>>>>>>
>>>>>>> Yes, I will point to it.
>>>>>>>
>>>>>>>> 2. In terms of internal implementations, should the state store
>>>>>>>> cache include the headers then in order to be sent downstream?
>>>>>>>
>>>>>>> Good question.
>>>>>>> As `LRUCacheEntry` extends `RecordContext`, I think this is
>>>>>>> already supported. I will detail this on the KIP.
>>>>>>>
>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should be
>>>>>>>> removed?
>>>>>>>
>>>>>>> Fixed, thanks.
>>>>>>>
>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only
>>>>>>>> for exposing
>>>>>>>> the headers for reading, and not allowing users to add / modify headers,
>>>>>>>> right? If yes, we'd better state it clearly at the "Proposed Changes"
>>>>>>>> section.
>>>>>>>
>>>>>>> As headers are exposed in the `ProcessorContext`, and headers will be sent
>>>>>>> downstream, they can be mutated (add/remove headers).
>>>>>>>
>>>>>>>> Also, despite the decreased scope in this KIP, I think it might be
>>>>>>>> valuable to define what will happen to headers once this change is
>>>>>>>> implemented. For example, I think a minimal groundwork-level change might
>>>>>>>> be to make the API changes, while promising to drop all headers from input
>>>>>>>> records.
>>>>>>>
>>>>>>> I would suggest passing headers to downstream nodes, and not dropping them.
>>>>>>> Clients will have to drop `Headers` if they have used them.
>>>>>>> Or it could be something like a boolean config property that manages this.
>>>>>>> I would like to hear feedback here.
>>>>>>>
>>>>>>>> A maximal groundwork change would be to forward the headers through all
>>>>>>>> operators in
>>>>>>>> Streams. But I think there are some unresolved questions about forwarding,
>>>>>>>> like "what happens to the headers in a join?"
>>>>>>>
>>>>>>> Probably this would be solved once KIP-159 is implemented and supporting
>>>>>>> Headers.
>>>>>>>
>>>>>>>> There's of course some middle ground, but instinctively, I think I'd
>>>>>>>> prefer to have a clear definition that headers are currently *not*
>>>>>>>> forwarded, rather than having a complex list of operators that do or don't
>>>>>>>> forward them. Plus, I think it might be tricky to define this behavior
>>>>>>>> while not allowing the scope to return to that of your original proposal!
>>>>>>>
>>>>>>> Agree. But `Headers` were forwarded *explicitly* in the original proposal.
>>>>>>> The current one passes them as part of `RecordContext`, so whether they are
>>>>>>> forwarded or not is the same as for `RecordContext`.
>>>>>>> On top of this implementation, we can design how filter/map/join will be
>>>>>>> handled. Probably following the KIP-159 approach.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Jorge.
>>>>>>>
>>>>>>> On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangg...@gmail.com>)
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jorge,
>>>>>>>>
>>>>>>>> Thanks for the written KIP! Made a pass over it and left some comments
>>>>>>>> (some of them overlapped with John's):
>>>>>>>>
>>>>>>>> 1. We need to have a clear inheritance protocol of headers in our
>>>>>>>> topology:
>>>>>>>>
>>>>>>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
>>>>>>>> 1.b. In DSL stateless operators, it should be straight-forward.
>>>>>>>> 1.c. What about in stateful operators like aggregates and joins?
>>>>>>>>
>>>>>>>> 2. In terms of internal implementations, should the state store cache
>>>>>>>> include the headers then in order to be sent downstream?
>>>>>>>>
>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to
>>>>>>>> filter/map/branch", it may well be covered in KIP-159; worth taking a look
>>>>>>>> at that KIP.
>>>>>>>>
>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should be
>>>>>>>> removed?
>>>>>>>>
>>>>>>>> 5.
>>>>>>>> MINOR: it seems to be the case that in this KIP, our scope is only for
>>>>>>>> exposing the headers for reading, and not allowing users to add / modify
>>>>>>>> headers, right? If yes, we'd better state it clearly at the "Proposed
>>>>>>>> Changes" section.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Hi Jorge,
>>>>>>>>>
>>>>>>>>> Thanks for the design work.
>>>>>>>>>
>>>>>>>>> I agree that de-scoping the work to just the Processor API will help
>>>>>>>>> contain the design and implementation complexity.
>>>>>>>>>
>>>>>>>>> In the KIP, it mentions that the headers would be available in the
>>>>>>>>> ProcessorContext, (like "context.headers()"). It also says that
>>>>>>>>> implementers would need to implement the method "void process(K key, V
>>>>>>>>> value, Headers headers);". I think maybe you meant to remove the proposal
>>>>>>>>> to modify "process", since it wouldn't be necessary in conjunction with the
>>>>>>>>> ProcessorContext change, and it's not represented in your PR.
>>>>>>>>>
>>>>>>>>> Also, despite the decreased scope in this KIP, I think it might be valuable
>>>>>>>>> to define what will happen to headers once this change is implemented. For
>>>>>>>>> example, I think a minimal groundwork-level change might be to make the API
>>>>>>>>> changes, while promising to drop all headers from input records.
>>>>>>>>>
>>>>>>>>> A maximal groundwork change would be to forward the headers through all
>>>>>>>>> operators in Streams. But I think there are some unresolved questions about
>>>>>>>>> forwarding, like "what happens to the headers in a join?"
>>>>>>>>>
>>>>>>>>> There's of course some middle ground, but instinctively, I think I'd prefer
>>>>>>>>> to have a clear definition that headers are currently *not* forwarded,
>>>>>>>>> rather than having a complex list of operators that do or don't forward
>>>>>>>>> them. Plus, I think it might be tricky to define this behavior while not
>>>>>>>>> allowing the scope to return to that of your original proposal!
>>>>>>>>>
>>>>>>>>> Thanks again for the KIP,
>>>>>>>>> -John
>>>>>>>>>
>>>>>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <
>>>>>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Matthias,
>>>>>>>>>>
>>>>>>>>>> I've created a new JIRA to track this, updated the KIP and created a PR.
>>>>>>>>>>
>>>>>>>>>> Looking forward to your feedback,
>>>>>>>>>>
>>>>>>>>>> Jorge.
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matth...@confluent.io>)
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jorge,
>>>>>>>>>>>
>>>>>>>>>>> I would like to unblock this KIP to make some progress. The tricky
>>>>>>>>>>> question of this work seems to be how to expose headers at DSL level.
>>>>>>>>>>> This is related to KIP-149 and KIP-159. However, for Processor API, it
>>>>>>>>>>> seems to be rather straightforward to add headers to the API.
>>>>>>>>>>>
>>>>>>>>>>> Thus, I would suggest to de-scope this KIP and add header support for
>>>>>>>>>>> Processor API only as a first step. If this is done, we can see in a
>>>>>>>>>>> second step how to add headers at DSL level.
>>>>>>>>>>>
>>>>>>>>>>> WDYT about this proposal?
>>>>>>>>>>>
>>>>>>>>>>> If you agree, please update the JIRA and KIP accordingly. Note that we
>>>>>>>>>>> have two JIRAs that are duplicates atm. We can scope them accordingly:
>>>>>>>>>>> one for PAPI only, and second as a dependent JIRA for DSL.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>>>>>> Thanks for your feedback!
>>>>>>>>>>>>
>>>>>>>>>>>> 1. I was adding headers to KeyValue to support groupBy, but I think it is
>>>>>>>>>>>> not necessary. It should be enough to map headers to key/value and
>>>>>>>>>>>> then group using the current KeyValue structure.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV as structure, hence
>>>>>>>>>>>> considering headers as part of stateful operations will not fit in this
>>>>>>>>>>>> approach and will increase complexity (I cannot think of a use case that
>>>>>>>>>>>> needs this).
>>>>>>>>>>>>
>>>>>>>>>>>> 3. and 4. Changes on 1. will solve this issue.
>>>>>>>>>>>>
>>>>>>>>>>>> I probably rushed a bit proposing this change; I was not aware of KIP-159 or
>>>>>>>>>>>> KAFKA-5632.
>>>>>>>>>>>> If KIP-159 is adopted, reducing this KIP to adding Headers to
>>>>>>>>>>>> RecordContext will be enough, but I'm not sure about the scope of KIP-159.
>>>>>>>>>>>> If it includes stateful operations, it will be difficult to implement, as
>>>>>>>>>>>> stated in 2.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax (<matth...@confluent.io>)
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the KIP Jorge,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As Bill pointed out already, we should be careful with adding new
>>>>>>>>>>>>> overloads as this contradicts the work done via KIP-182.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This KIP also seems to be related to KIP-149 and KIP-159. Are you aware
>>>>>>>>>>>>> of them?
>>>>>>>>>>>>> Both have quite long DISCUSS threads, but it might be worth
>>>>>>>>>>>>> browsing through them.
>>>>>>>>>>>>>
>>>>>>>>>>>>> A few further questions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - why do you want to add the headers to `KeyValue`? I am not sure if we
>>>>>>>>>>>>> should consider headers as optional metadata and add it to
>>>>>>>>>>>>> `RecordContext` similar to timestamp, offset, etc. only
>>>>>>>>>>>>>
>>>>>>>>>>>>> - You only include stateless single-record transformations at the DSL
>>>>>>>>>>>>> level. Do you suggest that all other operators just drop headers on the
>>>>>>>>>>>>> floor?
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Why do you only want to put headers into in-memory and cache but not
>>>>>>>>>>>>> RocksDB store? What do you mean by "pass through"? IMHO, all stores
>>>>>>>>>>>>> should behave the same at DSL level.
>>>>>>>>>>>>> -> if we store the headers in the state stores, what is the upgrade
>>>>>>>>>>>>> path?
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Why do we need to store record headers in state in the first place, if
>>>>>>>>>>>>> we exclude stateful operators at DSL level?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> What is the motivation for the "border lines" you choose?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
>>>>>>>>>>>>>> Jorge,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the KIP, I know this is a feature others in the community have
>>>>>>>>>>>>>> been interested in getting into Kafka Streams.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I took a quick pass over it, and I have one initial question.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We recently reduced overloads with KIP-182, and in this KIP we are
>>>>>>>>>>>>>> increasing them again.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I can see from the KIP why they are necessary, but I'm wondering if there
>>>>>>>>>>>>>> is something else we can do to cut down on the overloads introduced. I
>>>>>>>>>>>>>> don't have any sound suggestions ATM, so I'll have to think about it some
>>>>>>>>>>>>>> more, but I wanted to put the thought out there.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <
>>>>>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have created a KIP to add Record Headers support to Kafka Streams API:
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The main goal is to be able to use headers to filter, map and process
>>>>>>>>>>>>>>> records as streams. Stateful processing (joins, windows) is not
>>>>>>>>>>>>>>> considered.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Proposed changes/Draft:
>>>>>>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Feedback and suggestions are more than welcome.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jorge.
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang