[ https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187747#comment-17187747 ]
Sören Henning commented on KAFKA-9649:
--
Hi, sorry for the late response, vacation came up... :)
In our case, we observed this issue when grouping a windowed KTable by a new
key for subsequent aggregation:
{noformat}
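import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

KTable<Windowed<K>, V> myTable = //...
KGroupedTable<Windowed<KNew>, V> groupedTable = myTable
    .groupBy(
        // re-key each record by the XY attribute of its key, keeping the original window
        (k, v) -> KeyValue.pair(new Windowed<>(k.key().getXYAttribute(), k.window()), v),
        Grouped.with(
            // the window size must be passed here, otherwise the window end times are wrong
            new WindowedSerdes.TimeWindowedSerde<>(
                myTableXYAttributeSerde,
                myTableWindowSize),
            myTableValueSerde));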
{noformat}
Here, we have a windowed KTable with keys of type {{K}} and want to group it by
a certain attribute ({{XY}}) of the key. This attribute is of type {{KNew}}.
When {{myTableWindowSize}} is not passed to the
{{WindowedSerdes.TimeWindowedSerde}} constructor, the new KTable keys, which
are actually of type {{Windowed<KNew>}}, are not assigned the correct end
timestamps. The issue does not become apparent immediately; the only symptom
is a log message:
{noformat}
WARN org.apache.kafka.streams.state.internals.WindowKeySchema - Warning: window end time was truncated to Long.MAX
{noformat}
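To illustrate why the end timestamps come out wrong, here is a standalone model of the window-end computation with no Kafka dependency. It assumes the end time is derived as {{start + windowSize}} and clamped to {{Long.MAX_VALUE}} when that addition overflows, which is consistent with the warning above and with the {{Long.MAX_VALUE}} default used by the single-argument {{TimeWindowedDeserializer}} constructor. The class {{WindowEndModel}} and its methods are hypothetical names, not part of the Kafka API.

```java
// Hypothetical standalone model (no Kafka dependency) of how a deserialized
// windowed key gets its end timestamp. Assumption: end = start + windowSize,
// clamped to Long.MAX_VALUE when the addition overflows.
final class WindowEndModel {

    // Default window size used when no size is specified.
    static final long DEFAULT_WINDOW_SIZE = Long.MAX_VALUE;

    static long windowEnd(final long startMs, final long windowSizeMs) {
        long endMs = startMs + windowSizeMs;
        if (endMs < 0) {
            // Overflow: start + Long.MAX_VALUE wraps around for any start > 0.
            System.err.println("Warning: window end time was truncated to Long.MAX");
            endMs = Long.MAX_VALUE;
        }
        return endMs;
    }

    public static void main(final String[] args) {
        final long start = 1_600_000_000_000L; // an epoch-millis window start
        final long size = 60_000L;             // a one-minute window

        // With the real window size, the end is start + size.
        System.out.println(windowEnd(start, size));
        // Without it, the end collapses to Long.MAX_VALUE.
        System.out.println(windowEnd(start, DEFAULT_WINDOW_SIZE));
    }
}
```

Running {{main}} prints 1600000060000 for the correctly sized window and 9223372036854775807 (after the warning on stderr) for the size-less case, so every re-keyed window appears to stretch to the end of time.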
> Remove/Warn on use of TimeWindowedSerde with no specified window size
> -
>
> Key: KAFKA-9649
> URL: https://issues.apache.org/jira/browse/KAFKA-9649
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Sören Henning
> Priority: Major
>
> The API of the
> [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java]
> promotes its construction without specifying a window size:
> {noformat}
> public TimeWindowedSerde(final Serde<T> inner)
> {noformat}
> While code using this constructor looks absolutely clean, it leads to fatal
> errors at runtime, which turned out to be very hard to discover.
> The reason for these errors can be found in the construction of the
> [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java],
> which is created via:
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
> this(inner, Long.MAX_VALUE);
> }
> {noformat}
> The TODO comment suggests that this issue is (or at least was) already known.
> We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}}
> constructor or at least logging a warning when it is used (if it must be kept
> for backwards compatibility). Ideally, of course, the window size would be
> obtained from some externally provided context; however, I expect this to be
> difficult to realize. The same applies to the
> {{TimeWindowedDeserializer(final Deserializer<T> inner)}} constructor.
> A further minor suggestion in this context: since most Kafka Streams time
> declarations now use {{Duration}}s instead of long-encoded milliseconds, I
> suggest allowing window sizes to be specified as a {{Duration}}.
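The two suggestions in the issue (warn on the size-less constructor, accept a {{Duration}}) could look roughly like the following. This is a hypothetical, dependency-free sketch: {{SizedWindowKeyFormat}} and its methods are made-up names standing in for the serde, not Kafka's actual API; the real constructors also take an inner serde, which is omitted here; and {{java.util.logging}} stands in for whatever logging the library uses.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.logging.Logger;

// Hypothetical stand-in for a windowed-key serde; NOT the Kafka
// TimeWindowedSerde/TimeWindowedDeserializer API.
final class SizedWindowKeyFormat {

    private static final Logger LOG = Logger.getLogger(SizedWindowKeyFormat.class.getName());

    private final long windowSizeMs;

    /** Legacy path kept only for compatibility: warns because the size is unknown. */
    @Deprecated
    SizedWindowKeyFormat() {
        LOG.warning("No window size specified; window end times will be truncated to Long.MAX_VALUE");
        this.windowSizeMs = Long.MAX_VALUE;
    }

    /** Suggested path: the window size is given as a Duration, not raw milliseconds. */
    SizedWindowKeyFormat(final Duration windowSize) {
        Objects.requireNonNull(windowSize, "windowSize");
        if (windowSize.isNegative() || windowSize.isZero()) {
            throw new IllegalArgumentException("windowSize must be positive: " + windowSize);
        }
        this.windowSizeMs = windowSize.toMillis();
    }

    long windowSizeMs() {
        return windowSizeMs;
    }

    public static void main(final String[] args) {
        // Preferred: explicit size as a Duration.
        System.out.println(new SizedWindowKeyFormat(Duration.ofMinutes(1)).windowSizeMs());
        // Legacy: still works, but logs a warning instead of failing silently.
        System.out.println(new SizedWindowKeyFormat().windowSizeMs());
    }
}
```

Keeping the old constructor but deprecating it and logging a warning preserves backwards compatibility while making the silent truncation visible; taking a {{Duration}} also lets the constructor reject non-positive sizes up front.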