[ https://issues.apache.org/jira/browse/KAFKA-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738562#comment-16738562 ]
Boyang Chen commented on KAFKA-7806:
------------------------------------

Thanks John for bringing this up! FYI, [~shnguyen] has contributed [KIP-393|https://cwiki.apache.org/confluence/display/KAFKA/KIP-393%3A+Time+windowed+serde+to+properly+deserialize+changelog+input+topic], which should be helpful here for wrapping the default serde.

> Windowed Aggregations should wrap default key serde if none is specified
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-7806
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7806
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> In Streams, windowing a stream by either time or session windows causes the
> stream's keys to be transformed from `K` to `Windowed<K>`.
>
> Since this is a well-defined transition, it's not necessary for developers to
> explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which
> already knows the key serde (`Serde<K>`), automatically wraps it in case it's
> needed by downstream operators.
>
> However, this automatic wrapping only takes place if the key serde has been
> explicitly provided in the topology. If the topology relies on the
> `default.key.serde` configuration, no wrapping takes place, and downstream
> operators will encounter a `ClassCastException` trying to cast a `Windowed`
> (the windowed key) to whatever type the default serde handles (which is the
> key wrapped inside the windowed key).
>
> Specifically, the key serde forwarding logic is:
> in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null`
> and in `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`
>
> This pattern of not "solidifying" the default key serde is common in Streams.
> Not all operators need a serde, and the default serde may not be applicable
> to all operators. So it would be a mistake for arbitrary operators to grab
> the default serde and pass it downstream as if it had been explicitly set.
>
> However, in this case specifically, all windowed aggregations are stateful,
> so if we don't have an explicit key serde at this point, we know that we have
> used the default serde in the window store. If the default serde were
> incorrect, an exception would be thrown by the windowed aggregation itself.
> So it actually is safe to wrap the default serde in a windowed serde and pass
> it downstream, which would result in a better development experience.
>
> Unfortunately, the default serde is set via config, but the windowed serde
> wrapping happens during DSL building, when the config is not generally
> available. Therefore, we would need a special windowed serde wrapper that
> signals that it wraps the default serde, which would be fully resolved during
> the operators' `init` call.
> For example, something of this nature:
> `materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : FullTimeWindowedSerde.wrapDefault(windows.size())`
> etc.
>
> Complicating the situation slightly, all the windowed serializers and
> deserializers will resolve a runtime inner class using
> `default.windowed.key.serde.inner` if given a null inner serde to wrap.
> However, at this point in the topology build, we do know that the windowed
> aggregation has specifically used `default.key.serde`, not
> `default.windowed.key.serde.inner`, to persist its state to the window store.
> Therefore, it should be correct to wrap the default key serde specifically
> and not use `default.windowed.key.serde.inner`.
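The deferred-wrapping proposal in the issue can be sketched in plain Java. This is a minimal, self-contained sketch under stated assumptions, not the Kafka Streams implementation: `SimpleSerde`, `WindowedKey`, `StringSerde`, `TimeWindowedSerdeSketch`, and `WrapDefaultDemo` are hypothetical stand-ins, a plain `Map` plays the role of the Streams config, and the byte layout (inner key bytes followed by an 8-byte window start) is assumed for illustration. It shows a wrapper created at DSL-building time with no inner serde, which resolves `default.key.serde` only once `init` runs and deliberately ignores `default.windowed.key.serde.inner`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Serde<T>; NOT org.apache.kafka.common.serialization.Serde.
interface SimpleSerde<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

// Hypothetical stand-in for Windowed<K>: the inner key plus window bounds.
final class WindowedKey<K> {
    final K key;
    final long startMs;
    final long endMs;

    WindowedKey(K key, long startMs, long endMs) {
        this.key = key;
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Plays the role of whatever serde is configured as default.key.serde.
final class StringSerde implements SimpleSerde<String> {
    public byte[] serialize(String value) { return value.getBytes(StandardCharsets.UTF_8); }
    public String deserialize(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }
}

// Time-windowed serde wrapping either an explicit inner serde or a placeholder
// for the default key serde that is only resolved later, in init().
final class TimeWindowedSerdeSketch<K> implements SimpleSerde<WindowedKey<K>> {
    private SimpleSerde<K> inner;        // stays null until init() when wrapping the default
    private final boolean wrapsDefault;  // true only for instances built by wrapDefault(...)
    private final long windowSizeMs;     // used to recompute the window end on read

    private TimeWindowedSerdeSketch(SimpleSerde<K> inner, boolean wrapsDefault, long windowSizeMs) {
        this.inner = inner;
        this.wrapsDefault = wrapsDefault;
        this.windowSizeMs = windowSizeMs;
    }

    static <K> TimeWindowedSerdeSketch<K> wrap(SimpleSerde<K> inner, long windowSizeMs) {
        return new TimeWindowedSerdeSketch<>(inner, false, windowSizeMs);
    }

    // Hypothetical counterpart of the FullTimeWindowedSerde.wrapDefault(windows.size()) idea.
    static <K> TimeWindowedSerdeSketch<K> wrapDefault(long windowSizeMs) {
        return new TimeWindowedSerdeSketch<>(null, true, windowSizeMs);
    }

    // Resolve the placeholder from default.key.serde (what the window store used);
    // default.windowed.key.serde.inner is deliberately never consulted.
    @SuppressWarnings("unchecked")
    void init(Map<String, Object> config) {
        if (wrapsDefault && inner == null) {
            inner = (SimpleSerde<K>) config.get("default.key.serde");
        }
        if (inner == null) {
            throw new IllegalStateException("no inner key serde available");
        }
    }

    // Assumed layout: [inner key bytes][8-byte window start]; the end is not stored.
    public byte[] serialize(WindowedKey<K> windowed) {
        byte[] keyBytes = inner.serialize(windowed.key);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowed.startMs)
                .array();
    }

    public WindowedKey<K> deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] keyBytes = new byte[bytes.length - Long.BYTES];
        buf.get(keyBytes);
        long startMs = buf.getLong();
        return new WindowedKey<>(inner.deserialize(keyBytes), startMs, startMs + windowSizeMs);
    }
}

class WrapDefaultDemo {
    public static void main(String[] args) {
        // Built while the "topology" is assembled, before any config exists.
        TimeWindowedSerdeSketch<String> serde = TimeWindowedSerdeSketch.wrapDefault(60_000L);

        // Later, the operator's init sees the config and resolves the placeholder.
        Map<String, Object> config = new HashMap<>();
        config.put("default.key.serde", new StringSerde());
        serde.init(config);

        WindowedKey<String> back =
                serde.deserialize(serde.serialize(new WindowedKey<>("user-42", 120_000L, 180_000L)));
        System.out.println(back.key + " [" + back.startMs + ", " + back.endMs + ")");
    }
}
```

Keeping the window size inside the wrapper mirrors the `windows.size()` argument in the quoted `FullTimeWindowedSerde` snippet: because only the start is written, the end has to be recomputed on read. A session-windowed counterpart would not take a size, matching the quoted `SessionWindowedSerde` constructor, since session windows have no fixed length.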
>
> In addition to fixing this for TimeWindowed and SessionWindowed streams, we
> need to have good test coverage of the new code. There is clearly a blind
> spot in the tests, or we would have noticed this sooner.
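As a hedged illustration of that blind spot, the self-contained sketch below (plain Java, not Kafka Streams) reproduces the reported failure mode: a downstream step that holds only an untyped handle on the key serializer passes a windowed key to the unwrapped default serializer and gets a `ClassCastException`, while a wrapped serializer handles the same key. `KeySerializer`, `WindowedKeyStub`, and `BlindSpotCheck` are hypothetical names, and the untyped hand-off is an assumption standing in for how keys flow between operators at runtime. A regression test of this shape, run against real time- and session-windowed aggregations that rely on `default.key.serde`, is the kind of coverage the issue asks for.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a key serializer; not Kafka's Serializer interface.
interface KeySerializer<T> {
    byte[] serialize(T data);
}

// Hypothetical stand-in for Windowed<K>: inner key plus window start.
final class WindowedKeyStub<K> {
    final K key;
    final long startMs;

    WindowedKeyStub(K key, long startMs) {
        this.key = key;
        this.startMs = startMs;
    }
}

class BlindSpotCheck {
    // Plays the role of the configured default key serde (String keys).
    static final KeySerializer<String> DEFAULT_KEY_SERIALIZER =
            s -> s.getBytes(StandardCharsets.UTF_8);

    // Wrapped serializer: inner key bytes followed by the 8-byte window start.
    static final KeySerializer<WindowedKeyStub<String>> WRAPPED = w -> {
        byte[] inner = DEFAULT_KEY_SERIALIZER.serialize(w.key);
        return ByteBuffer.allocate(inner.length + Long.BYTES)
                .put(inner)
                .putLong(w.startMs)
                .array();
    };

    // Simulates a downstream step with an untyped handle on the serializer,
    // so the compiler cannot catch the key-type mismatch.
    @SuppressWarnings("unchecked")
    static byte[] serializeDownstream(KeySerializer<?> serializer, Object key) {
        return ((KeySerializer<Object>) serializer).serialize(key);
    }

    // True if handing a windowed key to the unwrapped default fails at runtime.
    static boolean unwrappedDefaultThrows() {
        try {
            serializeDownstream(DEFAULT_KEY_SERIALIZER, new WindowedKeyStub<>("user-42", 0L));
            return false;
        } catch (ClassCastException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        WindowedKeyStub<String> key = new WindowedKeyStub<>("user-42", 60_000L);
        System.out.println("unwrapped default throws: " + unwrappedDefaultThrows());
        System.out.println("wrapped length: " + serializeDownstream(WRAPPED, key).length);
    }
}
```

Running `main` prints `unwrapped default throws: true` and then `wrapped length: 15` (7 UTF-8 bytes for `user-42` plus 8 for the window start). The cast fails inside the lambda's erased `serialize(Object)` bridge, which is why the mismatch only surfaces at runtime rather than at compile time.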