mjsax commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r898340071


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##########
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
         removeExpiredSegments();
 
         Objects.requireNonNull(key, "key cannot be null");
 
         // Only need to search if the record hasn't expired yet
-        if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
-            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, 
byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
+        if (sessionEndTime > observedStreamTime - retentionPeriod) {
+            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, 
byte[]>> keyMap = endTimeMap.get(sessionEndTime);
             if (keyMap != null) {
                 final ConcurrentNavigableMap<Long, byte[]> startTimeMap = 
keyMap.get(key);
                 if (startTimeMap != null) {
-                    return startTimeMap.get(earliestSessionEndTime);
+                    return startTimeMap.get(sessionStartTime);
                 }
             }
         }
         return null;
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final 
Instant earliestSessionEndTime,
+                                                                  final 
Instant latestSessionEndTime) {
+        removeExpiredSegments();
+
+        final long earliestEndTime = 
ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, 
"earliestSessionEndTime"));
+        final long latestEndTime = 
ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, 
"latestSessionEndTime"));
+
+        // since subMap is exclusive on toKey, we need to plus one
+        return registerNewIterator(null,
+                                   null,
+                                    Long.MAX_VALUE,
+                                    endTimeMap.subMap(earliestEndTime, 
latestEndTime + 1).entrySet().iterator(),
+                                    true);

Review Comment:
   Ok. I read the code of `InMemorySessionStore` in detail and now understand 
what's going on. This LGTM.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##########
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
         removeExpiredSegments();
 
         Objects.requireNonNull(key, "key cannot be null");
 
         // Only need to search if the record hasn't expired yet
-        if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
-            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, 
byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
+        if (sessionEndTime > observedStreamTime - retentionPeriod) {
+            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, 
byte[]>> keyMap = endTimeMap.get(sessionEndTime);
             if (keyMap != null) {
                 final ConcurrentNavigableMap<Long, byte[]> startTimeMap = 
keyMap.get(key);
                 if (startTimeMap != null) {
-                    return startTimeMap.get(earliestSessionEndTime);
+                    return startTimeMap.get(sessionStartTime);
                 }
             }
         }
         return null;
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final 
Instant earliestSessionEndTime,
+                                                                  final 
Instant latestSessionEndTime) {
+        removeExpiredSegments();
+
+        final long earliestEndTime = 
ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, 
"earliestSessionEndTime"));
+        final long latestEndTime = 
ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, 
"latestSessionEndTime"));
+
+        // since subMap is exclusive on toKey, we need to plus one
+        return registerNewIterator(null,
+                                   null,
+                                    Long.MAX_VALUE,

Review Comment:
   nit: indention



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to