mjsax commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r898411852


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java:
##########
@@ -61,6 +65,18 @@ public <R> QueryResult<R> query(final Query<R> query,
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final 
Instant earliestSessionEndTime,
+                                                                  final 
Instant latestSessionEndTime) {
+        final long earliestEndTime = 
ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, 
"earliestSessionEndTime"));
+        final long latestEndTime = 
ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(latestSessionEndTime, 
"latestSessionEndTime"));
+
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
wrapped().fetchAll(earliestEndTime, latestEndTime);

Review Comment:
   If I read the code correctly, what `fetchAll()` does is correct: from my 
understanding, that fetchAll() is implement to find "overlapping sessions" 
given a lower and upper bound -- the lower bound must be smaller and session 
end and the upper bound must be smaller than session start to find an overlap. 
Because the upper bound compares to session start,, and we use the "base" we 
need to search the full "data/base part" of the store.
   
   I guess the issue is, that you actually cannot use `fetchAll()` at all for 
our purpose here? Passing in `lastEndTime` does not work (does it) as it would 
be used to compare to session start-times, but we want to do a comparison to 
session end time. -- Thus, I think the right solution is, to actually also add 
the new `findSessions()` to the internal `SegmentedStore` and implement a 
proper iterator there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to