[ 
https://issues.apache.org/jira/browse/KAFKA-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16829636#comment-16829636
 ] 

John Roesler commented on KAFKA-8289:
-------------------------------------

Hi [~xiaoxiaoliner],

Thanks for the bug report; I'm sorry to hear this is causing you trouble.

For the record, we do expect Suppress to work just as well for SessionWindows 
as for TimeWindows. Your understanding is correct, the suppression should be 
waiting until the session window is closed before emitting the results, which 
means you should see a unique start time per key.

I looked into the code, and suppression does appear to be misconfigured for 
session window aggregations. Currently, it is waiting until time `window.end + 
grace` before emitting, but session windows are special, since the window end 
is provisional... if more data arrives before the "inactivity gap" passes, the 
window end moves to the latest record's timestamp. Thus, suppression should be 
waiting until `window.end + inactivityGap + grace` before emitting, to be sure 
that it really does wait until the window is closed (i.e., until it can never 
emit more updates).

I looked into our tests, and we didn't detect this error because the session 
window tests unfortunately set the grace period equal to the inactivity gap. 
After setting the grace period to a smaller value, I was able to reproduce the 
reported case. I am preparing a PR to fix this bug. I've marked this ticket as 
a blocker for the next release, and also for the next 2.1 and 2.2 bugfix 
releases.

While debugging this issue, I noticed that the grace period calculation for 
session window aggregations is apparently off by one. Setting the grace period 
to 0ms results in all output from the aggregation being dropped. With the 
current code, if you want to implement a super-strict grace period, you need to 
use 1ms. Pending code review, I plan to include this fix in the same PR.

Within my theories above, you should be able to employ a workaround of using a 
grace period equal to (or greater than) the inactivity gap. This isn't 
precisely the same logic you encoded in your application above, and (in the 
presence of late-arriving data) you may observe that the "final results" stream 
differs from the "raw results" stream, but you *should* at least see unique 
results for your aggregations.

> KTable<Windowed<String>, Long>  can't be suppressed
> ---------------------------------------------------
>
>                 Key: KAFKA-8289
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8289
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0, 2.2.0, 2.1.1
>         Environment: Broker on a Linux, stream app on my win10 laptop. 
> I add one row log.message.timestamp.type=LogAppendTime to my broker's 
> server.properties. stream app all default config.
>            Reporter: Xiaolin Jia
>            Assignee: John Roesler
>            Priority: Blocker
>             Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> I write a simple stream app followed official developer guide [Stream 
> DSL|[https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#window-final-results]].
>  but I got more than one [Window Final 
> Results|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#id31]
>  from a session time window.
> time ticker A -> (4,A) / 25s,
> time ticker B -> (4, B) / 25s  all send to the same topic 
> below is my stream app code 
> {code:java}
> kstreams[0]
> .peek((k, v) -> log.info("--> ping, k={},v={}", k, v))
> .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String()))
> .windowedBy(SessionWindows.with(Duration.ofSeconds(100)).grace(Duration.ofMillis(20)))
> .count()
> .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> .toStream().peek((k, v) -> log.info("window={},k={},v={}", k.window(), 
> k.key(), v));
> {code}
> {{here is my log print}}
> {noformat}
> 2019-04-24 20:00:26.142  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:00:47.070  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556106587744, 
> endMs=1556107129191},k=A,v=20
> 2019-04-24 20:00:51.071  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:01:16.065  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:01:41.066  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:02:06.069  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:02:31.066  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:02:56.208  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:03:21.070  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:03:46.078  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:04:04.684  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:04:11.069  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:04:19.371  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107426409},k=B,v=9
> 2019-04-24 20:04:19.372  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107445012},k=A,v=1
> 2019-04-24 20:04:29.604  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:04:36.067  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:04:49.715  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107451397},k=B,v=10
> 2019-04-24 20:04:49.716  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107469935},k=A,v=2
> 2019-04-24 20:04:54.593  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:05:01.070  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:05:19.599  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:05:20.045  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107476398},k=B,v=11
> 2019-04-24 20:05:20.047  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107501398},k=B,v=12
> 2019-04-24 20:05:26.075  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:05:44.598  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:05:50.399  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107519930},k=A,v=4
> 2019-04-24 20:05:50.400  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107526405},k=B,v=13
> 2019-04-24 20:05:51.067  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:06:09.595  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:06:16.089  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:06:20.765  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107544929},k=A,v=5
> 2019-04-24 20:06:20.767  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107569926},k=A,v=6
> 2019-04-24 20:06:34.595  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:06:41.063  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:06:51.081  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107576415},k=B,v=15
> 2019-04-24 20:06:51.082  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107594925},k=A,v=7
> 2019-04-24 20:06:59.607  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:07:06.072  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:07:21.440  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107601391},k=B,v=16
> 2019-04-24 20:07:21.441  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107619935},k=A,v=8
> 2019-04-24 20:07:24.596  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:07:31.066  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:07:49.608  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:07:51.775  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107626420},k=B,v=17
> 2019-04-24 20:07:51.777  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107651396},k=B,v=18
> 2019-04-24 20:07:56.064  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:08:14.591  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:08:21.066  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:08:22.125  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107669943},k=A,v=10
> 2019-04-24 20:08:22.126  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107694921},k=A,v=11
> 2019-04-24 20:08:39.619  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:08:46.067  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:08:52.457  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107701397},k=B,v=20
> 2019-04-24 20:08:52.458  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107719949},k=A,v=12
> 2019-04-24 20:09:04.599  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=A
> 2019-04-24 20:09:11.066  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : --> ping, k=4,v=B
> 2019-04-24 20:09:22.794  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107226473, 
> endMs=1556107726398},k=B,v=21   
> 2019-04-24 20:09:22.796  INFO --- [-StreamThread-1] c.g.k.AppStreams          
>               : window=Window{startMs=1556107445012, 
> endMs=1556107744928},k=A,v=13{noformat}
>  Can‘t a [SessionWindowedKStream] be suppressed after count operation? It 
> seems the latest type record produce a previous type record 'Window Final 
> Results'. I just want get exactly one [Window Final 
> Results|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#id31].
>  
> First i just start one time ticker, log print seems ok, when I start the 
> second, then window info print log appeared.
> My source stream record rate is a same v record / 25s, gap of inactivity is 
> 100s, 25<100. I shouldn't got so many window info print log. Session window 
> close and then reopened? my grace is 20 millisecond, it should not be ah. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to