[ https://issues.apache.org/jira/browse/KAFKA-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16829636#comment-16829636 ]
John Roesler commented on KAFKA-8289: ------------------------------------- Hi [~xiaoxiaoliner], Thanks for the bug report; I'm sorry to hear this is causing you trouble. For the record, we do expect Suppress to work just as well for SessionWindows as for TimeWindows. Your understanding is correct, the suppression should be waiting until the session window is closed before emitting the results, which means you should see a unique start time per key. I looked into the code, and suppression does appear to be misconfigured for session window aggregations. Currently, it is waiting until time `window.end + grace` before emitting, but session windows are special, since the window end is provisional... if more data arrives before the "inactivity gap" passes, the window end moves to the latest record's timestamp. Thus, suppression should be waiting until `window.end + inactivityGap + grace` before emitting, to be sure that it really does wait until the window is closed (i.e., until it can never emit more updates). I looked into our tests, and we didn't detect this error because the session window tests unfortunately set the grace period equal to the inactivity gap. After setting the grace period to a smaller value, I was able to reproduce the reported case. I am preparing a PR to fix this bug. I've marked this ticket as a blocker for the next release, and also for the next 2.1 and 2.2 bugfix releases. While debugging this issue, I noticed that the grace period calculation for session window aggregations is apparently off by one. Setting the grace period to 0ms results in all output from the aggregation being dropped. With the current code, if you want to implement a super-strict grace period, you need to use 1ms. Pending code review, I plan to include this fix in the same PR. Within my theories above, you should be able to employ a workaround of using a grace period equal to (or greater than) the inactivity gap. This isn't precisely the same logic you encoded in your application above, and (in the presence of late-arriving data) you may observe that the "final results" stream differs from the "raw results" stream, but you *should* at least see unique results for your aggregations. > KTable<Windowed<String>, Long> can't be suppressed > --------------------------------------------------- > > Key: KAFKA-8289 > URL: https://issues.apache.org/jira/browse/KAFKA-8289 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.1.0, 2.2.0, 2.1.1 > Environment: Broker on a Linux, stream app on my win10 laptop. > I add one row log.message.timestamp.type=LogAppendTime to my broker's > server.properties. stream app all default config. > Reporter: Xiaolin Jia > Assignee: John Roesler > Priority: Blocker > Fix For: 2.3.0, 2.1.2, 2.2.1 > > > I write a simple stream app followed official developer guide [Stream > DSL|[https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#window-final-results]]. > but I got more than one [Window Final > Results|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#id31] > from a session time window. > time ticker A -> (4,A) / 25s, > time ticker B -> (4, B) / 25s all send to the same topic > below is my stream app code > {code:java} > kstreams[0] > .peek((k, v) -> log.info("--> ping, k={},v={}", k, v)) > .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String())) > .windowedBy(SessionWindows.with(Duration.ofSeconds(100)).grace(Duration.ofMillis(20))) > .count() > .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) > .toStream().peek((k, v) -> log.info("window={},k={},v={}", k.window(), > k.key(), v)); > {code} > {{here is my log print}} > {noformat} > 2019-04-24 20:00:26.142 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:00:47.070 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556106587744, > endMs=1556107129191},k=A,v=20 > 2019-04-24 20:00:51.071 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:01:16.065 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:01:41.066 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:02:06.069 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:02:31.066 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:02:56.208 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:03:21.070 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:03:46.078 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:04:04.684 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:04:11.069 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:04:19.371 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107426409},k=B,v=9 > 2019-04-24 20:04:19.372 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107445012},k=A,v=1 > 2019-04-24 20:04:29.604 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:04:36.067 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:04:49.715 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107451397},k=B,v=10 > 2019-04-24 20:04:49.716 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107469935},k=A,v=2 > 2019-04-24 20:04:54.593 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:05:01.070 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:05:19.599 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:05:20.045 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107476398},k=B,v=11 > 2019-04-24 20:05:20.047 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107501398},k=B,v=12 > 2019-04-24 20:05:26.075 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:05:44.598 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:05:50.399 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107519930},k=A,v=4 > 2019-04-24 20:05:50.400 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107526405},k=B,v=13 > 2019-04-24 20:05:51.067 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:06:09.595 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:06:16.089 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:06:20.765 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107544929},k=A,v=5 > 2019-04-24 20:06:20.767 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107569926},k=A,v=6 > 2019-04-24 20:06:34.595 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:06:41.063 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:06:51.081 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107576415},k=B,v=15 > 2019-04-24 20:06:51.082 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107594925},k=A,v=7 > 2019-04-24 20:06:59.607 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:07:06.072 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:07:21.440 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107601391},k=B,v=16 > 2019-04-24 20:07:21.441 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107619935},k=A,v=8 > 2019-04-24 20:07:24.596 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:07:31.066 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:07:49.608 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:07:51.775 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107626420},k=B,v=17 > 2019-04-24 20:07:51.777 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107651396},k=B,v=18 > 2019-04-24 20:07:56.064 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:08:14.591 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:08:21.066 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:08:22.125 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107669943},k=A,v=10 > 2019-04-24 20:08:22.126 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107694921},k=A,v=11 > 2019-04-24 20:08:39.619 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:08:46.067 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:08:52.457 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107701397},k=B,v=20 > 2019-04-24 20:08:52.458 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107719949},k=A,v=12 > 2019-04-24 20:09:04.599 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:09:11.066 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:09:22.794 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107726398},k=B,v=21 > 2019-04-24 20:09:22.796 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107744928},k=A,v=13{noformat} > Can‘t a [SessionWindowedKStream] be suppressed after count operation? It > seems the latest type record produce a previous type record 'Window Final > Results'. I just want get exactly one [Window Final > Results|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#id31]. > > First i just start one time ticker, log print seems ok, when I start the > second, then window info print log appeared. > My source stream record rate is a same v record / 25s, gap of inactivity is > 100s, 25<100. I shouldn't got so many window info print log. Session window > close and then reopened? my grace is 20 millisecond, it should not be ah. -- This message was sent by Atlassian JIRA (v7.6.3#76005)