ableegoldman commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r500664927



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
##########
@@ -278,6 +326,23 @@ public void shouldFetchCorrectlyAcrossSegments() {
         assertFalse(results.hasNext());
     }
 
+    @Test
+    public void shouldBackwardFetchCorrectlyAcrossSegments() {
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new 
SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new 
SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new 
SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        cachingStore.put(a1, "1".getBytes());
+        cachingStore.put(a2, "2".getBytes());
+        cachingStore.flush();
+        cachingStore.put(a3, "3".getBytes());

Review comment:
       Can we also add a few more records, spanning multiple segments, that 
don't get flushed?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
##########
@@ -301,6 +366,29 @@ public void shouldFetchRangeCorrectlyAcrossSegments() {
         assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);
     }
 
+    @Test
+    public void shouldBackwardFetchRangeCorrectlyAcrossSegments() {
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new 
SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new 
SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new 
SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new 
SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new 
SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        cachingStore.put(a1, "1".getBytes());
+        cachingStore.put(aa1, "1".getBytes());
+        cachingStore.put(a2, "2".getBytes());
+        cachingStore.put(a3, "3".getBytes());
+        cachingStore.put(aa3, "3".getBytes());
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults =
+            cachingStore.backwardFindSessions(keyA, keyAA, 0, SEGMENT_INTERVAL 
* 2);
+        final Set<Windowed<Bytes>> keys = new HashSet<>();
+        while (rangeResults.hasNext()) {
+            keys.add(rangeResults.next().key);
+        }
+        rangeResults.close();
+        assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);

Review comment:
       We're losing the ordering check by comparing these as a set; let's use a 
list (or some other ordered collection) to verify the actual order.

##########
File path: 
streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
##########
@@ -82,11 +118,15 @@ public boolean hasNext() {
                 public KeyValue<Windowed<K>, V> next() {
                     return it.next();
                 }
-
             }
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to) {
+        return null;

Review comment:
       I guess it probably doesn't matter, since we presumably aren't using 
these backward methods of the ReadOnlySessionStoreStub, but returning null 
might result in some tricky-to-debug NPEs if someone ever does try to use it in 
a test. If you don't feel like implementing it, I think it's fine to just throw 
an UnsupportedOperationException and say that you'll have to implement this to 
use it.
   
   Or just copy the code from the forward direction and flip it 🤷‍♀️  Same goes 
for all the methods in here that return null

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
##########
@@ -456,68 +562,88 @@ public void shouldClearNamespaceCacheOnClose() {
         assertEquals(0, cache.size());
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
         cachingStore.close();
-        cachingStore.fetch(keyA);
+        assertThrows(InvalidStateStoreException.class, () -> 
cachingStore.fetch(keyA));
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() {
         cachingStore.close();
-        cachingStore.findSessions(keyA, 0, Long.MAX_VALUE);
+        assertThrows(InvalidStateStoreException.class, () -> 
cachingStore.findSessions(keyA, 0, Long.MAX_VALUE));
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToRemoveFromClosedCachingStore() {
         cachingStore.close();
-        cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0)));
+        assertThrows(InvalidStateStoreException.class, () -> 
cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0))));
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToPutIntoClosedCachingStore() {
         cachingStore.close();
-        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+        assertThrows(InvalidStateStoreException.class, () -> 
cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes()));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() {
-        cachingStore.findSessions(null, 1L, 2L);
+        assertThrows(NullPointerException.class, () -> 
cachingStore.findSessions(null, 1L, 2L));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() {
-        cachingStore.findSessions(null, keyA, 1L, 2L);
+        assertThrows(NullPointerException.class, () -> 
cachingStore.findSessions(null, keyA, 1L, 2L));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() {
-        cachingStore.findSessions(keyA, null, 1L, 2L);
+        assertThrows(NullPointerException.class, () -> 
cachingStore.findSessions(keyA, null, 1L, 2L));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFetchNullFromKey() {
-        cachingStore.fetch(null, keyA);
+        assertThrows(NullPointerException.class, () -> 
cachingStore.fetch(null, keyA));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFetchNullToKey() {
-        cachingStore.fetch(keyA, null);
+        assertThrows(NullPointerException.class, () -> 
cachingStore.fetch(keyA, null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFetchNullKey() {
-        cachingStore.fetch(null);
+        assertThrows(NullPointerException.class, () -> 
cachingStore.fetch(null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnRemoveNullKey() {
-        cachingStore.remove(null);
+        assertThrows(NullPointerException.class, () -> 
cachingStore.remove(null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnPutNullKey() {
-        cachingStore.put(null, "1".getBytes());
+        assertThrows(NullPointerException.class, () -> cachingStore.put(null, 
"1".getBytes()));
+    }
+
+    @Test
+    public void 
shouldNotThrowInvalidBackwardRangeExceptionWithNegativeFromKey() {

Review comment:
       Technically "InvalidRangeException" was just the name of the exception 
that could get thrown, there is no "InvalidBackwardRangeException" that I know 
of 😛  But I think the meaning is clear enough lol




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to