I am actually not sure about this. Because it's about the semantics at
PAPI level, but KIP-159 targets the DSL, it might actually be better to
have a separate KIP?

-Matthias

On 5/11/18 9:26 AM, Guozhang Wang wrote:
> That's a good question. I think we can manage this in KIP-159. I will go
> ahead and try to augment that KIP together with the original author Jeyhun.
> 
> 
> Guozhang
> 
> On Fri, May 11, 2018 at 12:45 AM, Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
> 
>> Thanks Guozhang and Matthias! I do also agree with this way of handling
>> headers inheritance. I will add them to the KIP doc.
>>
>>> We can discuss about extending the current protocol and how to enable
>> users
>>> override those rule, and how to expose them in the DSL layer in a future
>>> KIP.
>>
>> About this, should this be managed on KIP-159 or a new one?
>>
>> El jue., 10 may. 2018 a las 17:46, Matthias J. Sax (<matth...@confluent.io
>>> )
>> escribió:
>>
>>> Thanks Guozhang! Sounds good to me!
>>>
>>> -Matthias
>>>
>>> On 5/10/18 7:55 AM, Guozhang Wang wrote:
>>>> Thanks for your thoughts Matthias. I think if we do want to bring
>> KIP-244
>>>> into 2.0 then we need to keep its scope small and well defined. For
>> that
>>>> I'm proposing:
>>>>
>>>> 1. Make the inheritance implementation of headers consistent with what
>> we
>>>> had with other record context fields. I.e. pass through the record
>>> context
>>>> in `context.forward()`. Note that within a processor node, users can
>>>> already manipulate the Headers with the given APIs, so at the time of
>>>> forwarding, the library can just copy what-ever is left / updated to
>> the
>>>> next processor node.
>>>>
>>>> 2. In the sink node, where a record is being sent to the Kafka topic,
>> we
>>>> should consider the following:
>>>>
>>>> a. For sink topics, we will set the headers into the producer record.
>>>> b. For repartition topics, we will the headers into the producer
>> record.
>>>> c. For changelog topics, we will drop the headers in the produce record
>>>> since they will not be used in restoration and not stored in the state
>>>> store either.
>>>>
>>>>
>>>> We can discuss about extending the current protocol and how to enable
>>> users
>>>> override those rule, and how to expose them in the DSL layer in a
>> future
>>>> KIP.
>>>>
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Mon, May 7, 2018 at 5:49 PM, Matthias J. Sax <matth...@confluent.io
>>>
>>>> wrote:
>>>>
>>>>> Guozhang,
>>>>>
>>>>> if you advocate to forward headers by default, it might be a better
>>>>> default strategy do forward the headers for all operators (similar to
>>>>> topic/partition/offset metadata). It's usually harder for users to
>>>>> reason about different cases and thus I would prefer to have
>> consistent
>>>>> behavior, ie, only one default strategy instead of introducing
>> different
>>>>> cases.
>>>>>
>>>>> Btw: My argument about dropping headers by default only implies, that
>>>>> users need to copy the headers explicitly to the output records in
>> there
>>>>> code of they want to inspect them later -- it does not imply that
>>>>> headers cannot be forwarded downstream. (Not sure if this was clear).
>>>>>
>>>>> I am also ok with copying be default thought (for me, it's a 51/49
>>>>> preference for dropping by default only).
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 5/7/18 4:52 PM, Guozhang Wang wrote:
>>>>>> Hi Matthias,
>>>>>>
>>>>>> My concern of setting `null` in all cases is that it would make
>> headers
>>>>> not
>>>>>> very useful in KIP-244 then, because headers will only be available
>> at
>>>>> the
>>>>>> source stream / table, but not in any of the following instances. In
>>>>>> practice users may be more likely to look into the headers later in
>> the
>>>>>> pipeline. Personally I'd suggest we pass the headers for all
>> stateless
>>>>>> operators in DSL and everywhere in PAPI's context.forward(). For
>>>>>> repartition topics and sink topics, we also set them in the produced
>>>>>> records accordingly; for changelog topics, we do not set them since
>>> they
>>>>>> are not going to be used anywhere in the store.
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Sun, May 6, 2018 at 9:03 PM, Matthias J. Sax <
>> matth...@confluent.io
>>>>
>>>>>> wrote:
>>>>>>
>>>>>>> I agree, that we should not block this KIP if possible.
>> Nevertheless,
>>> we
>>>>>>> should try to get a reasonable default strategy for inheriting the
>>>>>>> headers so we don't need to change it later on.
>>>>>>>
>>>>>>> Let's see what other think. I still tend slightly to set to `null`
>> by
>>>>>>> default for all cases. If the default strategy is different for
>>>>>>> different operators as you suggest, it might be confusion to users.
>>>>>>> IMHO, the default behavior should be as simple as possible.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 5/6/18 8:53 PM, Guozhang Wang wrote:
>>>>>>>> Matthias, thanks for sharing your opinions in the inheritance
>>> protocol
>>>>> of
>>>>>>>> the record context. I'm thinking maybe we should make this
>> discussion
>>>>> as
>>>>>>> a
>>>>>>>> separate KIP by itself? If yes, then KIP-244's scope would be
>>> smaller,
>>>>>>> and
>>>>>>>> within KIP-244 we can have a simple inheritance rule that setting
>> it
>>> to
>>>>>>>> null when 1) going through stateful operators and 2) sending to any
>>>>>>> topics.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <
>>>>> matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Making the inheritance protocol a public contract seems reasonable
>>> to
>>>>>>> me.
>>>>>>>>>
>>>>>>>>> In the current implementation, all output records inherits the
>>> offset,
>>>>>>>>> timestamp, topic, and partition metadata from the input record. We
>>>>>>>>> already added an API to change the timestamp explicitly for the
>>> output
>>>>>>>>> record thought.
>>>>>>>>>
>>>>>>>>> I think it make sense to keep the inheritance of offset, topic,
>> and
>>>>>>>>> partition. For headers, it's worth to discuss. I see arguments for
>>> two
>>>>>>>>> strategies: (1) inherit by default, (2) set `null` by default.
>>>>>>>>> Independent of the default behavior, we should add an API to set
>>>>> headers
>>>>>>>>> for output records explicitly though (similar to the "set
>> timestamp
>>>>>>> API").
>>>>>>>>>
>>>>>>>>> From my point of view, timestamp/headers are a different
>>>>>>>>> "class/category" of data/metadata than topic/partition/offset. For
>>> the
>>>>>>>>> first category, it makes sense to manipulate them and it's more
>> than
>>>>>>>>> "plain metadata"; especially the timestamp. For the second
>> category
>>> it
>>>>>>>>> does not make sense to manipulate it, and to me
>>> topic/partition/offset
>>>>>>>>> is pure metadata only---strictly speaking, it's even questionable
>> if
>>>>>>>>> output records should have any value for topic/partition/offset in
>>> the
>>>>>>>>> first place, or if they should be `null`, because those attributes
>>> do
>>>>>>>>> only make sense for source records that are consumed from a topic
>>>>>>>>> directly only. On the other hand, if we make this difference
>>> explicit,
>>>>>>>>> it might be useful information for the use to track the current
>>>>>>>>> topic/partition/offset of the original source record.
>>>>>>>>>
>>>>>>>>> Furthermore, to me, timestamps and headers are somewhat different,
>>>>> too.
>>>>>>>>> For stream processing it's required that every record has a
>>> timestamp;
>>>>>>>>> thus, it make sense to inherit the input record timestamp by
>> default
>>>>> (a
>>>>>>>>> timestamp is not really metadata but actually equally important to
>>> key
>>>>>>>>> and value from my point of view). Header however are optional, and
>>>>> thus
>>>>>>>>> inheriting them is not really required. It might be convenient
>>> though:
>>>>>>>>> for example, imagine a simple "filter-only" application -- it
>> would
>>> be
>>>>>>>>> cumbersome for users to explicitly copy the headers from the input
>>>>>>>>> records to the output records -- it seems to be unnecessary
>>>>> boilerplate
>>>>>>>>> code. On the other hand, for any other more complex use case, it's
>>>>>>>>> questionable to inherit headers---note, that headers would be
>>> written
>>>>> to
>>>>>>>>> the output topics increasing the size of the messages. Overall, I
>> am
>>>>> not
>>>>>>>>> sure which default strategy might be the better one for headers.
>> Is
>>>>>>>>> there a convincing argument for either one of them? I slightly
>> tend
>>> to
>>>>>>>>> think that using `null` as default might be better.
>>>>>>>>>
>>>>>>>>> Last, we could also make the default behavior configurable.
>>> Something
>>>>>>>>> like `inherit.record.headers=true/false` with default value
>> "false".
>>>>>>>>> This would allow people to opt-in for auto-header-inheritance.
>> Just
>>> an
>>>>>>>>> idea I wanted to add to the discussion---not sure if it's a good
>>> one.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 5/4/18 3:13 PM, Guozhang Wang wrote:
>>>>>>>>>> Hello Jorge,
>>>>>>>>>>
>>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` been part of
>>>>>>>>> `RecordContext`
>>>>>>>>>> would be handled the same way as other attributes.
>>>>>>>>>>
>>>>>>>>>> Today we do not have a clear inheritance protocol for other
>> fields
>>> of
>>>>>>>>>> RecordContext yet: although internally we do have some criterion
>> on
>>>>>>>>>> topic/partition/offset and timestamp, they are not explicitly
>>> exposed
>>>>>>> to
>>>>>>>>>> users.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I think we still need to have a defined protocol for headers
>>> itself,
>>>>>>> but
>>>>>>>>> I
>>>>>>>>>> agree that it better to be scoped out side of this KIP, since
>> this
>>>>>>>>>> inheritance protocol itself for all the fields of RecordContext
>>> would
>>>>>>>>>> better be a separate KIP. We can document this clearly in the
>> wiki
>>>>>>> page.
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
>>>>>>>>>> garcia.florian.pe...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> For me this is a great first step to have Headers in streaming.
>>>>>>>>>>> My current use case is about distributed tracing (Zipkin) and
>> with
>>>>> the
>>>>>>>>>>> headers in the processorContext() I'll be able to manage that
>> for
>>>>> the
>>>>>>>>> most
>>>>>>>>>>> cases.
>>>>>>>>>>> The KIP-159 should follow after this but this is where all the
>>> major
>>>>>>>>>>> questions will arise for stateful operations (as Guozhang said).
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the work on this Jorge.
>>>>>>>>>>>
>>>>>>>>>>> Le ven. 4 mai 2018 à 01:04, Jorge Esteban Quilcate Otoya <
>>>>>>>>>>> quilcate.jo...@gmail.com> a écrit :
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Guozhang and John for your feedback.
>>>>>>>>>>>>
>>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in
>>> our
>>>>>>>>>>>> topology:
>>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be
>>>>>>> straight-forward.
>>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be
>> straight-forward.
>>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and
>> joins?
>>>>>>>>>>>>
>>>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` been part of
>>>>>>>>>>>> `RecordContext` would be handled the same way as other
>>> attributes.
>>>>>>>>>>>>
>>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to
>>>>>>>>>>>> filter/map/branch",
>>>>>>>>>>>> it may well be covered in KIP-159; worth taking a look at that
>>> KIP.
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, I will point to it.
>>>>>>>>>>>>
>>>>>>>>>>>>> 2. In terms of internal implementations, should the state
>> store
>>>>>>>>>>>> cache include the headers then in order to be sent downstreams?
>>>>>>>>>>>>
>>>>>>>>>>>> Good question. As `LRUCacheEntry` extends `RecordContext`, I
>>> thinks
>>>>>>>>> this
>>>>>>>>>>> is
>>>>>>>>>>>> already supported. I will detail this on the KIP.
>>>>>>>>>>>>
>>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)",
>> this
>>>>>>> should
>>>>>>>>>>> be
>>>>>>>>>>>> removed?
>>>>>>>>>>>>
>>>>>>>>>>>> Fixed, thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope
>> is
>>>>>>> only
>>>>>>>>>>>> for exposing
>>>>>>>>>>>> the headers for reading, and not allowing users to add / modify
>>>>>>>>> headers,
>>>>>>>>>>>> right? If yes, we'd better state it clearly at the "Proposed
>>>>> Changes"
>>>>>>>>>>>> section.
>>>>>>>>>>>>
>>>>>>>>>>>> As headers is exposed in the `ProcessContext`, and headers will
>>> be
>>>>>>> send
>>>>>>>>>>>> downstream, it can be mutated (add/remove headers).
>>>>>>>>>>>>
>>>>>>>>>>>>  > Also, despite the decreased scope in this KIP, I think it
>>> might
>>>>> be
>>>>>>>>>>>> valuable to define what will happen to headers once this change
>>> is
>>>>>>>>>>>> implemented. For example, I think a minimal groundwork-level
>>> change
>>>>>>>>> might
>>>>>>>>>>>> be to make the API changes, while promising to drop all headers
>>>>> from
>>>>>>>>>>> input
>>>>>>>>>>>> records.
>>>>>>>>>>>>
>>>>>>>>>>>> I will suggest to pass headers to downstream nodes, and don't
>>> drop
>>>>>>>>> yhrm.
>>>>>>>>>>>> Clients will have to drop `Headers` if they have used them.
>>>>>>>>>>>> Or it could be something like a boolean config property that
>>> manage
>>>>>>>>> this.
>>>>>>>>>>>> I would like to hear feedback here.
>>>>>>>>>>>>
>>>>>>>>>>>>> A maximal groundwork change would be to forward the headers
>>>>> through
>>>>>>>>> all
>>>>>>>>>>>> operators
>>>>>>>>>>>> in
>>>>>>>>>>>>
>>>>>>>>>>>> Streams. But I think there are some unresolved questions about
>>>>>>>>>>> forwarding,
>>>>>>>>>>>> like "what happens to the headers in a join?"
>>>>>>>>>>>> Probably this would be solve once KIP-159 is implemented and
>>>>>>> supporting
>>>>>>>>>>>> Headers.
>>>>>>>>>>>>
>>>>>>>>>>>>> There's of course some middle ground, but instinctively, I
>> think
>>>>> I'd
>>>>>>>>>>>> prefer to have a clear definition that headers are currently
>>> *not*
>>>>>>>>>>>> forwarded, rather than having a complex list of operators that
>> do
>>>>> or
>>>>>>>>>>> don't
>>>>>>>>>>>> forward them. Plus, I think it might be tricky to define this
>>>>>>> behavior
>>>>>>>>>>>> while not allowing the scope to return to that of your original
>>>>>>>>> proposal!
>>>>>>>>>>>>
>>>>>>>>>>>> Agree. But `Headers` were forwarded *explicitly* in the
>> original
>>>>>>>>>>> proposal.
>>>>>>>>>>>> The current one pass it as part of `RecordContext`, so if it's
>>>>>>> forward
>>>>>>>>> it
>>>>>>>>>>>> or not is as the same as `RecordContext`.
>>>>>>>>>>>> On top of this implementation, we can design how
>> filter/map/join
>>>>> will
>>>>>>>>> be
>>>>>>>>>>>> handled. Probably following KIP-159 approach.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>
>>>>>>>>>>>> El mié., 2 may. 2018 a las 22:56, Guozhang Wang (<
>>>>> wangg...@gmail.com
>>>>>>>> )
>>>>>>>>>>>> escribió:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jorge,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the written KIP! Made a pass over it and left some
>>>>>>> comments
>>>>>>>>>>>>> (some of them overlapped with John's):
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in
>>> our
>>>>>>>>>>>> topology:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be
>>>>>>> straight-forward.
>>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be
>> straight-forward.
>>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and
>> joins?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. In terms of internal implementations, should the state
>> store
>>>>>>> cache
>>>>>>>>>>>>> include the headers then in order to be sent downstreams?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to
>>>>>>>>>>>>> filter/map/branch", it may well be covered in KIP-159; worth
>>>>> taking
>>>>>>> a
>>>>>>>>>>>> look
>>>>>>>>>>>>> at that KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)",
>> this
>>>>>>> should
>>>>>>>>>>> be
>>>>>>>>>>>>> removed?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope
>> is
>>>>>>> only
>>>>>>>>>>> for
>>>>>>>>>>>>> exposing the headers for reading, and not allowing users to
>> add
>>> /
>>>>>>>>>>> modify
>>>>>>>>>>>>> headers, right? If yes, we'd better state it clearly at the
>>>>>>> "Proposed
>>>>>>>>>>>>> Changes" section.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <
>> j...@confluent.io
>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jorge,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the design work.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree that de-scoping the work to just the Processor API
>> will
>>>>>>> help
>>>>>>>>>>>>>> contain the design and implementation complexity.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In the KIP, it mentions that the headers would be available
>> in
>>>>> the
>>>>>>>>>>>>>> ProcessorContext, (like "context.headers()"). It also says
>> that
>>>>>>>>>>>>>> implementers would need to implement the method "void
>> process(K
>>>>>>> key,
>>>>>>>>>>> V
>>>>>>>>>>>>>> value, Headers headers);". I think maybe you meant to remove
>>> the
>>>>>>>>>>>> proposal
>>>>>>>>>>>>>> to modify "process", since it wouldn't be necessary in
>>>>> conjunction
>>>>>>>>>>> with
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> ProcessorContext change, and it's not represented in your PR.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, despite the decreased scope in this KIP, I think it
>> might
>>>>> be
>>>>>>>>>>>>> valuable
>>>>>>>>>>>>>> to define what will happen to headers once this change is
>>>>>>>>>>> implemented.
>>>>>>>>>>>>> For
>>>>>>>>>>>>>> example, I think a minimal groundwork-level change might be
>> to
>>>>> make
>>>>>>>>>>> the
>>>>>>>>>>>>> API
>>>>>>>>>>>>>> changes, while promising to drop all headers from input
>>> records.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> A maximal groundwork change would be to forward the headers
>>>>> through
>>>>>>>>>>> all
>>>>>>>>>>>>>> operators in Streams. But I think there are some unresolved
>>>>>>> questions
>>>>>>>>>>>>> about
>>>>>>>>>>>>>> forwarding, like "what happens to the headers in a join?"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There's of course some middle ground, but instinctively, I
>>> think
>>>>>>> I'd
>>>>>>>>>>>>> prefer
>>>>>>>>>>>>>> to have a clear definition that headers are currently *not*
>>>>>>>>>>> forwarded,
>>>>>>>>>>>>>> rather than having a complex list of operators that do or
>> don't
>>>>>>>>>>> forward
>>>>>>>>>>>>>> them. Plus, I think it might be tricky to define this
>> behavior
>>>>>>> while
>>>>>>>>>>>> not
>>>>>>>>>>>>>> allowing the scope to return to that of your original
>> proposal!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks again for the KIP,
>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya
>> <
>>>>>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I've created a new JIRA to track this, updated the KIP and
>>>>> create
>>>>>>> a
>>>>>>>>>>>> PR.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Looking forward to your feedback,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> El mar., 13 feb. 2018 a las 22:43, Matthias J. Sax (<
>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>> escribió:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jorge,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I would like to unblock this KIP to make some progress. The
>>>>>>>>>>> tricky
>>>>>>>>>>>>>>>> question of this work, seems to be how to expose headers at
>>> DSL
>>>>>>>>>>>>> level.
>>>>>>>>>>>>>>>> This related to KIP-149 and KIP-159. However, for Processor
>>>>> API,
>>>>>>>>>>> it
>>>>>>>>>>>>>>>> seems to be rather straight forward to add headers to the
>>> API.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thus, I would suggest to de-scope this KIP and add header
>>>>> support
>>>>>>>>>>>> for
>>>>>>>>>>>>>>>> Processor API only as a first step. If this is done, we can
>>> see
>>>>>>>>>>> in
>>>>>>>>>>>> a
>>>>>>>>>>>>>>>> second step, how to add headers at DSL level.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> WDYT about this proposal?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you agree, please update the JIRA and KIP accordingly.
>>> Note,
>>>>>>>>>>>> that
>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> have two JIRA that are duplicates atm. We can scope them
>>>>>>>>>>>> accordingly:
>>>>>>>>>>>>>>>> one for PAPI only, and second as a dependent JIRA for DSL.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>>>>>>>>>>> Thanks for your feedback!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. I was adding headers to KeyValue to support groupBy,
>> but
>>> I
>>>>>>>>>>>> think
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> not necessary. It should be enough with mapping headers to
>>>>>>>>>>>>> key/value
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> then group using current KeyValue structure.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV as
>>>>>>>>>>>>> structure,
>>>>>>>>>>>>>>>> hence
>>>>>>>>>>>>>>>>> considering headers as part of stateful operations will
>> not
>>>>> fit
>>>>>>>>>>>> in
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>> approach and increase complexity (I cannot think in a
>>> use-case
>>>>>>>>>>>> that
>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>> this).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 3. and 4. Changes on 1. will solve this issue.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Probably I rush a bit proposing this change, I was not
>> aware
>>>>> of
>>>>>>>>>>>>>> KIP-159
>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>> KAFKA-5632.
>>>>>>>>>>>>>>>>> If KIP-159 is adopted and we reduce this KIP to add
>> Headers
>>> to
>>>>>>>>>>>>>>>>> RecordContext will be enough, but I'm not sure about the
>>> scope
>>>>>>>>>>> of
>>>>>>>>>>>>>>>> KIP-159.
>>>>>>>>>>>>>>>>> If it includes stateful operations will be difficult to
>>>>>>>>>>>> implemented
>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>> stated in 2.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> El mar., 26 dic. 2017 a las 20:04, Matthias J. Sax (<
>>>>>>>>>>>>>>>> matth...@confluent.io>)
>>>>>>>>>>>>>>>>> escribió:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the KIP Jorge,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As Bill pointed out already, we should be careful with
>>> adding
>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>> overloads as this contradicts the work done via KIP-182.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This KIP also seems to be related to KIP-149 and KIP-159.
>>> Are
>>>>>>>>>>>> you
>>>>>>>>>>>>>>> aware
>>>>>>>>>>>>>>>>>> of them? Both have quite long DISCUSS threads, but it
>> might
>>>>> be
>>>>>>>>>>>>> worth
>>>>>>>>>>>>>>>>>> browsing through them.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> A few further questions:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>  - why do you want to add the headers to `KeyValue`? I am
>>> not
>>>>>>>>>>>> sure
>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>> should consider headers as optional metadata and add it
>> to
>>>>>>>>>>>>>>>>>> `RecordContext` similar to timestamp, offset, etc. only
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>  - You only include stateless single-record
>> transformations
>>>>> at
>>>>>>>>>>>> the
>>>>>>>>>>>>>> DSL
>>>>>>>>>>>>>>>>>> level. Do you suggest that all other operator just drop
>>>>>>>>>>> headers
>>>>>>>>>>>> on
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> floor?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>  - Why do you only want to put headers into in-memory and
>>>>>>>>>>> cache
>>>>>>>>>>>>> but
>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>> RocksDB store? What do you mean by "pass through"? IMHO,
>>> all
>>>>>>>>>>>>> stores
>>>>>>>>>>>>>>>>>> should behave the same at DSL level.
>>>>>>>>>>>>>>>>>>    -> if we store the headers in the state stores, what
>> is
>>>>> the
>>>>>>>>>>>>>> upgrade
>>>>>>>>>>>>>>>>>> path?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>  - Why do we need to store record header in state in the
>>>>> first
>>>>>>>>>>>>>> place,
>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>> we exclude stateful operator at DSL level?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What is the motivation for the "border lines" you choose?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
>>>>>>>>>>>>>>>>>>> Jorge,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I know this is a feature others in
>> the
>>>>>>>>>>>>>> community
>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>> been interested in getting into Kafka Streams.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I took a quick pass over it, and I have one initial
>>>>> question.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We recently reduced overloads with KIP-182, and in this
>>> KIP
>>>>>>>>>>> we
>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>> increasing them again.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I can see from the KIP why they are necessary, but I'm
>>>>>>>>>>>> wondering
>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>>> is something else we can do to cut down on the overloads
>>>>>>>>>>>>>>> introduced.  I
>>>>>>>>>>>>>>>>>>> don't have any sound suggestions ATM, so I'll have to
>>> think
>>>>>>>>>>>> about
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>> more, but I wanted to put the thought out there.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate
>>>>>>>>>>> Otoya <
>>>>>>>>>>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I have created a KIP to add Record Headers support to
>>> Kafka
>>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>>> API:
>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>>>>>> 244%3A+Add+Record+Header+support+to+Kafka+Streams
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The main goal is to be able to use headers to filter,
>> map
>>>>>>>>>>> and
>>>>>>>>>>>>>>> process
>>>>>>>>>>>>>>>>>>>> records as streams. Stateful processing (joins,
>> windows)
>>>>> are
>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>> considered.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Proposed changes/Draft:
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:
>>>>>>>>>>>>>>> streams-headers
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Feedback and suggestions are more than welcome.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
> 
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to