That's a good question. I think we can manage this in KIP-159. I will go
ahead and try to augment that KIP together with the original author Jeyhun.


Guozhang

On Fri, May 11, 2018 at 12:45 AM, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks Guozhang and Matthias! I do also agree with this way of handling
> headers inheritance. I will add them to the KIP doc.
>
> > We can discuss about extending the current protocol and how to enable
> users
> > override those rule, and how to expose them in the DSL layer in a future
> > KIP.
>
> About this, should this be managed on KIP-159 or a new one?
>
> El jue., 10 may. 2018 a las 17:46, Matthias J. Sax (<matth...@confluent.io
> >)
> escribió:
>
> > Thanks Guozhang! Sounds good to me!
> >
> > -Matthias
> >
> > On 5/10/18 7:55 AM, Guozhang Wang wrote:
> > > Thanks for your thoughts Matthias. I think if we do want to bring
> KIP-244
> > > into 2.0 then we need to keep its scope small and well defined. For
> that
> > > I'm proposing:
> > >
> > > 1. Make the inheritance implementation of headers consistent with what
> we
> > > had with other record context fields. I.e. pass through the record
> > context
> > > in `context.forward()`. Note that within a processor node, users can
> > > already manipulate the Headers with the given APIs, so at the time of
> > > forwarding, the library can just copy what-ever is left / updated to
> the
> > > next processor node.
> > >
> > > 2. In the sink node, where a record is being sent to the Kafka topic,
> we
> > > should consider the following:
> > >
> > > a. For sink topics, we will set the headers into the producer record.
> > > b. For repartition topics, we will the headers into the producer
> record.
> > > c. For changelog topics, we will drop the headers in the produce record
> > > since they will not be used in restoration and not stored in the state
> > > store either.
> > >
> > >
> > > We can discuss about extending the current protocol and how to enable
> > users
> > > override those rule, and how to expose them in the DSL layer in a
> future
> > > KIP.
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, May 7, 2018 at 5:49 PM, Matthias J. Sax <matth...@confluent.io
> >
> > > wrote:
> > >
> > >> Guozhang,
> > >>
> > >> if you advocate to forward headers by default, it might be a better
> > >> default strategy do forward the headers for all operators (similar to
> > >> topic/partition/offset metadata). It's usually harder for users to
> > >> reason about different cases and thus I would prefer to have
> consistent
> > >> behavior, ie, only one default strategy instead of introducing
> different
> > >> cases.
> > >>
> > >> Btw: My argument about dropping headers by default only implies, that
> > >> users need to copy the headers explicitly to the output records in
> there
> > >> code of they want to inspect them later -- it does not imply that
> > >> headers cannot be forwarded downstream. (Not sure if this was clear).
> > >>
> > >> I am also ok with copying be default thought (for me, it's a 51/49
> > >> preference for dropping by default only).
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 5/7/18 4:52 PM, Guozhang Wang wrote:
> > >>> Hi Matthias,
> > >>>
> > >>> My concern of setting `null` in all cases is that it would make
> headers
> > >> not
> > >>> very useful in KIP-244 then, because headers will only be available
> at
> > >> the
> > >>> source stream / table, but not in any of the following instances. In
> > >>> practice users may be more likely to look into the headers later in
> the
> > >>> pipeline. Personally I'd suggest we pass the headers for all
> stateless
> > >>> operators in DSL and everywhere in PAPI's context.forward(). For
> > >>> repartition topics and sink topics, we also set them in the produced
> > >>> records accordingly; for changelog topics, we do not set them since
> > they
> > >>> are not going to be used anywhere in the store.
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>> On Sun, May 6, 2018 at 9:03 PM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > >>> wrote:
> > >>>
> > >>>> I agree, that we should not block this KIP if possible.
> Nevertheless,
> > we
> > >>>> should try to get a reasonable default strategy for inheriting the
> > >>>> headers so we don't need to change it later on.
> > >>>>
> > >>>> Let's see what other think. I still tend slightly to set to `null`
> by
> > >>>> default for all cases. If the default strategy is different for
> > >>>> different operators as you suggest, it might be confusion to users.
> > >>>> IMHO, the default behavior should be as simple as possible.
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>>
> > >>>> On 5/6/18 8:53 PM, Guozhang Wang wrote:
> > >>>>> Matthias, thanks for sharing your opinions in the inheritance
> > protocol
> > >> of
> > >>>>> the record context. I'm thinking maybe we should make this
> discussion
> > >> as
> > >>>> a
> > >>>>> separate KIP by itself? If yes, then KIP-244's scope would be
> > smaller,
> > >>>> and
> > >>>>> within KIP-244 we can have a simple inheritance rule that setting
> it
> > to
> > >>>>> null when 1) going through stateful operators and 2) sending to any
> > >>>> topics.
> > >>>>>
> > >>>>>
> > >>>>> Guozhang
> > >>>>>
> > >>>>> On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <
> > >> matth...@confluent.io>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Making the inheritance protocol a public contract seems reasonable
> > to
> > >>>> me.
> > >>>>>>
> > >>>>>> In the current implementation, all output records inherits the
> > offset,
> > >>>>>> timestamp, topic, and partition metadata from the input record. We
> > >>>>>> already added an API to change the timestamp explicitly for the
> > output
> > >>>>>> record thought.
> > >>>>>>
> > >>>>>> I think it make sense to keep the inheritance of offset, topic,
> and
> > >>>>>> partition. For headers, it's worth to discuss. I see arguments for
> > two
> > >>>>>> strategies: (1) inherit by default, (2) set `null` by default.
> > >>>>>> Independent of the default behavior, we should add an API to set
> > >> headers
> > >>>>>> for output records explicitly though (similar to the "set
> timestamp
> > >>>> API").
> > >>>>>>
> > >>>>>> From my point of view, timestamp/headers are a different
> > >>>>>> "class/category" of data/metadata than topic/partition/offset. For
> > the
> > >>>>>> first category, it makes sense to manipulate them and it's more
> than
> > >>>>>> "plain metadata"; especially the timestamp. For the second
> category
> > it
> > >>>>>> does not make sense to manipulate it, and to me
> > topic/partition/offset
> > >>>>>> is pure metadata only---strictly speaking, it's even questionable
> if
> > >>>>>> output records should have any value for topic/partition/offset in
> > the
> > >>>>>> first place, or if they should be `null`, because those attributes
> > do
> > >>>>>> only make sense for source records that are consumed from a topic
> > >>>>>> directly only. On the other hand, if we make this difference
> > explicit,
> > >>>>>> it might be useful information for the use to track the current
> > >>>>>> topic/partition/offset of the original source record.
> > >>>>>>
> > >>>>>> Furthermore, to me, timestamps and headers are somewhat different,
> > >> too.
> > >>>>>> For stream processing it's required that every record has a
> > timestamp;
> > >>>>>> thus, it make sense to inherit the input record timestamp by
> default
> > >> (a
> > >>>>>> timestamp is not really metadata but actually equally important to
> > key
> > >>>>>> and value from my point of view). Header however are optional, and
> > >> thus
> > >>>>>> inheriting them is not really required. It might be convenient
> > though:
> > >>>>>> for example, imagine a simple "filter-only" application -- it
> would
> > be
> > >>>>>> cumbersome for users to explicitly copy the headers from the input
> > >>>>>> records to the output records -- it seems to be unnecessary
> > >> boilerplate
> > >>>>>> code. On the other hand, for any other more complex use case, it's
> > >>>>>> questionable to inherit headers---note, that headers would be
> > written
> > >> to
> > >>>>>> the output topics increasing the size of the messages. Overall, I
> am
> > >> not
> > >>>>>> sure which default strategy might be the better one for headers.
> Is
> > >>>>>> there a convincing argument for either one of them? I slightly
> tend
> > to
> > >>>>>> think that using `null` as default might be better.
> > >>>>>>
> > >>>>>> Last, we could also make the default behavior configurable.
> > Something
> > >>>>>> like `inherit.record.headers=true/false` with default value
> "false".
> > >>>>>> This would allow people to opt-in for auto-header-inheritance.
> Just
> > an
> > >>>>>> idea I wanted to add to the discussion---not sure if it's a good
> > one.
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>> On 5/4/18 3:13 PM, Guozhang Wang wrote:
> > >>>>>>> Hello Jorge,
> > >>>>>>>
> > >>>>>>>> Agree. Probably point 3 handles this. `Headers` been part of
> > >>>>>> `RecordContext`
> > >>>>>>> would be handled the same way as other attributes.
> > >>>>>>>
> > >>>>>>> Today we do not have a clear inheritance protocol for other
> fields
> > of
> > >>>>>>> RecordContext yet: although internally we do have some criterion
> on
> > >>>>>>> topic/partition/offset and timestamp, they are not explicitly
> > exposed
> > >>>> to
> > >>>>>>> users.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> I think we still need to have a defined protocol for headers
> > itself,
> > >>>> but
> > >>>>>> I
> > >>>>>>> agree that it better to be scoped out side of this KIP, since
> this
> > >>>>>>> inheritance protocol itself for all the fields of RecordContext
> > would
> > >>>>>>> better be a separate KIP. We can document this clearly in the
> wiki
> > >>>> page.
> > >>>>>>>
> > >>>>>>> Guozhang
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
> > >>>>>>> garcia.florian.pe...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>> For me this is a great first step to have Headers in streaming.
> > >>>>>>>> My current use case is about distributed tracing (Zipkin) and
> with
> > >> the
> > >>>>>>>> headers in the processorContext() I'll be able to manage that
> for
> > >> the
> > >>>>>> most
> > >>>>>>>> cases.
> > >>>>>>>> The KIP-159 should follow after this but this is where all the
> > major
> > >>>>>>>> questions will arise for stateful operations (as Guozhang said).
> > >>>>>>>>
> > >>>>>>>> Thanks for the work on this Jorge.
> > >>>>>>>>
> > >>>>>>>> Le ven. 4 mai 2018 à 01:04, Jorge Esteban Quilcate Otoya <
> > >>>>>>>> quilcate.jo...@gmail.com> a écrit :
> > >>>>>>>>
> > >>>>>>>>> Thanks Guozhang and John for your feedback.
> > >>>>>>>>>
> > >>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in
> > our
> > >>>>>>>>> topology:
> > >>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be
> > >>>> straight-forward.
> > >>>>>>>>>> 1.b. In DSL stateless operators, it should be
> straight-forward.
> > >>>>>>>>>> 1.c. What about in stateful operators like aggregates and
> joins?
> > >>>>>>>>>
> > >>>>>>>>> Agree. Probably point 3 handles this. `Headers` been part of
> > >>>>>>>>> `RecordContext` would be handled the same way as other
> > attributes.
> > >>>>>>>>>
> > >>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to
> > >>>>>>>>> filter/map/branch",
> > >>>>>>>>> it may well be covered in KIP-159; worth taking a look at that
> > KIP.
> > >>>>>>>>>
> > >>>>>>>>> Yes, I will point to it.
> > >>>>>>>>>
> > >>>>>>>>>> 2. In terms of internal implementations, should the state
> store
> > >>>>>>>>> cache include the headers then in order to be sent downstreams?
> > >>>>>>>>>
> > >>>>>>>>> Good question. As `LRUCacheEntry` extends `RecordContext`, I
> > thinks
> > >>>>>> this
> > >>>>>>>> is
> > >>>>>>>>> already supported. I will detail this on the KIP.
> > >>>>>>>>>
> > >>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)",
> this
> > >>>> should
> > >>>>>>>> be
> > >>>>>>>>> removed?
> > >>>>>>>>>
> > >>>>>>>>> Fixed, thanks.
> > >>>>>>>>>
> > >>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope
> is
> > >>>> only
> > >>>>>>>>> for exposing
> > >>>>>>>>> the headers for reading, and not allowing users to add / modify
> > >>>>>> headers,
> > >>>>>>>>> right? If yes, we'd better state it clearly at the "Proposed
> > >> Changes"
> > >>>>>>>>> section.
> > >>>>>>>>>
> > >>>>>>>>> As headers is exposed in the `ProcessContext`, and headers will
> > be
> > >>>> send
> > >>>>>>>>> downstream, it can be mutated (add/remove headers).
> > >>>>>>>>>
> > >>>>>>>>>  > Also, despite the decreased scope in this KIP, I think it
> > might
> > >> be
> > >>>>>>>>> valuable to define what will happen to headers once this change
> > is
> > >>>>>>>>> implemented. For example, I think a minimal groundwork-level
> > change
> > >>>>>> might
> > >>>>>>>>> be to make the API changes, while promising to drop all headers
> > >> from
> > >>>>>>>> input
> > >>>>>>>>> records.
> > >>>>>>>>>
> > >>>>>>>>> I will suggest to pass headers to downstream nodes, and don't
> > drop
> > >>>>>> yhrm.
> > >>>>>>>>> Clients will have to drop `Headers` if they have used them.
> > >>>>>>>>> Or it could be something like a boolean config property that
> > manage
> > >>>>>> this.
> > >>>>>>>>> I would like to hear feedback here.
> > >>>>>>>>>
> > >>>>>>>>>> A maximal groundwork change would be to forward the headers
> > >> through
> > >>>>>> all
> > >>>>>>>>> operators
> > >>>>>>>>> in
> > >>>>>>>>>
> > >>>>>>>>> Streams. But I think there are some unresolved questions about
> > >>>>>>>> forwarding,
> > >>>>>>>>> like "what happens to the headers in a join?"
> > >>>>>>>>> Probably this would be solve once KIP-159 is implemented and
> > >>>> supporting
> > >>>>>>>>> Headers.
> > >>>>>>>>>
> > >>>>>>>>>> There's of course some middle ground, but instinctively, I
> think
> > >> I'd
> > >>>>>>>>> prefer to have a clear definition that headers are currently
> > *not*
> > >>>>>>>>> forwarded, rather than having a complex list of operators that
> do
> > >> or
> > >>>>>>>> don't
> > >>>>>>>>> forward them. Plus, I think it might be tricky to define this
> > >>>> behavior
> > >>>>>>>>> while not allowing the scope to return to that of your original
> > >>>>>> proposal!
> > >>>>>>>>>
> > >>>>>>>>> Agree. But `Headers` were forwarded *explicitly* in the
> original
> > >>>>>>>> proposal.
> > >>>>>>>>> The current one pass it as part of `RecordContext`, so if it's
> > >>>> forward
> > >>>>>> it
> > >>>>>>>>> or not is as the same as `RecordContext`.
> > >>>>>>>>> On top of this implementation, we can design how
> filter/map/join
> > >> will
> > >>>>>> be
> > >>>>>>>>> handled. Probably following KIP-159 approach.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Jorge.
> > >>>>>>>>>
> > >>>>>>>>> El mié., 2 may. 2018 a las 22:56, Guozhang Wang (<
> > >> wangg...@gmail.com
> > >>>>> )
> > >>>>>>>>> escribió:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Jorge,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for the written KIP! Made a pass over it and left some
> > >>>> comments
> > >>>>>>>>>> (some of them overlapped with John's):
> > >>>>>>>>>>
> > >>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in
> > our
> > >>>>>>>>> topology:
> > >>>>>>>>>>
> > >>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be
> > >>>> straight-forward.
> > >>>>>>>>>> 1.b. In DSL stateless operators, it should be
> straight-forward.
> > >>>>>>>>>> 1.c. What about in stateful operators like aggregates and
> joins?
> > >>>>>>>>>>
> > >>>>>>>>>> 2. In terms of internal implementations, should the state
> store
> > >>>> cache
> > >>>>>>>>>> include the headers then in order to be sent downstreams?
> > >>>>>>>>>>
> > >>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to
> > >>>>>>>>>> filter/map/branch", it may well be covered in KIP-159; worth
> > >> taking
> > >>>> a
> > >>>>>>>>> look
> > >>>>>>>>>> at that KIP.
> > >>>>>>>>>>
> > >>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)",
> this
> > >>>> should
> > >>>>>>>> be
> > >>>>>>>>>> removed?
> > >>>>>>>>>>
> > >>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope
> is
> > >>>> only
> > >>>>>>>> for
> > >>>>>>>>>> exposing the headers for reading, and not allowing users to
> add
> > /
> > >>>>>>>> modify
> > >>>>>>>>>> headers, right? If yes, we'd better state it clearly at the
> > >>>> "Proposed
> > >>>>>>>>>> Changes" section.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Guozhang
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <
> j...@confluent.io
> > >
> > >>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Jorge,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for the design work.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I agree that de-scoping the work to just the Processor API
> will
> > >>>> help
> > >>>>>>>>>>> contain the design and implementation complexity.
> > >>>>>>>>>>>
> > >>>>>>>>>>> In the KIP, it mentions that the headers would be available
> in
> > >> the
> > >>>>>>>>>>> ProcessorContext, (like "context.headers()"). It also says
> that
> > >>>>>>>>>>> implementers would need to implement the method "void
> process(K
> > >>>> key,
> > >>>>>>>> V
> > >>>>>>>>>>> value, Headers headers);". I think maybe you meant to remove
> > the
> > >>>>>>>>> proposal
> > >>>>>>>>>>> to modify "process", since it wouldn't be necessary in
> > >> conjunction
> > >>>>>>>> with
> > >>>>>>>>>> the
> > >>>>>>>>>>> ProcessorContext change, and it's not represented in your PR.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Also, despite the decreased scope in this KIP, I think it
> might
> > >> be
> > >>>>>>>>>> valuable
> > >>>>>>>>>>> to define what will happen to headers once this change is
> > >>>>>>>> implemented.
> > >>>>>>>>>> For
> > >>>>>>>>>>> example, I think a minimal groundwork-level change might be
> to
> > >> make
> > >>>>>>>> the
> > >>>>>>>>>> API
> > >>>>>>>>>>> changes, while promising to drop all headers from input
> > records.
> > >>>>>>>>>>>
> > >>>>>>>>>>> A maximal groundwork change would be to forward the headers
> > >> through
> > >>>>>>>> all
> > >>>>>>>>>>> operators in Streams. But I think there are some unresolved
> > >>>> questions
> > >>>>>>>>>> about
> > >>>>>>>>>>> forwarding, like "what happens to the headers in a join?"
> > >>>>>>>>>>>
> > >>>>>>>>>>> There's of course some middle ground, but instinctively, I
> > think
> > >>>> I'd
> > >>>>>>>>>> prefer
> > >>>>>>>>>>> to have a clear definition that headers are currently *not*
> > >>>>>>>> forwarded,
> > >>>>>>>>>>> rather than having a complex list of operators that do or
> don't
> > >>>>>>>> forward
> > >>>>>>>>>>> them. Plus, I think it might be tricky to define this
> behavior
> > >>>> while
> > >>>>>>>>> not
> > >>>>>>>>>>> allowing the scope to return to that of your original
> proposal!
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks again for the KIP,
> > >>>>>>>>>>> -John
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya
> <
> > >>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I've created a new JIRA to track this, updated the KIP and
> > >> create
> > >>>> a
> > >>>>>>>>> PR.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Looking forward to your feedback,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Jorge.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> El mar., 13 feb. 2018 a las 22:43, Matthias J. Sax (<
> > >>>>>>>>>>> matth...@confluent.io
> > >>>>>>>>>>>>> )
> > >>>>>>>>>>>> escribió:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Jorge,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I would like to unblock this KIP to make some progress. The
> > >>>>>>>> tricky
> > >>>>>>>>>>>>> question of this work, seems to be how to expose headers at
> > DSL
> > >>>>>>>>>> level.
> > >>>>>>>>>>>>> This related to KIP-149 and KIP-159. However, for Processor
> > >> API,
> > >>>>>>>> it
> > >>>>>>>>>>>>> seems to be rather straight forward to add headers to the
> > API.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thus, I would suggest to de-scope this KIP and add header
> > >> support
> > >>>>>>>>> for
> > >>>>>>>>>>>>> Processor API only as a first step. If this is done, we can
> > see
> > >>>>>>>> in
> > >>>>>>>>> a
> > >>>>>>>>>>>>> second step, how to add headers at DSL level.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> WDYT about this proposal?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> If you agree, please update the JIRA and KIP accordingly.
> > Note,
> > >>>>>>>>> that
> > >>>>>>>>>> we
> > >>>>>>>>>>>>> have two JIRA that are duplicates atm. We can scope them
> > >>>>>>>>> accordingly:
> > >>>>>>>>>>>>> one for PAPI only, and second as a dependent JIRA for DSL.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> > >>>>>>>>>>>>>> Thanks for your feedback!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1. I was adding headers to KeyValue to support groupBy,
> but
> > I
> > >>>>>>>>> think
> > >>>>>>>>>>> it
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>> not necessary. It should be enough with mapping headers to
> > >>>>>>>>>> key/value
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>> then group using current KeyValue structure.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV as
> > >>>>>>>>>> structure,
> > >>>>>>>>>>>>> hence
> > >>>>>>>>>>>>>> considering headers as part of stateful operations will
> not
> > >> fit
> > >>>>>>>>> in
> > >>>>>>>>>>> this
> > >>>>>>>>>>>>>> approach and increase complexity (I cannot think in a
> > use-case
> > >>>>>>>>> that
> > >>>>>>>>>>>> need
> > >>>>>>>>>>>>>> this).
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 3. and 4. Changes on 1. will solve this issue.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Probably I rush a bit proposing this change, I was not
> aware
> > >> of
> > >>>>>>>>>>> KIP-159
> > >>>>>>>>>>>>> or
> > >>>>>>>>>>>>>> KAFKA-5632.
> > >>>>>>>>>>>>>> If KIP-159 is adopted and we reduce this KIP to add
> Headers
> > to
> > >>>>>>>>>>>>>> RecordContext will be enough, but I'm not sure about the
> > scope
> > >>>>>>>> of
> > >>>>>>>>>>>>> KIP-159.
> > >>>>>>>>>>>>>> If it includes stateful operations will be difficult to
> > >>>>>>>>> implemented
> > >>>>>>>>>>> as
> > >>>>>>>>>>>>>> stated in 2.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>> Jorge.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> El mar., 26 dic. 2017 a las 20:04, Matthias J. Sax (<
> > >>>>>>>>>>>>> matth...@confluent.io>)
> > >>>>>>>>>>>>>> escribió:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for the KIP Jorge,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> As Bill pointed out already, we should be careful with
> > adding
> > >>>>>>>>> new
> > >>>>>>>>>>>>>>> overloads as this contradicts the work done via KIP-182.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> This KIP also seems to be related to KIP-149 and KIP-159.
> > Are
> > >>>>>>>>> you
> > >>>>>>>>>>>> aware
> > >>>>>>>>>>>>>>> of them? Both have quite long DISCUSS threads, but it
> might
> > >> be
> > >>>>>>>>>> worth
> > >>>>>>>>>>>>>>> browsing through them.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> A few further questions:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>  - why do you want to add the headers to `KeyValue`? I am
> > not
> > >>>>>>>>> sure
> > >>>>>>>>>>> if
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>>> should consider headers as optional metadata and add it
> to
> > >>>>>>>>>>>>>>> `RecordContext` similar to timestamp, offset, etc. only
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>  - You only include stateless single-record
> transformations
> > >> at
> > >>>>>>>>> the
> > >>>>>>>>>>> DSL
> > >>>>>>>>>>>>>>> level. Do you suggest that all other operator just drop
> > >>>>>>>> headers
> > >>>>>>>>> on
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>> floor?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>  - Why do you only want to put headers into in-memory and
> > >>>>>>>> cache
> > >>>>>>>>>> but
> > >>>>>>>>>>>> not
> > >>>>>>>>>>>>>>> RocksDB store? What do you mean by "pass through"? IMHO,
> > all
> > >>>>>>>>>> stores
> > >>>>>>>>>>>>>>> should behave the same at DSL level.
> > >>>>>>>>>>>>>>>    -> if we store the headers in the state stores, what
> is
> > >> the
> > >>>>>>>>>>> upgrade
> > >>>>>>>>>>>>>>> path?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>  - Why do we need to store record header in state in the
> > >> first
> > >>>>>>>>>>> place,
> > >>>>>>>>>>>> if
> > >>>>>>>>>>>>>>> we exclude stateful operator at DSL level?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> What is the motivation for the "border lines" you choose?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> > >>>>>>>>>>>>>>>> Jorge,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks for the KIP, I know this is a feature others in
> the
> > >>>>>>>>>>> community
> > >>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>> been interested in getting into Kafka Streams.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I took a quick pass over it, and I have one initial
> > >> question.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> We recently reduced overloads with KIP-182, and in this
> > KIP
> > >>>>>>>> we
> > >>>>>>>>>> are
> > >>>>>>>>>>>>>>>> increasing them again.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I can see from the KIP why they are necessary, but I'm
> > >>>>>>>>> wondering
> > >>>>>>>>>> if
> > >>>>>>>>>>>>> there
> > >>>>>>>>>>>>>>>> is something else we can do to cut down on the overloads
> > >>>>>>>>>>>> introduced.  I
> > >>>>>>>>>>>>>>>> don't have any sound suggestions ATM, so I'll have to
> > think
> > >>>>>>>>> about
> > >>>>>>>>>>> it
> > >>>>>>>>>>>>> some
> > >>>>>>>>>>>>>>>> more, but I wanted to put the thought out there.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>> Bill
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate
> > >>>>>>>> Otoya <
> > >>>>>>>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I have created a KIP to add Record Headers support to
> > Kafka
> > >>>>>>>>>>> Streams
> > >>>>>>>>>>>>> API:
> > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>>>>>>>>>>>>> 244%3A+Add+Record+Header+support+to+Kafka+Streams
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> The main goal is to be able to use headers to filter,
> map
> > >>>>>>>> and
> > >>>>>>>>>>>> process
> > >>>>>>>>>>>>>>>>> records as streams. Stateful processing (joins,
> windows)
> > >> are
> > >>>>>>>>> not
> > >>>>>>>>>>>>>>>>> considered.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Proposed changes/Draft:
> > >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:
> > >>>>>>>>>>>> streams-headers
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Feedback and suggestions are more than welcome.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Jorge.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> --
> > >>>>>>>>>> -- Guozhang
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>>
> > >>
> > >>
> > >
> > >
> >
> >
>



-- 
-- Guozhang

Reply via email to