John Roesler created KAFKA-7806:
-----------------------------------
Summary: Windowed Aggregations should wrap default key serde if
none is specified
Key: KAFKA-7806
URL: https://issues.apache.org/jira/browse/KAFKA-7806
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: John Roesler
In Streams, windowing a stream by either time or session windows causes the
stream's keys to be transformed from `K` to `Windowed<K>`.
Since this is a well-defined transition, it's not necessary for developers to
explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which
already knows the key serde (`Serde<K>`), automatically wraps it in case it's
needed by downstream operators.
However, this automatic wrapping only takes place if the key serde has been
explicitly provided in the topology. If the topology relies on the
`default.key.serde` configuration, no wrapping takes place, and downstream
operators will encounter a ClassCastException when trying to cast a `Windowed`
(the windowed key) to whatever type the default serde handles (which is the
type of the key wrapped inside the windowed key).
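The failure mode can be reproduced in miniature without Streams at all. The following is only a sketch: `Windowed`, `Serializer`, `StringSerializer` and `WindowedSerializer` here are hypothetical stand-ins written for this illustration, not the Kafka classes of the same names, and the byte layout is made up.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for illustration; these are not the Kafka Streams classes.
final class WindowedKeyCastSketch {

    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Minimal model of a windowed key: the original key plus the window start.
    static final class Windowed<K> {
        final K key;
        final long windowStartMs;

        Windowed(K key, long windowStartMs) {
            this.key = key;
            this.windowStartMs = windowStartMs;
        }
    }

    // Plays the role of the serde named by default.key.serde (here: String keys).
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Plays the role of the windowed wrapper: inner key bytes, then the window start.
    static final class WindowedSerializer<K> implements Serializer<Windowed<K>> {
        private final Serializer<K> inner;

        WindowedSerializer(Serializer<K> inner) {
            this.inner = inner;
        }

        @Override
        public byte[] serialize(Windowed<K> data) {
            byte[] keyBytes = inner.serialize(data.key);
            return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                    .put(keyBytes)
                    .putLong(data.windowStartMs)
                    .array();
        }
    }

    // A downstream operator without an explicit serde falls back to the default
    // and hands it whatever key object arrives, with no static type check.
    @SuppressWarnings("unchecked")
    static byte[] serializeWithDefault(Object key) {
        Serializer<Object> defaultSerializer =
                (Serializer<Object>) (Serializer<?>) new StringSerializer();
        return defaultSerializer.serialize(key);
    }

    // Reports whether the unwrapped default serializer rejects the given key.
    static boolean failsWithClassCast(Object key) {
        try {
            serializeWithDefault(key);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Windowed<String> windowedKey = new Windowed<>("user-1", 0L);
        // Plain key: the default serializer handles it.
        System.out.println(failsWithClassCast("user-1"));
        // Windowed key: the unwrapped default serializer cannot cast it.
        System.out.println(failsWithClassCast(windowedKey));
        // Wrapping the same serializer handles the windowed key.
        System.out.println(
                new WindowedSerializer<>(new StringSerializer()).serialize(windowedKey).length);
    }
}
```

The point of the sketch is that the same inner serializer works fine once it is wrapped; the exception only arises because nothing wrapped it.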
Specifically, the key serde forwarding logic is:
in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:
`materializedInternal.keySerde() != null ? new
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null`
and in `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:
`materializedInternal.keySerde() != null ? new
WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`
This pattern of not "solidifying" the default key serde is common in Streams.
Not all operators need a serde, and the default serde may not be applicable to
all operators. So, it would be a mistake for arbitrary operators to grab the
default serde and pass it downstream as if it had been explicitly set.
However, in this case specifically, all windowed aggregations are stateful, so
if we don't have an explicit key serde at this point, we know that we have used
the default serde in the window store. If the default serde were incorrect, an
exception would be thrown by the windowed aggregation itself. So it actually is
safe to wrap the default serde in a windowed serde and pass it downstream,
which would result in a better development experience.
Unfortunately, the default serde is set via config, but the windowed serde
wrapping happens during DSL building, when the config is not generally
available. Therefore, we would need a special windowed serde wrapper that
signals that it wraps the default serde, which would be fully resolved during
operators' init call.
For example, something of this nature:
`materializedInternal.keySerde() != null ? new
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) :
FullTimeWindowedSerde.wrapDefault(windows.size())`
etc.
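To make the deferred-resolution idea concrete, here is a rough, self-contained model. Everything in it is an assumption for illustration: `TimeWindowedSerializer`, `of`, `forward` and `init` are hypothetical names, `wrapDefault` mirrors the method proposed above (which does not exist yet), and the config map holds a serializer instance rather than a class name to keep the sketch short.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the proposed wrapDefault idea; not the Kafka Streams implementation.
final class DeferredWindowedSerdeSketch {

    interface Serializer<T> {
        byte[] serialize(T data);
    }

    static final class TimeWindowedSerializer<K> {
        private Serializer<K> inner;         // stays null until init when wrapping the default
        private final boolean wrapsDefault;  // marker: resolve default.key.serde at init
        private final long windowSizeMs;

        private TimeWindowedSerializer(Serializer<K> inner, boolean wrapsDefault, long windowSizeMs) {
            this.inner = inner;
            this.wrapsDefault = wrapsDefault;
            this.windowSizeMs = windowSizeMs;
        }

        // An explicit key serde is known while the topology is being built.
        static <K> TimeWindowedSerializer<K> of(Serializer<K> inner, long windowSizeMs) {
            return new TimeWindowedSerializer<>(inner, false, windowSizeMs);
        }

        // No explicit key serde: only record that the default must be wrapped later.
        static <K> TimeWindowedSerializer<K> wrapDefault(long windowSizeMs) {
            return new TimeWindowedSerializer<>(null, true, windowSizeMs);
        }

        // Called from the operator's init, when the config is finally available.
        // Simplification: the map holds a serializer instance, not a class name.
        @SuppressWarnings("unchecked")
        void init(Map<String, Object> config) {
            if (wrapsDefault && inner == null) {
                inner = (Serializer<K>) config.get("default.key.serde");
            }
            if (inner == null) {
                throw new IllegalStateException("no key serde available to wrap");
            }
        }

        byte[] serialize(K key, long windowStartMs) {
            if (inner == null) {
                throw new IllegalStateException("init must be called before serialize");
            }
            byte[] keyBytes = inner.serialize(key);
            return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                    .put(keyBytes)
                    .putLong(windowStartMs)
                    .array();
        }

        long windowSizeMs() {
            return windowSizeMs;
        }
    }

    // Models the forwarding decision: wrap the explicit serde, else defer to the default.
    static <K> TimeWindowedSerializer<K> forward(Serializer<K> materializedKeySerde, long windowSizeMs) {
        return materializedKeySerde != null
                ? TimeWindowedSerializer.of(materializedKeySerde, windowSizeMs)
                : TimeWindowedSerializer.<K>wrapDefault(windowSizeMs);
    }

    public static void main(String[] args) {
        Serializer<String> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        Map<String, Object> config = new HashMap<>();
        config.put("default.key.serde", utf8);

        // Built without an explicit key serde, resolved once the config is known.
        TimeWindowedSerializer<String> deferred = forward(null, 60_000L);
        deferred.init(config);
        System.out.println(deferred.serialize("user-1", 0L).length);
    }
}
```

The marker flag is what distinguishes "no inner serde was given" from "the default key serde was used upstream", which is the distinction a plain null cannot express.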
Complicating the situation slightly, all the windowed serializers and
deserializers will resolve a runtime inner class using
`default.windowed.key.serde.inner` if given a null inner serde to wrap.
However, at this point in the topology build, we do know that the windowed
aggregation has specifically used the `default.key.serde`, not the
`default.windowed.key.serde.inner`, to persist its state to the window store.
Therefore, it should be correct to wrap the default key serde specifically and
not use the `default.windowed.key.serde.inner`.
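One way to picture the intended precedence is as a small lookup function. This is a sketch under assumptions: `resolveInnerSerde` and the `wrapsDefaultKeySerde` flag are hypothetical, and the serde names are placeholder strings; only the two config keys come from the description above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the resolution order discussed above; not Kafka Streams code.
final class InnerSerdeResolutionSketch {

    static final String DEFAULT_KEY_SERDE = "default.key.serde";
    static final String DEFAULT_WINDOWED_INNER = "default.windowed.key.serde.inner";

    // Returns the name of the inner serde a windowed serde should wrap.
    static String resolveInnerSerde(String explicitInner,
                                    boolean wrapsDefaultKeySerde,
                                    Map<String, String> config) {
        // 1. An explicitly provided inner serde always wins.
        if (explicitInner != null) {
            return explicitInner;
        }
        // 2. If the aggregation is known to have used the default key serde,
        //    wrap that one; otherwise fall back to the windowed-inner config.
        String configKey = wrapsDefaultKeySerde ? DEFAULT_KEY_SERDE : DEFAULT_WINDOWED_INNER;
        String resolved = config.get(configKey);
        if (resolved == null) {
            throw new IllegalStateException("missing config: " + configKey);
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(DEFAULT_KEY_SERDE, "StringSerde");       // placeholder name
        config.put(DEFAULT_WINDOWED_INNER, "LongSerde");    // placeholder name

        System.out.println(resolveInnerSerde("IntegerSerde", false, config));
        System.out.println(resolveInnerSerde(null, true, config));
        System.out.println(resolveInnerSerde(null, false, config));
    }
}
```

With the two configs set to different serdes, as in `main`, the second and third calls return different results, which is exactly the case where picking the wrong config would mis-serialize keys.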
In addition to fixing this for TimeWindowed and SessionWindowed streams, we
need to have good test coverage of the new code. There is clearly a blind spot
in the tests, or we would have noticed this sooner.