mjsax commented on code in PR #14523:
URL: https://github.com/apache/kafka/pull/14523#discussion_r1355991839
##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -716,7 +716,7 @@ public void process(final Record<Integer, Integer> record) {
     final SessionStore<Integer, Integer> stateStore =
         context().getStateStore(sessionStoreStoreBuilder.name());
     stateStore.put(
-        new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)),
+        new Windowed<>(record.key(), new SessionWindow((record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis(), (record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis() + WINDOW_SIZE.toMillis())),

Review Comment:
   We use a session window gap of `WINDOW_SIZE` -- this means that every record falls into its own session, and `session-start = session-end = record.ts`.

##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -880,25 +888,113 @@ private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extract
         extractor,
         mkSet()
     );
+
+    shouldHandleWindowKeyQuery(
+        0,
+        Instant.ofEpochMilli(WINDOW_START),
+        Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()),
+        extractor,
+        mkSet(0)
+    );
+
+    shouldHandleWindowKeyQuery(
+        1,
+        Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(4).toMillis()),
+        Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(11).toMillis()),
+        extractor,
+        mkSet(1)
+    );
+
+    shouldHandleWindowKeyQuery(
+        2,
+        Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()),
+        Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(12).toMillis()),
+        extractor,
+        mkSet(2)
+    );
+
+    shouldHandleWindowKeyQuery(
+        3,
+        Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(13).toMillis()),
+        Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20).toMillis()),
+        extractor,
+        mkSet(3)
+    );
 }
 private <T> void shouldHandleWindowRangeQueries(final Function<T, Integer> extractor) {
     final long windowSize = WINDOW_SIZE.toMillis();
     final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+    // miss the window start
+    shouldHandleWindowRangeQuery(
+        Instant.ofEpochMilli(windowStart - 1),
+        Instant.ofEpochMilli(windowStart - 1),
+        extractor,
+        mkSet()
+    );
+
+    shouldHandleWindowRangeQuery(
+        Instant.ofEpochMilli(windowStart),
+        Instant.ofEpochMilli(windowStart),
+        extractor,
+        mkSet(0)
+    );
+
     shouldHandleWindowRangeQuery(
         Instant.ofEpochMilli(windowStart),
-        Instant.ofEpochMilli(windowStart),
+        Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()),
         extractor,
-        mkSet(0, 1, 2, 3)
+        mkSet(0, 1)
     );
-    // miss the window start
     shouldHandleWindowRangeQuery(

Review Comment:
   Similar to above -- it's not obvious why you picked these time bounds for the queries. Can you explain?

##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -847,11 +847,11 @@ private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extract
     // tightest possible start range
     shouldHandleWindowKeyQuery(
-        2,
+        0,
         Instant.ofEpochMilli(WINDOW_START),
         Instant.ofEpochMilli(WINDOW_START),
         extractor,
-        mkSet(2)
+        mkSet(0)
     );
     // miss the window start range

Review Comment:
   Should the key below be updated to `0` as well? Otherwise we don't miss the window of that key by 1 ms, as the test intends.

##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -880,25 +888,113 @@ private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extract
         extractor,
         mkSet()
     );
+
+    shouldHandleWindowKeyQuery(

Review Comment:
   For this and the other added queries: how do you pick the query search bounds? It's not obvious to me.
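
To illustrate the session-window remark in the first comment, here is a minimal sketch in plain Java. It is not Kafka Streams code and not the test's implementation: the class `SessionSketch`, the method `sessions`, and the sample timestamps are hypothetical. It also assumes that a record joins the previous session when it arrives within the gap (boundary inclusive), and that the test's records are spaced further apart than the gap. Under those assumptions every record ends up in its own session whose start and end both equal the record timestamp.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionSketch {

    // Groups ascending timestamps into sessions, each stored as {start, end}.
    // Assumption: a record extends the previous session if it arrives
    // no later than gapMs after that session's end.
    static List<long[]> sessions(final long[] sortedTimestamps, final long gapMs) {
        final List<long[]> result = new ArrayList<>();
        for (final long ts : sortedTimestamps) {
            final long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts - last[1] <= gapMs) {
                last[1] = ts; // extend the current session
            } else {
                result.add(new long[] {ts, ts}); // new session: start == end == ts
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final long gapMs = 5 * 60 * 1000L; // hypothetical 5-minute gap
        // Records spaced two gaps apart, so no two of them share a session.
        final long[] timestamps = {0L, 2 * gapMs, 4 * gapMs};
        for (final long[] s : sessions(timestamps, gapMs)) {
            System.out.println("session [" + s[0] + ", " + s[1] + "]");
        }
    }
}
```

If that reading is right, a window derived by rounding the timestamp down to a multiple of `WINDOW_SIZE` would describe a tumbling window rather than the session the record actually belongs to.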
##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -863,6 +863,14 @@ private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extract
         mkSet()
     );

+    shouldHandleWindowKeyQuery(
+        2,

Review Comment:
   nit: fix indentation -- should only be 4 spaces (similar below)

   What is the intent of adding this test case?