[ 
https://issues.apache.org/jira/browse/KAFKA-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738562#comment-16738562
 ] 

Boyang Chen commented on KAFKA-7806:
------------------------------------

Thanks, John, for bringing this up! FYI, [~shnguyen] has contributed 
[KIP-393|https://cwiki.apache.org/confluence/display/KAFKA/KIP-393%3A+Time+windowed+serde+to+properly+deserialize+changelog+input+topic],
which should help here with wrapping the default serde.
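
A hedged sketch of why the window size matters for a time-windowed serde: a time-windowed key is written as the inner key's bytes followed by the 8-byte window start, and the window end is not written. Reading a key back therefore needs the window size to recover the end. This is a hypothetical, dependency-free model loosely based on that layout; `TimeWindowedKeyCodec` and `WindowedKey` are made-up names, not Kafka classes, and keys are assumed to be UTF-8 strings.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical model, not Kafka's classes: a time-windowed key is written as the
// inner key's bytes followed by the 8-byte window start. The window end is not
// written, so the reader must be told the window size to rebuild it.
public class TimeWindowedKeyCodec {

    // Minimal stand-in for Windowed<String> with a [start, end) time window.
    public record WindowedKey(String key, long start, long end) {}

    private final long windowSizeMs;

    public TimeWindowedKeyCodec(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Inner key bytes (assumed UTF-8 strings) followed by the window start.
    public static byte[] serialize(WindowedKey windowed) {
        byte[] keyBytes = windowed.key().getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowed.start())
                .array();
    }

    // Reads the key and start back, then derives the end from the window size.
    public WindowedKey deserialize(byte[] data) {
        int keyLength = data.length - Long.BYTES;
        String key = new String(data, 0, keyLength, StandardCharsets.UTF_8);
        long start = ByteBuffer.wrap(data, keyLength, Long.BYTES).getLong();
        return new WindowedKey(key, start, start + windowSizeMs);
    }

    public static void main(String[] args) {
        TimeWindowedKeyCodec codec = new TimeWindowedKeyCodec(60_000L);
        byte[] bytes = serialize(new WindowedKey("user-42", 120_000L, 180_000L));
        System.out.println(codec.deserialize(bytes));
    }
}
```

The `end` passed to `serialize` is never written, which is why a deserializer that lacks the window size cannot reconstruct it.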

> Windowed Aggregations should wrap default key serde if none is specified
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-7806
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7806
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> In Streams, windowing a stream by either time or session windows causes the 
> stream's keys to be transformed from `K` to `Windowed<K>`.
> Since this is a well-defined transition, it's not necessary for developers to 
> explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which 
> already knows the key serde (`Serde<K>`), automatically wraps it in case it's 
> needed by downstream operators.
> However, this automatic wrapping only takes place if the key serde has been 
> explicitly provided in the topology. If the topology relies on the 
> `default.key.serde` configuration, no wrapping takes place, and downstream 
> operators will encounter a ClassCastException trying to cast a `Windowed` 
> (the windowed key) to whatever type the default serde handles (which is the 
> key wrapped inside the windowed key).
> Specifically, the key serde forwarding logic is:
> in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new 
> FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
> null`
> and in 
> `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new 
> WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`
>  
> This pattern of not "solidifying" the default key serde is common in Streams. 
> Not all operators need a serde, and the default serde may not be applicable 
> to all operators. So, it would be a mistake for arbitrary operators to grab 
> the default serde and pass it downstream as if it had been explicitly set.
>  
> However, in this case specifically, all windowed aggregations are stateful, 
> so if we don't have an explicit key serde at this point, we know that we have 
> used the default serde in the window store. If the default serde were 
> incorrect, an exception would be thrown by the windowed aggregation itself. 
> So it actually is safe to wrap the default serde in a windowed serde and pass 
> it downstream, which would result in a better development experience.
>  
> Unfortunately, the default serde is set via config, but the windowed serde 
> wrapping happens during DSL building, when the config is not generally 
> available. Therefore, we would need a special windowed serde wrapper that 
> signals that it wraps the default serde, which would be fully resolved during 
> the operator's `init` call.
> For example, something of this nature:
> `materializedInternal.keySerde() != null ? new 
> FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
> FullTimeWindowedSerde.wrapDefault(windows.size())`
> etc.
>  
> Complicating the situation slightly, all the windowed serializers and 
> deserializers will resolve a runtime inner class using 
> `default.windowed.key.serde.inner` if given a null inner serde to wrap. 
> However, at this point in the topology build, we do know that the windowed 
> aggregation has specifically used `default.key.serde`, not 
> `default.windowed.key.serde.inner`, to persist its state to the window store. 
> Therefore, it should be correct to wrap the default key serde specifically 
> and not use `default.windowed.key.serde.inner`.
>  
> In addition to fixing this for TimeWindowed and SessionWindowed streams, we 
> need to have good test coverage of the new code. There is clearly a blind 
> spot in the tests, or we would have noticed this sooner.
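
The proposed `wrapDefault` idea can be sketched as follows. This is a hypothetical, self-contained model rather than Kafka code: `Serde`, `Windowed`, `TimeWindowedSerde`, `of`, `init`, and `wrapDefault` here are simplified stand-ins, and config values are serde instances rather than the class names real Streams configs hold. It shows the wrapper being created at build time without an inner serde, then resolving `default.key.serde` (and deliberately not `default.windowed.key.serde.inner`) only when `init` receives the config.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical, simplified model of the proposal; none of these types are Kafka's.
// Config values are serde instances here, whereas real Streams configs hold class names.
public class WrapDefaultSketch {

    interface Serde<T> {
        byte[] serialize(T value);
    }

    record Windowed<K>(K key, long start) {}

    static final class TimeWindowedSerde<K> {
        private Serde<K> inner;            // stays null until init() when wrapping the default
        private final boolean wrapsDefault;
        private final long windowSizeMs;   // kept so a deserializer could rebuild the window end

        private TimeWindowedSerde(Serde<K> inner, boolean wrapsDefault, long windowSizeMs) {
            this.inner = inner;
            this.wrapsDefault = wrapsDefault;
            this.windowSizeMs = windowSizeMs;
        }

        // Explicit key serde: known at DSL build time.
        static <K> TimeWindowedSerde<K> of(Serde<K> inner, long windowSizeMs) {
            return new TimeWindowedSerde<>(inner, false, windowSizeMs);
        }

        // No key serde given: remember that the default must be wrapped later.
        static <K> TimeWindowedSerde<K> wrapDefault(long windowSizeMs) {
            return new TimeWindowedSerde<>(null, true, windowSizeMs);
        }

        // Resolved during the operator's init, once the config is available.
        @SuppressWarnings("unchecked")
        void init(Map<String, Serde<?>> config) {
            if (wrapsDefault) {
                // The aggregation persisted its state with default.key.serde, so wrap
                // that one and deliberately ignore default.windowed.key.serde.inner.
                inner = (Serde<K>) config.get("default.key.serde");
            }
            if (inner == null) {
                throw new IllegalStateException("no key serde to wrap");
            }
        }

        Serde<K> inner() { return inner; }

        long windowSizeMs() { return windowSizeMs; }

        // Inner key bytes followed by the 8-byte window start.
        byte[] serialize(Windowed<K> windowed) {
            byte[] keyBytes = inner.serialize(windowed.key());
            return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                    .put(keyBytes)
                    .putLong(windowed.start())
                    .array();
        }
    }

    static final Serde<String> UTF8 = v -> v.getBytes(StandardCharsets.UTF_8);
    static final Serde<String> UTF16 = v -> v.getBytes(StandardCharsets.UTF_16);

    public static void main(String[] args) {
        Map<String, Serde<?>> config = Map.of(
                "default.key.serde", UTF8,
                "default.windowed.key.serde.inner", UTF16);

        TimeWindowedSerde<String> serde = TimeWindowedSerde.wrapDefault(60_000L);
        serde.init(config);
        System.out.println(serde.inner() == UTF8);                              // true
        System.out.println(serde.serialize(new Windowed<>("abc", 0L)).length);  // 11 (3 key bytes + 8-byte start)
    }
}
```

Deferring resolution to `init` keeps DSL building free of config access, while still wrapping exactly the serde the window store was written with.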



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
