ableegoldman commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r499996879



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -359,7 +431,11 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
 
             current.close();
-            current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+            if (forward) {
+                current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+            } else {
+                current = context.cache().reverseRange(cacheName, 
cacheKeyFrom, cacheKeyTo);
+            }
         }
 
         private void setCacheKeyRange(final long lowerRangeEndTime, final long 
upperRangeEndTime) {

Review comment:
       Can you fix the `keyFrom == keyTo` to use `.equals` on the side (down on 
line 370)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -359,7 +431,11 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
 
             current.close();
-            current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+            if (forward) {

Review comment:
       I think we're going to need some additional changes in this class 
similar to what we had in CachingWindowStore. Definitely at least in 
`getNextSegmentIterator()`. Let's make sure to have some cross-segment test 
coverage here as well, especially because the iteration logic of session store 
range queries is the hardest to wrap your head around out of all the stores (at 
least, it is for me)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -201,7 +247,26 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
         removeExpiredSegments();
 
-        return registerNewIterator(key, key, Long.MAX_VALUE, 
endTimeMap.entrySet().iterator());
+        return registerNewIterator(
+            key,
+            key,
+            Long.MAX_VALUE, endTimeMap.entrySet().iterator(),

Review comment:
       missing newline

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -382,9 +478,20 @@ private boolean setInnerIterators() {
                 currentKey = nextKeyEntry.getKey();
 
                 if (latestSessionStartTime == Long.MAX_VALUE) {
-                    recordIterator = 
nextKeyEntry.getValue().entrySet().iterator();
+                    final Set<Entry<Long, byte[]>> entries;
+                    if (forward) entries = 
nextKeyEntry.getValue().descendingMap().entrySet();
+                    else entries = nextKeyEntry.getValue().entrySet();
+                    recordIterator = entries.iterator();
                 } else {
-                    recordIterator = 
nextKeyEntry.getValue().headMap(latestSessionStartTime, 
true).entrySet().iterator();
+                    final Set<Entry<Long, byte[]>> entries;
+                    if (forward) entries = nextKeyEntry.getValue()

Review comment:
       If/else needs brackets




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to