mjsax commented on code in PR #14523:
URL: https://github.com/apache/kafka/pull/14523#discussion_r1355991839


##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -716,7 +716,7 @@ public void process(final Record<Integer, Integer> record) {
                 final SessionStore<Integer, Integer> stateStore =
                     context().getStateStore(sessionStoreStoreBuilder.name());
                 stateStore.put(
-                    new Windowed<>(record.key(), new 
SessionWindow(WINDOW_START, WINDOW_START)),
+                    new Windowed<>(record.key(), new 
SessionWindow((record.timestamp() / WINDOW_SIZE.toMillis()) * 
WINDOW_SIZE.toMillis(), (record.timestamp() / WINDOW_SIZE.toMillis()) * 
WINDOW_SIZE.toMillis() + WINDOW_SIZE.toMillis())),

Review Comment:
   We use a session window gap of `WINDOW_SIZE` -- this means that every record 
falls into it's own session, and `session-start = session-end = record.ts`.



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -880,25 +888,113 @@ private <T> void shouldHandleWindowKeyQueries(final 
Function<T, Integer> extract
             extractor,
             mkSet()
         );
+
+        shouldHandleWindowKeyQuery(
+                0,
+                Instant.ofEpochMilli(WINDOW_START),
+                Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(5).toMillis()),
+                extractor,
+                mkSet(0)
+        );
+
+        shouldHandleWindowKeyQuery(
+                1,
+                Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(4).toMillis()),
+                Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(11).toMillis()),
+                extractor,
+                mkSet(1)
+        );
+
+        shouldHandleWindowKeyQuery(
+                2,
+                Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(5).toMillis()),
+                Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(12).toMillis()),
+                extractor,
+                mkSet(2)
+        );
+
+        shouldHandleWindowKeyQuery(
+                3,
+                Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(13).toMillis()),
+                Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(20).toMillis()),
+                extractor,
+                mkSet(3)
+        );
     }
 
     private <T> void shouldHandleWindowRangeQueries(final Function<T, Integer> 
extractor) {
         final long windowSize = WINDOW_SIZE.toMillis();
         final long windowStart = (RECORD_TIME / windowSize) * windowSize;
 
+        // miss the window start
+        shouldHandleWindowRangeQuery(
+                Instant.ofEpochMilli(windowStart - 1),
+                Instant.ofEpochMilli(windowStart - 1),
+                extractor,
+                mkSet()
+        );
+
+        shouldHandleWindowRangeQuery(
+                Instant.ofEpochMilli(windowStart),
+                Instant.ofEpochMilli(windowStart),
+                extractor,
+                mkSet(0)
+        );
+
         shouldHandleWindowRangeQuery(
             Instant.ofEpochMilli(windowStart),
-            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(5).toMillis()),
             extractor,
-            mkSet(0, 1, 2, 3)
+            mkSet(0, 1)
         );
 
-        // miss the window start
         shouldHandleWindowRangeQuery(

Review Comment:
   Similar to above -- it's not obvious why you picked the time bounds for the 
queries. Can you explain?



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -847,11 +847,11 @@ private <T> void shouldHandleWindowKeyQueries(final 
Function<T, Integer> extract
 
         // tightest possible start range
         shouldHandleWindowKeyQuery(
-            2,
+            0,
             Instant.ofEpochMilli(WINDOW_START),
             Instant.ofEpochMilli(WINDOW_START),
             extractor,
-            mkSet(2)
+            mkSet(0)
         );
 
         // miss the window start range

Review Comment:
   The key below should be updated to `0`? Otherwise we don't miss the window 
of the key be 1 ms as intended by the test?



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -880,25 +888,113 @@ private <T> void shouldHandleWindowKeyQueries(final 
Function<T, Integer> extract
             extractor,
             mkSet()
         );
+
+        shouldHandleWindowKeyQuery(

Review Comment:
   For this and the other added queries. How do you pick the query search 
bounds? It's not obvious to me.



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -863,6 +863,14 @@ private <T> void shouldHandleWindowKeyQueries(final 
Function<T, Integer> extract
             mkSet()
         );
 
+        shouldHandleWindowKeyQuery(
+                2,

Review Comment:
   nit: fix indention -- should only be 4 spaces (similar below)
   
   What is the indent of adding this test case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to