mjsax commented on code in PR #14523:
URL: https://github.com/apache/kafka/pull/14523#discussion_r1357188427


##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -716,7 +716,7 @@ public void process(final Record<Integer, Integer> record) {
                 final SessionStore<Integer, Integer> stateStore =
                     context().getStateStore(sessionStoreStoreBuilder.name());
                 stateStore.put(
-                    new Windowed<>(record.key(), new 
SessionWindow(WINDOW_START, WINDOW_START)),
+                    new Windowed<>(record.key(), new 
SessionWindow((record.timestamp() / WINDOW_SIZE.toMillis()) * 
WINDOW_SIZE.toMillis(), (record.timestamp() / WINDOW_SIZE.toMillis()) * 
WINDOW_SIZE.toMillis() + WINDOW_SIZE.toMillis())),

Review Comment:
   I think it should be `new SessionWindow(record.timestamp(), 
record.timestamp())`
   
   > but their session window size seem to become 0
   
   Yes, the size would become `1` (note, for sessions window, both upper and 
lower bound are inclusive in contrast to Tumbling/Hopping window). For session 
windows, there is no fixed size, but the "inactivity gap" parameter (the test 
only re-uses `WINDOW_SIZE` variable as "gap" -- this might be a little bit 
confusing). The size of the created windows depend on the data -- in the 
current test setup, records are apart from each other more than the "gap" and 
thus each record creates it's own session, and thus each session has size `1` 
-- only if two (or more) records fall into the same session (ie, their 
timestamp are close enough to each other, ie, `<= gap` the session size would 
become larger).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to