guozhangwang commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r880784817
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java:
##########
@@ -61,6 +65,18 @@ public <R> QueryResult<R> query(final Query<R> query,
);
}
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime);
Review Comment:
This is the second open question. With the current prefixed (base, i.e.
time-first) session key schema, this `fetchAll` would effectively search
`[earliestEnd, INF]` rather than `[earliestEnd, latestEnd]`, because of this logic:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java#L46
The cause is that `AbstractRocksDBTimeOrderedSegmentedBytesStore` translates a
range query without a key by using `lower/upperRange` instead of
`lower/upperRangeFixedSize`:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java#L241-L242
I cannot remember why we need to do this. @lihaosky @mjsax do you remember
why?
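
To make the concern concrete, here is a minimal, self-contained sketch. It is not the actual Kafka code: the key layout (`[prefix][endTime][startTime][key]`), the class `TimeFirstRangeSketch`, and both upper-bound helpers are assumptions made for illustration. It only shows how a keyless upper bound that ignores the `to` timestamp turns an intended `[earliestEnd, latestEnd]` scan into `[earliestEnd, INF]`, while a fixed-size-style bound that encodes `to` keeps the scan bounded.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Illustrative only: a toy time-first store, not Kafka's implementation.
final class TimeFirstRangeSketch {
    static final byte PREFIX = 0;

    // Assumed layout: [prefix][endTime][startTime][key bytes].
    static byte[] storeKey(final long end, final long start, final String key) {
        final byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + 8 + 8 + k.length)
            .put(PREFIX).putLong(end).putLong(start).put(k).array();
    }

    // Keyless lower bound: prefix followed by the earliest end time.
    static byte[] lowerBound(final long from) {
        return ByteBuffer.allocate(1 + 8).put(PREFIX).putLong(from).array();
    }

    // Hypothetical keyless upper bound that ignores `to` and jumps to the next prefix.
    static byte[] upperBoundIgnoringTime() {
        return new byte[] {(byte) (PREFIX + 1)};
    }

    // Hypothetical fixed-size-style upper bound that encodes `to` as the end time.
    static byte[] upperBoundWithTime(final long to) {
        return ByteBuffer.allocate(1 + 8 + 8)
            .put(PREFIX).putLong(to).putLong(Long.MAX_VALUE).array();
    }

    // Scans [lo, hi] in unsigned lexicographic byte order and returns the end times found.
    static List<Long> endTimesInRange(final byte[] lo, final byte[] hi) {
        final Comparator<byte[]> byteOrder = Arrays::compareUnsigned;
        final TreeMap<byte[], Long> store = new TreeMap<>(byteOrder);
        store.put(storeKey(10L, 5L, "a"), 10L);
        store.put(storeKey(20L, 15L, "b"), 20L);
        store.put(storeKey(30L, 25L, "c"), 30L);
        return new ArrayList<>(store.subMap(lo, true, hi, true).values());
    }

    public static void main(final String[] args) {
        // Intended range is end times in [10, 20].
        System.out.println(endTimesInRange(lowerBound(10L), upperBoundIgnoringTime())); // [10, 20, 30]
        System.out.println(endTimesInRange(lowerBound(10L), upperBoundWithTime(20L)));  // [10, 20]
    }
}
```

In this toy setup the session ending at 30 leaks into the first scan even though `latestEnd` is 20, which is the behavior I am worried about for `findSessions`.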