[ 
https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187747#comment-17187747
 ] 

Sören Henning commented on KAFKA-9649:
--------------------------------------

Hi, sorry for the late response, vacation came up... :) 

In our case, we observed this issue when grouping a windowed KTable by a new 
key for subsequent aggregation:

{noformat}
KTable<Windowed<K>, V> myTable = //...
KGroupedTable<Windowed<KNew>, V> myGroupedTable = myTable
  .groupBy(
    // Re-key by the XY attribute of the old key, keeping the original window
    (k, v) -> KeyValue.pair(
      new Windowed<>(k.key().getXYAttribute(), k.window()),
      v),
    Grouped.with(
      new WindowedSerdes.TimeWindowedSerde<>(
        myTableXYAttributeSerde,
        myTableWindowSize),
      myTableValueSerde));
{noformat}

Here, we have a windowed KTable with keys of type {{K}} and want to group it by 
a certain attribute ({{XY}}) of the key. This attribute is of type {{KNew}}.

When {{myTableWindowSize}} is not passed to the 
{{WindowedSerdes.TimeWindowedSerde}} constructor, the new KTable keys, which 
are actually of type {{Windowed<KNew>}}, are not assigned the correct end 
timestamps. This issue does not become apparent immediately; only a log 
message is produced:

{noformat}
WARN org.apache.kafka.streams.state.internals.WindowKeySchema - Warning: window end time was truncated to Long.MAX
{noformat}
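
To illustrate why the end timestamps come out wrong, here is a rough sketch in plain Java. It is a simplified model and not the actual Kafka Streams code: the class and method names are made up for this example, and it assumes that the window end is derived on deserialization as start plus window size, with the default size being {{Long.MAX_VALUE}} as in the one-argument constructor quoted in the issue description.

```java
// Simplified model of the behavior described above, NOT the Kafka Streams
// implementation. WindowEndSketch and windowEnd are hypothetical names.
final class WindowEndSketch {

    /** Size used when no window size is given (assumption: mirrors the one-argument constructor). */
    static final long DEFAULT_WINDOW_SIZE_MS = Long.MAX_VALUE;

    /** Derives the window end from the start timestamp and the configured window size. */
    static long windowEnd(final long startMs, final long windowSizeMs) {
        final long endMs = startMs + windowSizeMs; // wraps to a negative value on overflow
        // On overflow the end is clamped; this corresponds to the
        // "window end time was truncated to Long.MAX" warning.
        return endMs < 0 ? Long.MAX_VALUE : endMs;
    }

    public static void main(final String[] args) {
        final long startMs = 1_600_000_000_000L;
        final long oneHourMs = 3_600_000L;
        // With an explicit size, the end is start + size.
        System.out.println(windowEnd(startMs, oneHourMs));
        // With the default size, every window ends at Long.MAX_VALUE.
        System.out.println(windowEnd(startMs, DEFAULT_WINDOW_SIZE_MS));
    }
}
```

Under these assumptions, every key deserialized without an explicit window size gets the same end timestamp regardless of the real window, which matches what we observed.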

> Remove/Warn on use of TimeWindowedSerde with no specified window size
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-9649
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9649
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Sören Henning
>            Priority: Major
>
> The API of the 
> [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java]
>  promotes its construction without specifying a window size:
> {noformat}
> public TimeWindowedSerde(final Serde<T> inner)
> {noformat}
> While code using this constructor looks absolutely clean, it leads to fatal 
> errors at runtime, which turned out to be very hard to discover.
> The reason for these errors can be found in the construction of the 
> [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java],
>  which is created via:
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>   this(inner, Long.MAX_VALUE);
> }
> {noformat}
> The TODO comment suggests that this issue is (or at least was) already known.
> We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}} 
> constructor or at least warning when it is used (if it is required for 
> backwards compatibility). The ideal solution, of course, would be to get the 
> window size from some externally provided context. However, I expect this to 
> be difficult to realize. The same applies to the 
> {{TimeWindowedDeserializer(final Deserializer<T> inner)}} constructor.
> A further minor suggestion in this context: since most Kafka Streams time 
> declarations now use {{Duration}}s instead of long-encoded milliseconds, I 
> suggest allowing window sizes to be specified with a {{Duration}}.
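
A minimal sketch of the {{Duration}} suggestion, assuming a hypothetical helper that converts a {{Duration}} into the long-encoded milliseconds the existing two-argument constructors expect. The class and method names are invented for illustration and are not part of the Kafka Streams API; the validation rules are likewise an assumption.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical helper, not part of Kafka Streams.
final class WindowSizeSketch {

    /** Converts a window size given as a Duration into milliseconds, rejecting non-positive sizes. */
    static long toWindowSizeMs(final Duration windowSize) {
        Objects.requireNonNull(windowSize, "windowSize must not be null");
        if (windowSize.isZero() || windowSize.isNegative()) {
            throw new IllegalArgumentException("windowSize must be positive, but was " + windowSize);
        }
        // Duration.toMillis() throws ArithmeticException if the value does not fit into a long.
        return windowSize.toMillis();
    }
}
```

The result could then be passed wherever a window size in milliseconds is required today, for example as the second argument of the {{TimeWindowedSerde}} constructor used in the comment above.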



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
