[ https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281834#comment-17281834 ]

Marco Lotz edited comment on KAFKA-9524 at 2/9/21, 7:30 PM:
------------------------------------------------------------

I have reproduced the bug in the unit tests and can confirm it: changing the
before() method of TimeWindowedKStreamImplTest to use the grace period specified
in the ticket throws the reported exception.

The cause is that .grace(...) never updates the _maintainDurationMs_ field, so
it always keeps its default value of 1 day.

This affects any time window larger than 1 day that is combined with a non-zero
grace period. Time windows without a grace period do not seem to be affected
(confirming [~vvcephei]'s report). The bug is in the following method:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
    final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
    final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
    if (afterWindowEndMs < 0) {
        throw new IllegalArgumentException("Grace period must not be negative.");
    }
    return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, maintainDurationMs, segments);
}
{code}
maintainDurationMs is passed through unchanged, so the grace period is never
taken into account when the retention is derived.
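To make the failing arithmetic concrete, here is a self-contained sketch that reproduces the check with the numbers from the reported exception. It assumes that, before the fix, the effective retention is the larger of maintainDurationMs (1 day by default) and the window size, which is my reading of how the deprecated maintainMs() is derived. RetentionCheckSketch and its methods are hypothetical and do not call Kafka Streams.
{code:java}
import java.time.Duration;

// Hypothetical model of the retention check; it mirrors only the arithmetic,
// not the real Kafka Streams classes.
final class RetentionCheckSketch {
    // Default maintain duration (1 day) that grace(...) never updates.
    static final long DEFAULT_MAINTAIN_DURATION_MS = Duration.ofDays(1).toMillis();

    // Assumed retention before the fix: the grace period is ignored and only
    // the window size can raise it above the 1-day default.
    static long retentionBeforeFix(final long sizeMs, final long maintainDurationMs) {
        return Math.max(maintainDurationMs, sizeMs);
    }

    // Same condition as the exception message: retention must be no smaller
    // than the window size plus the grace period.
    static void validate(final long sizeMs, final long graceMs, final long retentionMs) {
        if (retentionMs < sizeMs + graceMs) {
            throw new IllegalArgumentException(
                "Got size=[" + sizeMs + "], grace=[" + graceMs + "], retention=[" + retentionMs + "]");
        }
    }

    public static void main(final String[] args) {
        final long sizeMs = Duration.ofDays(20).toMillis();    // 1728000000
        final long graceMs = Duration.ofMinutes(5).toMillis(); // 300000
        final long retentionMs = retentionBeforeFix(sizeMs, DEFAULT_MAINTAIN_DURATION_MS);
        try {
            validate(sizeMs, graceMs, retentionMs);
            System.out.println("no exception");
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}
Running main prints "Got size=[1728000000], grace=[300000], retention=[1728000000]", the same values as in the reported exception: the 20-day size wins over the 1-day default, but nothing adds the 5 minutes of grace.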

A simple solution would be the following:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
    final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
    final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
    if (afterWindowEndMs < 0) {
        throw new IllegalArgumentException("Grace period must not be negative.");
    }
    // Retention must cover the window size plus the grace period; the existing
    // maintainDurationMs (1 day by default) is kept as a lower bound.
    final long effectiveMaintainDurationMs = Math.max(sizeMs + afterWindowEndMs, maintainDurationMs);
    return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, effectiveMaintainDurationMs, segments);
}
{code}
It seems to me that the original design intended a minimum maintainDurationMs
of 1 day. If that is not the case, the Math.max call can be replaced with a
plain assignment.
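The two variants only diverge when size plus grace is below the 1-day default. A small sketch comparing them, where MaintainDurationSketch, withMinimum and withoutMinimum are hypothetical names used for illustration only:
{code:java}
import java.time.Duration;

// Hypothetical helpers comparing the Math.max fix with the plain assignment.
final class MaintainDurationSketch {
    static final long DEFAULT_MAINTAIN_DURATION_MS = Duration.ofDays(1).toMillis();

    // Proposed fix: keep the existing minimum, but grow it to cover size + grace.
    static long withMinimum(final long sizeMs, final long graceMs, final long maintainDurationMs) {
        return Math.max(sizeMs + graceMs, maintainDurationMs);
    }

    // Simplified variant, if no 1-day minimum was intended.
    static long withoutMinimum(final long sizeMs, final long graceMs) {
        return sizeMs + graceMs;
    }

    public static void main(final String[] args) {
        final long graceMs = Duration.ofMinutes(5).toMillis();
        final long[] sizesMs = {Duration.ofHours(1).toMillis(), Duration.ofDays(20).toMillis()};
        for (final long sizeMs : sizesMs) {
            System.out.println("size=" + sizeMs
                + " withMinimum=" + withMinimum(sizeMs, graceMs, DEFAULT_MAINTAIN_DURATION_MS)
                + " withoutMinimum=" + withoutMinimum(sizeMs, graceMs));
        }
    }
}
{code}
For a 1-hour window with 5 minutes of grace, the Math.max version keeps 1 day of retention (86400000 ms) while the plain assignment keeps only 65 minutes (3900000 ms); for the 20-day window from the ticket, both give 1728300000 ms.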

Since this bug has existed since at least 2.4.0 (as reported), I suggest scoping
this ticket to fixing this small bug rather than removing the deprecated APIs,
so that the fix can ship in patch (bug-fix) releases. I can send a PR that fixes
it and adds unit tests.


> Default window retention does not consider grace period
> -------------------------------------------------------
>
>                 Key: KAFKA-9524
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9524
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Michael Bingham
>            Assignee: Marco Lotz
>            Priority: Minor
>
> In a windowed aggregation, if you specify a window size larger than the 
> default window retention (1 day), Streams will implicitly set retention 
> accordingly to accommodate windows of that size. For example, 
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20))) 
> {code}
> In this case, Streams will implicitly set window retention to 20 days, and no 
> exceptions will occur.
> However, if you also include a non-zero grace period on the window, such as:
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
> {code}
> In this case, Streams will still implicitly set the window retention to 20 days 
> (not 20 days + 5 minutes grace), and an exception will be thrown:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: The retention 
> period of the window store KSTREAM-KEY-SELECT-0000000002 must be no smaller 
> than its window size plus the grace period. Got size=[1728000000], 
> grace=[300000], retention=[1728000000]{code}
> Ideally, Streams should include grace period when implicitly setting window 
> retention.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)