[ 
https://issues.apache.org/jira/browse/KAFKA-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739701#comment-16739701
 ] 

John Roesler edited comment on KAFKA-7806 at 1/10/19 7:16 PM:
--------------------------------------------------------------

Thanks [~bchen225242], I hadn't looked at KIP-393 yet, but I just read it 
through.

But, if I understand the proposal correctly, it should be orthogonal to this 
issue.

The issue I've reported here isn't specific to the changelog topic, but rather 
just propagating the correct key serde down the topology.

They're clearly related to the same part of Streams, but I didn't follow how 
they would overlap. Can you elaborate?

Thanks,

-John


was (Author: vvcephei):
Thanks [~bchen225242], I hadn't looked at KIP-393 yet, but I just read it 
through. I'll leave my comments on the mailing list.

But, if I understand the proposal correctly, it should be orthogonal to this 
issue.

The issue I've reported here isn't specific to the changelog topic, but rather 
just propagating the correct key serde down the topology.

They're clearly related to the same part of Streams, but I didn't follow how 
they would overlap. Can you elaborate?

Thanks,

-John

> Windowed Aggregations should wrap default key serde if none is specified
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-7806
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7806
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> In Streams, windowing a stream by either time or session windows causes the 
> stream's keys to be transformed from `K` to `Windowed<K>`.
> Since this is a well-defined transition, it's not necessary for developers to 
> explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which 
> already knows the key serde (`Serde<K>`), automatically wraps it in case it's 
> needed by downstream operators.
> However, this automatic wrapping only takes place if the key serde has been 
> explicitly provided in the topology. If the topology relies on the 
> `default.key.serde` configuration, no wrapping takes place, and downstream 
> operators will encounter a ClassCastException trying to cast a `Windowed` 
> (the windowed key) to whatever type the default serde handles (which is the 
> type of the key wrapped inside the windowed key).
> Specifically, the key serde forwarding logic is:
> in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new 
> FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
> null`
> and in 
> `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new 
> WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`
>  
> This pattern of not "solidifying" the default key serde is common in Streams. 
> Not all operators need a serde, and the default serde may not be applicable 
> to all operators. So, it would be a mistake for arbitrary operators to grab 
> the default serde and pass it downstream as if it had been explicitly set.
>  
> However, in this case specifically, all windowed aggregations are stateful, 
> so if we don't have an explicit key serde at this point, we know that we have 
> used the default serde in the window store. If the default serde were 
> incorrect, an exception would be thrown by the windowed aggregation itself. 
> So it actually is safe to wrap the default serde in a windowed serde and pass 
> it downstream, which would result in a better development experience.
>  
> Unfortunately, the default serde is set via config, but the windowed serde 
> wrapping happens during DSL building, when the config is not generally 
> available. Therefore, we would need a special windowed serde wrapper that 
> signals that it wraps the default serde, which would be fully resolved during 
> operators' init call.
> For example, something of this nature:
> `materializedInternal.keySerde() != null ? new 
> FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
> FullTimeWindowedSerde.wrapDefault(windows.size())`
> etc.
>  
> Complicating the situation slightly, all the windowed serializers and 
> deserializers will resolve a runtime inner class using 
> `default.windowed.key.serde.inner` if given a null inner serde to wrap. 
> However, at this point in the topology build, we do know that the windowed 
> aggregation has specifically used the `default.key.serde`, not the 
> `default.windowed.key.serde.inner`, to persist its state to the window store. 
> Therefore, it should be correct to wrap the default key serde specifically 
> and not use the `default.windowed.key.serde.inner`.
>  
> In addition to fixing this for TimeWindowed and SessionWindowed streams, we 
> need to have good test coverage of the new code. There is clearly a blind 
> spot in the tests, or we would have noticed this sooner.
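
The deferred wrapping proposed in the description could be sketched roughly as follows. This is only a simplified, self-contained model, not Kafka code: `KeySerializer`, `WindowedKey`, `WindowedKeySerializer`, `isResolved()` and this version of `wrapDefault()` are hypothetical stand-ins, and the config map holds a serializer instance rather than the class name that the real `default.key.serde` setting takes. The point is just that the wrapper can be created at DSL build time without the config and resolve its inner serializer later, in an init call.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the proposal. None of these types are Kafka classes.
class DeferredWindowedSerdeSketch {

    /** Stand-in for a key serializer. */
    interface KeySerializer<K> {
        byte[] serialize(K key);
    }

    /** Stand-in for a windowed key: the original key plus the window start. */
    static final class WindowedKey<K> {
        final K key;
        final long windowStart;

        WindowedKey(K key, long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }
    }

    /** Wraps an inner serializer that may stay unresolved until init(). */
    static final class WindowedKeySerializer<K> implements KeySerializer<WindowedKey<K>> {
        private KeySerializer<K> inner; // null marks "wraps the default key serde"

        private WindowedKeySerializer(KeySerializer<K> inner) {
            this.inner = inner;
        }

        /** Build-time path when the topology supplied an explicit key serde. */
        static <K> WindowedKeySerializer<K> wrap(KeySerializer<K> explicit) {
            if (explicit == null) throw new IllegalArgumentException("explicit serializer required");
            return new WindowedKeySerializer<>(explicit);
        }

        /** Build-time path when no key serde was given; resolution is deferred. */
        static <K> WindowedKeySerializer<K> wrapDefault() {
            return new WindowedKeySerializer<>(null);
        }

        boolean isResolved() {
            return inner != null;
        }

        /** Called once the config is available; fills in the default if needed. */
        @SuppressWarnings("unchecked")
        void init(Map<String, Object> config) {
            if (inner != null) return; // an explicit serializer always wins
            Object configured = config.get("default.key.serde");
            if (!(configured instanceof KeySerializer)) {
                throw new IllegalStateException("no usable default.key.serde configured");
            }
            inner = (KeySerializer<K>) configured;
        }

        @Override
        public byte[] serialize(WindowedKey<K> windowed) {
            if (inner == null) throw new IllegalStateException("init() has not been called");
            byte[] keyBytes = inner.serialize(windowed.key);
            // Key bytes followed by the 8-byte window start (an arbitrary layout for this sketch).
            return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                    .put(keyBytes)
                    .putLong(windowed.windowStart)
                    .array();
        }
    }

    public static void main(String[] args) {
        KeySerializer<String> stringSerializer = key -> key.getBytes(StandardCharsets.UTF_8);
        Map<String, Object> config = new HashMap<>();
        config.put("default.key.serde", stringSerializer);

        // Created without any config, as would happen during DSL building.
        WindowedKeySerializer<String> deferred = WindowedKeySerializer.wrapDefault();
        deferred.init(config); // resolved later, when the config is known
        byte[] bytes = deferred.serialize(new WindowedKey<>("user-1", 0L));
        System.out.println("serialized " + bytes.length + " bytes");
    }
}
```

Note that `init()` in this sketch reads only `default.key.serde` and never falls back to `default.windowed.key.serde.inner`, matching the reasoning in the description that the aggregation has already used the default key serde for its window store.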



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
