[ https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187747#comment-17187747 ]
Sören Henning edited comment on KAFKA-9649 at 8/31/20, 1:49 PM:
----------------------------------------------------------------

Hi, sorry for the late response, vacation came up... :)

In our case, we observed this issue when grouping a windowed KTable by a new key for a subsequent aggregation:

{noformat}
KTable<Windowed<K>, V> myTable = //...
KGroupedTable<Windowed<KNew>, V> groupedTable = myTable
    .groupBy(
        // Re-key each record by an attribute of the old key, keeping the original window
        (k, v) -> KeyValue.pair(new Windowed<>(k.key().getXYAttribute(), k.window()), v),
        Grouped.with(
            // Passing the window size is what gives the deserialized keys correct window ends
            new WindowedSerdes.TimeWindowedSerde<>(
                myTableXYAttributeSerde,
                myTableWindowSize),
            myTableValueSerde));
{noformat}

Here, we have a windowed KTable with keys of type {{K}} and want to group it by a certain attribute ({{XY}}) of the key. This attribute is of type {{KNew}}. When {{myTableWindowSize}} is not passed to the {{WindowedSerdes.TimeWindowedSerde}} constructor, the new KTable keys, which are actually of type {{Windowed<KNew>}}, are not assigned the correct window end timestamps. The problem does not become immediately apparent; only a log message is produced:

{noformat}
WARN org.apache.kafka.streams.state.internals.WindowKeySchema - Warning: window end time was truncated to Long.MAX
{noformat}

> Remove/Warn on use of TimeWindowedSerde with no specified window size
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-9649
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9649
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Sören Henning
>            Priority: Major
>
> The API of the [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java] promotes its construction without specifying a window size:
> {noformat}
> public TimeWindowedSerde(final Serde<T> inner)
> {noformat}
> While code using this constructor looks perfectly clean, it leads to fatal errors at runtime that turned out to be very hard to discover.
> The reason for these errors can be found in the construction of the [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java], which, when no size is given, falls back to a window size of {{Long.MAX_VALUE}}:
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>     this(inner, Long.MAX_VALUE);
> }
> {noformat}
> The TODO comment suggests that this issue is (or at least was) already known.
> We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}} constructor or at least warning when it is used (if it must be kept for backwards compatibility). The ideal solution, of course, would be to get the window size from some externally provided context.
> However, I expect this to be difficult to realize. The same also applies to the {{TimeWindowedDeserializer(final Deserializer<T> inner)}} constructor.
> A further minor suggestion in this context: as most Kafka Streams time declarations now use {{Duration}}s instead of long-encoded milliseconds, I suggest allowing window sizes to be specified with a {{Duration}}.
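The truncation warning in the comment follows from plain long arithmetic. The sketch below is a minimal, stdlib-only model that assumes the deserializer reads the window start from the serialized key and computes the end as start plus the configured window size, clamping to {{Long.MAX_VALUE}} when the sum overflows. With the {{Long.MAX_VALUE}} fallback of the one-argument constructor, any positive start overflows, so every key ends up with the same, wrong end. {{WindowEndSketch}}, {{TimeWindowModel}}, {{windowForSize}} and the {{Duration}} overload (modelling the issue's last suggestion) are hypothetical names, not Kafka Streams API.

```java
import java.time.Duration;

// Hypothetical, stdlib-only model of deriving a time window's end from its
// serialized start timestamp and a configured window size. Not Kafka source code.
public class WindowEndSketch {

    // Hypothetical stand-in for a window with start and end in epoch millis.
    static final class TimeWindowModel {
        final long startMs;
        final long endMs;

        TimeWindowModel(long startMs, long endMs) {
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    // The size the one-argument TimeWindowedDeserializer constructor falls back to.
    static final long DEFAULT_WINDOW_SIZE_MS = Long.MAX_VALUE;

    // end = start + size. For non-negative inputs a negative sum means the long
    // addition overflowed, so the end is clamped to Long.MAX_VALUE with a warning.
    static TimeWindowModel windowForSize(long startMs, long windowSizeMs) {
        long endMs = startMs + windowSizeMs;
        if (endMs < 0) {
            System.err.println("Warning: window end time was truncated to Long.MAX");
            endMs = Long.MAX_VALUE;
        }
        return new TimeWindowModel(startMs, endMs);
    }

    // Hypothetical Duration-based overload: converts to millis and delegates.
    static TimeWindowModel windowForSize(long startMs, Duration windowSize) {
        return windowForSize(startMs, windowSize.toMillis());
    }

    public static void main(String[] args) {
        long startMs = 1_598_880_000_000L; // an arbitrary window start in epoch millis

        // Window size known (here 5 minutes): the end is start + 300000 ms.
        TimeWindowModel sized = windowForSize(startMs, Duration.ofMinutes(5));
        // Window size omitted: start + Long.MAX_VALUE overflows, so the end is clamped.
        TimeWindowModel defaulted = windowForSize(startMs, DEFAULT_WINDOW_SIZE_MS);

        System.out.println("with window size:    end = " + sized.endMs);
        System.out.println("without window size: end = " + defaulted.endMs);
    }
}
```

Running {{main}} prints an end of 1598880300000 for the sized window and 9223372036854775807 for the defaulted one, and writes the warning to stderr only for the latter.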