mjsax commented on code in PR #14523:
URL: https://github.com/apache/kafka/pull/14523#discussion_r1357188427
##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -716,7 +716,7 @@ public void process(final Record<Integer, Integer> record) {
final SessionStore<Integer, Integer> stateStore =
context().getStateStore(sessionStoreStoreBuilder.name());
stateStore.put(
- new Windowed<>(record.key(), new
SessionWindow(WINDOW_START, WINDOW_START)),
+ new Windowed<>(record.key(), new
SessionWindow((record.timestamp() / WINDOW_SIZE.toMillis()) *
WINDOW_SIZE.toMillis(), (record.timestamp() / WINDOW_SIZE.toMillis()) *
WINDOW_SIZE.toMillis() + WINDOW_SIZE.toMillis())),
Review Comment:
I think it should be `new SessionWindow(record.timestamp(),
record.timestamp())`
> but their session window size seem to become 0
Yes, the size would become `1` (note, for sessions window, both upper and
lower bound are inclusive in contrast to Tumbling/Hopping window). For session
windows, there is no fixed size, but the "inactivity gap" parameter (the test
only re-uses `WINDOW_SIZE` variable as "gap" -- this might be a little bit
confusing). The size of the created windows depend on the data -- in the
current test setup, records are apart from each other more than the "gap" and
thus each record creates it's own session, and thus each session has size `1`
-- only if two (or more) records fall into the same session (ie, their
timestamp are close enough to each other, ie, `<= gap` the session size would
become larger).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]