Making the inheritance protocol a public contract seems reasonable to me.

In the current implementation, all output records inherit the offset,
timestamp, topic, and partition metadata from the input record. We
already added an API to change the timestamp of the output record
explicitly, though.

I think it makes sense to keep the inheritance of offset, topic, and
partition. Headers are worth discussing. I see arguments for two
strategies: (1) inherit by default, (2) set `null` by default.
Independent of the default behavior, we should add an API to set
headers for output records explicitly, though (similar to the
"set timestamp" API).
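
To make the two candidate defaults concrete, here is a minimal sketch in
plain Java. It is not Kafka Streams code: `RecordMeta`, `HeaderDefault`,
`deriveOutput`, and `withHeaders` are hypothetical names made up for
illustration, and headers are simplified to a String-to-String map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model, NOT the Kafka Streams API: it only illustrates the two
// candidate defaults for headers on output records.
final class HeaderInheritanceSketch {

    // The two default strategies under discussion.
    enum HeaderDefault { INHERIT, NULL }

    // Simplified record metadata; real Kafka headers carry byte[] values.
    static final class RecordMeta {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;
        final Map<String, String> headers; // null means "no headers"

        RecordMeta(String topic, int partition, long offset, long timestamp,
                   Map<String, String> headers) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.headers = headers;
        }
    }

    // Topic, partition, offset, and timestamp are always inherited;
    // headers are copied or dropped depending on the chosen default.
    static RecordMeta deriveOutput(RecordMeta input, HeaderDefault strategy) {
        Map<String, String> headers = null;
        if (strategy == HeaderDefault.INHERIT && input.headers != null) {
            headers = new LinkedHashMap<>(input.headers); // defensive copy
        }
        return new RecordMeta(input.topic, input.partition, input.offset,
                input.timestamp, headers);
    }

    // Explicit override, analogous in spirit to the "set timestamp" API.
    static RecordMeta withHeaders(RecordMeta meta, Map<String, String> headers) {
        return new RecordMeta(meta.topic, meta.partition, meta.offset,
                meta.timestamp, headers);
    }

    public static void main(String[] args) {
        Map<String, String> in = new LinkedHashMap<>();
        in.put("trace-id", "abc");
        RecordMeta input = new RecordMeta("orders", 0, 42L, 1000L, in);

        System.out.println(deriveOutput(input, HeaderDefault.INHERIT).headers);
        System.out.println(deriveOutput(input, HeaderDefault.NULL).headers);
    }
}
```

With either default, `withHeaders` stands in for the explicit API, so a
"filter-only" application could still forward headers under strategy (2)
at the cost of one extra call.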

From my point of view, timestamp/headers are a different
"class/category" of data/metadata than topic/partition/offset. For the
first category, it makes sense to manipulate them, and they are more
than "plain metadata", especially the timestamp. For the second
category, it does not make sense to manipulate it, and to me
topic/partition/offset is pure metadata. Strictly speaking, it's even
questionable whether output records should have any value for
topic/partition/offset in the first place, or whether they should be
`null`, because those attributes only make sense for source records
that are consumed directly from a topic. On the other hand, if we make
this difference explicit, it might be useful for the user to be able to
track the topic/partition/offset of the original source record.

Furthermore, to me, timestamps and headers are somewhat different, too.
Stream processing requires that every record has a timestamp; thus, it
makes sense to inherit the input record timestamp by default (from my
point of view, a timestamp is not really metadata but is as important
as key and value). Headers, however, are optional, and thus inheriting
them is not really required. It might be convenient, though: for
example, imagine a simple "filter-only" application. It would be
cumbersome for users to explicitly copy the headers from the input
records to the output records; that seems to be unnecessary boilerplate
code. On the other hand, for any more complex use case, it's
questionable to inherit headers. Note that headers would be written to
the output topics, increasing the size of the messages. Overall, I am
not sure which default strategy is the better one for headers. Is
there a convincing argument for either one of them? I slightly tend to
think that using `null` as default might be better.

Last, we could also make the default behavior configurable, with
something like `inherit.record.headers=true/false` and a default value
of "false". This would allow people to opt in to automatic header
inheritance. Just an idea I wanted to add to the discussion; I'm not
sure if it's a good one.
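
As a rough illustration of how such an opt-in flag could be read, here
is a sketch using only `java.util.Properties`. The key
`inherit.record.headers` is just the name floated above, not an existing
Kafka Streams config, and `InheritHeadersConfigSketch` is a hypothetical
helper.

```java
import java.util.Properties;

// Hypothetical sketch: `inherit.record.headers` is only the name floated in
// this thread; it is not an existing Kafka Streams config.
final class InheritHeadersConfigSketch {

    static final String INHERIT_RECORD_HEADERS = "inherit.record.headers";

    // Returns true only if the user opted in; an absent key falls back to
    // "false", so headers would not be inherited by default.
    static boolean inheritHeaders(Properties props) {
        return Boolean.parseBoolean(
                props.getProperty(INHERIT_RECORD_HEADERS, "false"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(inheritHeaders(props)); // key not set

        props.setProperty(INHERIT_RECORD_HEADERS, "true");
        System.out.println(inheritHeaders(props)); // user opted in
    }
}
```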


-Matthias

On 5/4/18 3:13 PM, Guozhang Wang wrote:
> Hello Jorge,
> 
>> Agree. Probably point 3 handles this. `Headers` been part of `RecordContext`
> would be handled the same way as other attributes.
> 
> Today we do not have a clear inheritance protocol for other fields of
> RecordContext yet: although internally we do have some criterion on
> topic/partition/offset and timestamp, they are not explicitly exposed to
> users.
> 
> 
> I think we still need to have a defined protocol for headers itself, but I
> agree that it better to be scoped out side of this KIP, since this
> inheritance protocol itself for all the fields of RecordContext would
> better be a separate KIP. We can document this clearly in the wiki page.
> 
> Guozhang
> 
> 
> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
> garcia.florian.pe...@gmail.com> wrote:
> 
>> Hi,
>>
>> For me this is a great first step to have Headers in streaming.
>> My current use case is about distributed tracing (Zipkin) and with the
>> headers in the processorContext() I'll be able to manage that for the most
>> cases.
>> The KIP-159 should follow after this but this is where all the major
>> questions will arise for stateful operations (as Guozhang said).
>>
>> Thanks for the work on this Jorge.
>>
>> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <
>> quilcate.jo...@gmail.com> wrote:
>>
>>> Thanks Guozhang and John for your feedback.
>>>
>>>> 1. We need to have a clear inheritance protocol of headers in our
>>> topology:
>>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
>>>> 1.b. In DSL stateless operators, it should be straight-forward.
>>>> 1.c. What about in stateful operators like aggregates and joins?
>>>
>>> Agree. Probably point 3 handles this. `Headers` been part of
>>> `RecordContext` would be handled the same way as other attributes.
>>>
>>>> 3. In future work "Adding DSL Processors to use Headers to
>>> filter/map/branch",
>>> it may well be covered in KIP-159; worth taking a look at that KIP.
>>>
>>> Yes, I will point to it.
>>>
>>>> 2. In terms of internal implementations, should the state store
>>> cache include the headers then in order to be sent downstreams?
>>>
>>> Good question. As `LRUCacheEntry` extends `RecordContext`, I think this
>>> is already supported. I will detail this on the KIP.
>>>
>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should
>> be
>>> removed?
>>>
>>> Fixed, thanks.
>>>
>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only
>>> for exposing
>>> the headers for reading, and not allowing users to add / modify headers,
>>> right? If yes, we'd better state it clearly at the "Proposed Changes"
>>> section.
>>>
>>> As headers are exposed in the `ProcessorContext` and will be sent
>>> downstream, they can be mutated (add/remove headers).
>>>
>>>  > Also, despite the decreased scope in this KIP, I think it might be
>>> valuable to define what will happen to headers once this change is
>>> implemented. For example, I think a minimal groundwork-level change might
>>> be to make the API changes, while promising to drop all headers from
>> input
>>> records.
>>>
>>> I would suggest passing headers to downstream nodes and not dropping them.
>>> Clients will have to drop `Headers` if they have used them.
>>> Or it could be something like a boolean config property that manage this.
>>> I would like to hear feedback here.
>>>
>>>> A maximal groundwork change would be to forward the headers through all
>>> operators
>>> in
>>>
>>> Streams. But I think there are some unresolved questions about
>> forwarding,
>>> like "what happens to the headers in a join?"
>>> Probably this would be solve once KIP-159 is implemented and supporting
>>> Headers.
>>>
>>>> There's of course some middle ground, but instinctively, I think I'd
>>> prefer to have a clear definition that headers are currently *not*
>>> forwarded, rather than having a complex list of operators that do or
>> don't
>>> forward them. Plus, I think it might be tricky to define this behavior
>>> while not allowing the scope to return to that of your original proposal!
>>>
>>> Agree. But `Headers` were forwarded *explicitly* in the original
>> proposal.
>>> The current one pass it as part of `RecordContext`, so if it's forward it
>>> or not is as the same as `RecordContext`.
>>> On top of this implementation, we can design how filter/map/join will be
>>> handled. Probably following KIP-159 approach.
>>>
>>> Cheers,
>>> Jorge.
>>>
>>> On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangg...@gmail.com>)
>>> wrote:
>>>
>>>> Hi Jorge,
>>>>
>>>> Thanks for the written KIP! Made a pass over it and left some comments
>>>> (some of them overlapped with John's):
>>>>
>>>> 1. We need to have a clear inheritance protocol of headers in our
>>> topology:
>>>>
>>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
>>>> 1.b. In DSL stateless operators, it should be straight-forward.
>>>> 1.c. What about in stateful operators like aggregates and joins?
>>>>
>>>> 2. In terms of internal implementations, should the state store cache
>>>> include the headers then in order to be sent downstreams?
>>>>
>>>> 3. In future work "Adding DSL Processors to use Headers to
>>>> filter/map/branch", it may well be covered in KIP-159; worth taking a
>>> look
>>>> at that KIP.
>>>>
>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should
>> be
>>>> removed?
>>>>
>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only
>> for
>>>> exposing the headers for reading, and not allowing users to add /
>> modify
>>>> headers, right? If yes, we'd better state it clearly at the "Proposed
>>>> Changes" section.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io>
>> wrote:
>>>>
>>>>> Hi Jorge,
>>>>>
>>>>> Thanks for the design work.
>>>>>
>>>>> I agree that de-scoping the work to just the Processor API will help
>>>>> contain the design and implementation complexity.
>>>>>
>>>>> In the KIP, it mentions that the headers would be available in the
>>>>> ProcessorContext, (like "context.headers()"). It also says that
>>>>> implementers would need to implement the method "void process(K key,
>> V
>>>>> value, Headers headers);". I think maybe you meant to remove the
>>> proposal
>>>>> to modify "process", since it wouldn't be necessary in conjunction
>> with
>>>> the
>>>>> ProcessorContext change, and it's not represented in your PR.
>>>>>
>>>>> Also, despite the decreased scope in this KIP, I think it might be
>>>> valuable
>>>>> to define what will happen to headers once this change is
>> implemented.
>>>> For
>>>>> example, I think a minimal groundwork-level change might be to make
>> the
>>>> API
>>>>> changes, while promising to drop all headers from input records.
>>>>>
>>>>> A maximal groundwork change would be to forward the headers through
>> all
>>>>> operators in Streams. But I think there are some unresolved questions
>>>> about
>>>>> forwarding, like "what happens to the headers in a join?"
>>>>>
>>>>> There's of course some middle ground, but instinctively, I think I'd
>>>> prefer
>>>>> to have a clear definition that headers are currently *not*
>> forwarded,
>>>>> rather than having a complex list of operators that do or don't
>> forward
>>>>> them. Plus, I think it might be tricky to define this behavior while
>>> not
>>>>> allowing the scope to return to that of your original proposal!
>>>>>
>>>>> Thanks again for the KIP,
>>>>> -John
>>>>>
>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <
>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>
>>>>>> Hi Matthias,
>>>>>>
>>>>>> I've created a new JIRA to track this, updated the KIP and create a
>>> PR.
>>>>>>
>>>>>> Looking forward to your feedback,
>>>>>>
>>>>>> Jorge.
>>>>>>
>>>>>> On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matth...@confluent.io>)
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jorge,
>>>>>>>
>>>>>>> I would like to unblock this KIP to make some progress. The
>> tricky
>>>>>>> question of this work, seems to be how to expose headers at DSL
>>>> level.
>>>>>>> This related to KIP-149 and KIP-159. However, for Processor API,
>> it
>>>>>>> seems to be rather straight forward to add headers to the API.
>>>>>>>
>>>>>>> Thus, I would suggest to de-scope this KIP and add header support
>>> for
>>>>>>> Processor API only as a first step. If this is done, we can see
>> in
>>> a
>>>>>>> second step, how to add headers at DSL level.
>>>>>>>
>>>>>>> WDYT about this proposal?
>>>>>>>
>>>>>>> If you agree, please update the JIRA and KIP accordingly. Note,
>>> that
>>>> we
>>>>>>> have two JIRA that are duplicates atm. We can scope them
>>> accordingly:
>>>>>>> one for PAPI only, and second as a dependent JIRA for DSL.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>> Thanks for your feedback!
>>>>>>>>
>>>>>>>> 1. I was adding headers to KeyValue to support groupBy, but I
>>> think
>>>>> it
>>>>>> is
>>>>>>>> not necessary. It should be enough with mapping headers to
>>>> key/value
>>>>>> and
>>>>>>>> then group using current KeyValue structure.
>>>>>>>>
>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV as
>>>> structure,
>>>>>>> hence
>>>>>>>> considering headers as part of stateful operations will not fit
>>> in
>>>>> this
>>>>>>>> approach and increase complexity (I cannot think in a use-case
>>> that
>>>>>> need
>>>>>>>> this).
>>>>>>>>
>>>>>>>> 3. and 4. Changes on 1. will solve this issue.
>>>>>>>>
>>>>>>>> Probably I rush a bit proposing this change, I was not aware of
>>>>> KIP-159
>>>>>>> or
>>>>>>>> KAFKA-5632.
>>>>>>>> If KIP-159 is adopted and we reduce this KIP to add Headers to
>>>>>>>> RecordContext will be enough, but I'm not sure about the scope
>> of
>>>>>>> KIP-159.
>>>>>>>> If it includes stateful operations will be difficult to
>>> implemented
>>>>> as
>>>>>>>> stated in 2.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Jorge.
>>>>>>>>
>>>>>>>> On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax (<matth...@confluent.io>)
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the KIP Jorge,
>>>>>>>>>
>>>>>>>>> As Bill pointed out already, we should be careful with adding
>>> new
>>>>>>>>> overloads as this contradicts the work done via KIP-182.
>>>>>>>>>
>>>>>>>>> This KIP also seems to be related to KIP-149 and KIP-159. Are
>>> you
>>>>>> aware
>>>>>>>>> of them? Both have quite long DISCUSS threads, but it might be
>>>> worth
>>>>>>>>> browsing through them.
>>>>>>>>>
>>>>>>>>> A few further questions:
>>>>>>>>>
>>>>>>>>>  - why do you want to add the headers to `KeyValue`? I am not
>>> sure
>>>>> if
>>>>>> we
>>>>>>>>> should consider headers as optional metadata and add it to
>>>>>>>>> `RecordContext` similar to timestamp, offset, etc. only
>>>>>>>>
>>>>>>>>
>>>>>>>>>  - You only include stateless single-record transformations at
>>> the
>>>>> DSL
>>>>>>>>> level. Do you suggest that all other operator just drop
>> headers
>>> on
>>>>> the
>>>>>>>>> floor?
>>>>>>>>>
>>>>>>>>>  - Why do you only want to put headers into in-memory and
>> cache
>>>> but
>>>>>> not
>>>>>>>>> RocksDB store? What do you mean by "pass through"? IMHO, all
>>>> stores
>>>>>>>>> should behave the same at DSL level.
>>>>>>>>>    -> if we store the headers in the state stores, what is the
>>>>> upgrade
>>>>>>>>> path?
>>>>>>>>>
>>>>>>>>>  - Why do we need to store record header in state in the first
>>>>> place,
>>>>>> if
>>>>>>>>> we exclude stateful operator at DSL level?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What is the motivation for the "border lines" you choose?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
>>>>>>>>>> Jorge,
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP, I know this is a feature others in the
>>>>> community
>>>>>>> have
>>>>>>>>>> been interested in getting into Kafka Streams.
>>>>>>>>>>
>>>>>>>>>> I took a quick pass over it, and I have one initial question.
>>>>>>>>>>
>>>>>>>>>> We recently reduced overloads with KIP-182, and in this KIP
>> we
>>>> are
>>>>>>>>>> increasing them again.
>>>>>>>>>>
>>>>>>>>>> I can see from the KIP why they are necessary, but I'm
>>> wondering
>>>> if
>>>>>>> there
>>>>>>>>>> is something else we can do to cut down on the overloads
>>>>>> introduced.  I
>>>>>>>>>> don't have any sound suggestions ATM, so I'll have to think
>>> about
>>>>> it
>>>>>>> some
>>>>>>>>>> more, but I wanted to put the thought out there.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Bill
>>>>>>>>>>
>>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate
>> Otoya <
>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I have created a KIP to add Record Headers support to Kafka
>>>>> Streams
>>>>>>> API:
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>> 244%3A+Add+Record+Header+support+to+Kafka+Streams
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The main goal is to be able to use headers to filter, map
>> and
>>>>>> process
>>>>>>>>>>> records as streams. Stateful processing (joins, windows) are
>>> not
>>>>>>>>>>> considered.
>>>>>>>>>>>
>>>>>>>>>>> Proposed changes/Draft:
>>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:
>>>>>> streams-headers
>>>>>>>>>>>
>>>>>>>>>>> Feedback and suggestions are more than welcome.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>>
>>>>>>>>>>> Jorge.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
> 
> 
> 
