mjsax commented on code in PR #14523:
URL: https://github.com/apache/kafka/pull/14523#discussion_r1357188427


##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -716,7 +716,7 @@ public void process(final Record<Integer, Integer> record) {
                 final SessionStore<Integer, Integer> stateStore = context().getStateStore(sessionStoreStoreBuilder.name());
                 stateStore.put(
-                    new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)),
+                    new Windowed<>(record.key(), new SessionWindow((record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis(), (record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis() + WINDOW_SIZE.toMillis())),

Review Comment:
   I think it should be `new SessionWindow(record.timestamp(), record.timestamp())`
   
   > but their session window size seem to become 0
   
   Yes, the size would become `1` (note that for session windows, both the upper and the lower bound are inclusive, in contrast to Tumbling/Hopping windows).
   
   Session windows have no fixed size; they only have an "inactivity gap" parameter. (The test merely re-uses the `WINDOW_SIZE` variable as the "gap", which might be a little confusing.) The size of the created windows depends on the data. In the current test setup, records are further apart from each other than the "gap", so each record creates its own session, and each session has size `1`. The session size only becomes larger if two (or more) records fall into the same session, i.e., their timestamps are close enough to each other (`<= gap`).
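
To illustrate the point about data-dependent session sizes, here is a minimal standalone sketch. It does not use the Kafka Streams API; `SessionWindowSketch`, `sessionize`, and `inclusiveSize` are hypothetical names made up for this example, and the 5-minute gap is an assumed value rather than the test's actual `WINDOW_SIZE`. It only models the rule described above: a record joins the previous session if its timestamp is within the gap (`<= gap`) of that session's end, otherwise it starts a new session with `start == end`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not Kafka Streams code.
class SessionWindowSketch {

    /**
     * Groups ascending timestamps into sessions.
     * Each session is a {start, end} pair with both bounds inclusive.
     */
    static List<long[]> sessionize(final long[] sortedTimestamps, final long gapMs) {
        final List<long[]> sessions = new ArrayList<>();
        for (final long ts : sortedTimestamps) {
            if (!sessions.isEmpty()) {
                final long[] last = sessions.get(sessions.size() - 1);
                // Close enough to the previous session's end: extend that session.
                if (ts - last[1] <= gapMs) {
                    last[1] = ts;
                    continue;
                }
            }
            // Otherwise the record opens its own session: start == end.
            sessions.add(new long[] {ts, ts});
        }
        return sessions;
    }

    /** Size in milliseconds when both bounds are counted as inclusive. */
    static long inclusiveSize(final long[] session) {
        return session[1] - session[0] + 1;
    }

    public static void main(final String[] args) {
        final long gapMs = 5 * 60 * 1000L; // assumed gap, for illustration

        // Records further apart than the gap: one session per record.
        final long[] farApart = {0L, 2 * gapMs, 4 * gapMs};
        for (final long[] s : sessionize(farApart, gapMs)) {
            System.out.println("[" + s[0] + ", " + s[1] + "] size=" + inclusiveSize(s));
        }

        // First two records are exactly one gap apart, so they share a session.
        final long[] closeTogether = {0L, gapMs, 3 * gapMs};
        for (final long[] s : sessionize(closeTogether, gapMs)) {
            System.out.println("[" + s[0] + ", " + s[1] + "] size=" + inclusiveSize(s));
        }
    }
}
```

With the far-apart input every session has `start == end`, which matches the suggested `new SessionWindow(record.timestamp(), record.timestamp())`. Only the second input produces a session that spans more than a single timestamp.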