[jira] [Commented] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size

2020-08-31 Thread Jira


[ https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187747#comment-17187747 ]

Sören Henning commented on KAFKA-9649:
--

Hi, sorry for the late response, vacation came up... :) 

In our case, we observed this issue when grouping a windowed KTable by a new 
key for subsequent aggregation:

{noformat}
KTable<Windowed<K>, V> myTable = //...
KGroupedTable<Windowed<KNew>, V> groupedTable = myTable
  .groupBy(
    (k, v) -> KeyValue.pair(new Windowed<>(k.key().getXYAttribute(), k.window()), v),
    Grouped.with(
      new WindowedSerdes.TimeWindowedSerde<>(
        myTableXYAttributeSerde,
        myTableWindowSize),
      myTableValueSerde));
{noformat}

Here, we have a windowed KTable with keys of type {{K}} and want to group it by 
a certain attribute ({{XY}}) of the key. This attribute is of type {{KNew}}.
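To make the re-keying step concrete, here is a Kafka-free sketch of what the groupBy mapper does to each key. SimpleWindow, SimpleWindowed, SensorKey and RekeySketch are hypothetical stand-ins (not Kafka classes) for the window, {{Windowed<K>}}, and a key type {{K}} whose XY attribute is a String. The mapper keeps the window as-is and only swaps the key, so a wrong end timestamp presumably does not originate here but in the repartition step, where the new key is serialized and deserialized with the serde passed to {{Grouped.with}}.

```java
import java.util.function.Function;

// Hypothetical stand-in for a time window [startMs, endMs).
final class SimpleWindow {
    final long startMs;
    final long endMs;

    SimpleWindow(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Hypothetical stand-in for Kafka's Windowed<K>: a key plus its window.
final class SimpleWindowed<K> {
    final K key;
    final SimpleWindow window;

    SimpleWindowed(K key, SimpleWindow window) {
        this.key = key;
        this.window = window;
    }
}

// Hypothetical original key type K; xyAttribute plays the role of the KNew attribute.
final class SensorKey {
    final String xyAttribute;
    final int id;

    SensorKey(String xyAttribute, int id) {
        this.xyAttribute = xyAttribute;
        this.id = id;
    }
}

final class RekeySketch {
    // Mirrors the groupBy mapper: replace the key by one of its attributes, keep the window.
    static <K, KNew> SimpleWindowed<KNew> rekey(SimpleWindowed<K> in, Function<K, KNew> attribute) {
        return new SimpleWindowed<>(attribute.apply(in.key), in.window);
    }

    public static void main(String[] args) {
        SimpleWindowed<SensorKey> original =
            new SimpleWindowed<>(new SensorKey("room-1", 42), new SimpleWindow(60_000L, 120_000L));
        SimpleWindowed<String> rekeyed = rekey(original, k -> k.xyAttribute);
        // Prints: room-1 [60000, 120000)
        System.out.println(rekeyed.key + " [" + rekeyed.window.startMs + ", " + rekeyed.window.endMs + ")");
    }
}
```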

When {{myTableWindowSize}} is not passed to the 
{{WindowedSerdes.TimeWindowedSerde}} constructor, the new KTable keys, which 
are of type {{Windowed<KNew>}}, are not assigned the correct end 
timestamps. The problem does not become apparent immediately; the only symptom 
is a log message:

{noformat}
WARN org.apache.kafka.streams.state.internals.WindowKeySchema - Warning: window end time was truncated to Long.MAX
{noformat}
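A simplified, Kafka-free model of why the end timestamp goes wrong. It assumes (to my understanding, mirroring WindowKeySchema) that a time-windowed key is serialized as the inner key bytes followed by the 8-byte window start, with no end. On deserialization the end has to be recomputed as start + window size; with the Long.MAX_VALUE default of the one-argument constructor, that sum overflows for any positive start and is clamped to Long.MAX_VALUE, which matches the warning above. WindowedKeyCodecSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified model (an assumption, not the Kafka source) of a time-windowed key codec:
// inner key bytes followed by the 8-byte window start. The window end is NOT written.
final class WindowedKeyCodecSketch {
    static byte[] serialize(String key, long windowStartMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
            .put(keyBytes)
            .putLong(windowStartMs)
            .array();
    }

    // Returns {start, end}; the end must be recomputed from the configured window size.
    static long[] deserializeWindow(byte[] data, long windowSizeMs) {
        long startMs = ByteBuffer.wrap(data, data.length - Long.BYTES, Long.BYTES).getLong();
        return new long[] {startMs, windowEnd(startMs, windowSizeMs)};
    }

    // start + size; if the sum overflows (turns negative), clamp to Long.MAX_VALUE.
    static long windowEnd(long startMs, long windowSizeMs) {
        long endMs = startMs + windowSizeMs;
        return endMs < 0 ? Long.MAX_VALUE : endMs;
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("room-1", 60_000L);
        long[] sized = deserializeWindow(bytes, 60_000L);          // explicit window size
        long[] unsized = deserializeWindow(bytes, Long.MAX_VALUE); // default of the one-arg constructor
        System.out.println("sized:   [" + sized[0] + ", " + sized[1] + ")");
        System.out.println("unsized: [" + unsized[0] + ", " + unsized[1] + ")");
    }
}
```

Running main prints [60000, 120000) for the sized case and [60000, 9223372036854775807) for the unsized one: the key and start survive the round trip, only the end is lost.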

> Remove/Warn on use of TimeWindowedSerde with no specified window size
> -
>
> Key: KAFKA-9649
> URL: https://issues.apache.org/jira/browse/KAFKA-9649
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sören Henning
>Priority: Major
>
> The API of the 
> [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java]
>  promotes its construction without specifying a window size:
> {noformat}
> public TimeWindowedSerde(final Serde<T> inner)
> {noformat}
> While code using this constructor looks perfectly clean, it leads to fatal 
> errors at runtime that turned out to be very hard to discover.
> The reason for these errors lies in the construction of the 
> [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java],
>  which is created via:
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>   this(inner, Long.MAX_VALUE);
> }
> {noformat}
> The TODO comment suggests that this issue is (or at least was) already known.
> We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}} 
> constructor or at least warning when it is used (if it must be kept for backwards 
> compatibility). The ideal solution would, of course, be to obtain the window size 
> from some externally provided context. However, I expect this to be difficult 
> to realize. The same applies to the 
> {{TimeWindowedDeserializer(final Deserializer<T> inner)}} constructor.
> A further minor suggestion in this context: as most Kafka Streams time 
> declarations now use {{Duration}}s instead of long-encoded milliseconds, I 
> suggest allowing window sizes to be specified as a {{Duration}}.
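One possible shape for the proposed fix, sketched as a hypothetical class (SizedWindowedDeserializerSketch is not a Kafka API): its only constructor takes the window size as a java.time.Duration, so omitting the size becomes a compile-time error instead of a silent Long.MAX_VALUE default, and non-positive sizes are rejected up front.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical API sketch of the proposals above (not Kafka's actual API):
// the window size is mandatory and expressed as a Duration.
final class SizedWindowedDeserializerSketch {
    private final long windowSizeMs;

    // There is deliberately no size-less constructor, so the size cannot be forgotten.
    SizedWindowedDeserializerSketch(Duration windowSize) {
        Objects.requireNonNull(windowSize, "windowSize");
        if (windowSize.isNegative() || windowSize.isZero()) {
            throw new IllegalArgumentException("windowSize must be positive: " + windowSize);
        }
        this.windowSizeMs = windowSize.toMillis();
    }

    long windowSizeMs() {
        return windowSizeMs;
    }

    // Recompute the window end from the deserialized start, clamping on overflow.
    long windowEnd(long windowStartMs) {
        long endMs = windowStartMs + windowSizeMs;
        return endMs < 0 ? Long.MAX_VALUE : endMs;
    }

    public static void main(String[] args) {
        SizedWindowedDeserializerSketch d = new SizedWindowedDeserializerSketch(Duration.ofMinutes(5));
        System.out.println(d.windowEnd(60_000L)); // prints 360000
    }
}
```

The "warn" alternative would instead keep a deprecated size-less constructor that logs a warning when called; which route fits best depends on the backwards-compatibility constraints mentioned above.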



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size

2020-08-20 Thread Leah Thomas (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17181472#comment-17181472 ]

Leah Thomas commented on KAFKA-9649:


Hey Sören,

I started a KIP to take care of this. Would you be able to elaborate on where 
the runtime issues are coming from? Is it because the default value is still 
Long.MAX_VALUE from the serde you initialize in your app?

Thanks!

Leah
