jeqo commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r475910689
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##########
@@ -119,15 +118,16 @@
* <p>
* This iterator must be closed after use.
*
- * @param from the first key in the range
- * @param to the last key in the range
- * @param timeFrom time range start (inclusive)
- * @param timeTo time range end (inclusive)
+ * @param from the first key in the range
+ * @param to the last key in the range
+ * @param timeFrom time range start (inclusive)
+ * @param timeTo time range end (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>,
value>}
* @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException if one of the given keys is {@code null}
+ * @throws NullPointerException if one of the given keys is {@code
null}
*/
- @SuppressWarnings("deprecation") // note, this method must be kept if
super#fetch(...) is removed
+ // note, this method must be kept if super#fetch(...) is removed
+ @SuppressWarnings("deprecation")
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long
timeTo);
Review comment:
These methods were introduced when adding Duration/Instant support
https://github.com/apache/kafka/pull/5682.
I don't think these are needed, we can do a similar change as for
SessionStore read operations. wdyt?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -419,13 +504,13 @@ Long minTime() {
}
Review comment:
For windowStore, only time-based index is been iterated backward. The
KIP didn't considered reversing key/value stores internally.
We would need another flag (apart from backward) to define order of internal
keys, which its cumbersome, and the order between keys doesn't matter much or
can be calculated by the user.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -426,7 +558,12 @@ private void getNextSegmentIterator() {
setCacheKeyRange(currentSegmentBeginTime(),
currentSegmentLastTime());
Review comment:
Will have to double check this. I have inverted the current/last segment
for backwards use-case though.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -72,22 +86,40 @@
searchSpace.iterator(),
Review comment:
`searchSpace` will be reversed based on the `forward` flag, on
`AbstractSegments`.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final
long windowStartTimes
@Deprecated
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final long
timeFrom, final long timeTo) {
+ return fetch(key, timeFrom, timeTo, true);
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final
Instant from, final Instant to) {
+ final long timeFrom = ApiUtils.validateMillisecondInstant(from,
prepareMillisCheckFailMsgPrefix(from, "from"));
+ final long timeTo = ApiUtils.validateMillisecondInstant(to,
prepareMillisCheckFailMsgPrefix(to, "to"));
Review comment:
Only backward compatibility. If it make sense to remove these
deprecations as part of this KIP, I'd be happy to help cleaning it.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -337,25 +462,32 @@ public synchronized void close() {
private CacheIteratorWrapper(final Bytes key,
final long timeFrom,
- final long timeTo) {
- this(key, key, timeFrom, timeTo);
+ final long timeTo,
+ final boolean forward) {
+ this(key, key, timeFrom, timeTo, forward);
}
private CacheIteratorWrapper(final Bytes keyFrom,
final Bytes keyTo,
final long timeFrom,
- final long timeTo) {
+ final long timeTo,
+ final boolean forward) {
this.keyFrom = keyFrom;
this.keyTo = keyTo;
this.timeTo = timeTo;
this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo,
maxObservedTimestamp.get()));
+ this.forward = forward;
this.segmentInterval = cacheFunction.getSegmentInterval();
this.currentSegmentId = cacheFunction.segmentId(timeFrom);
Review comment:
great catch! I think this hasn't pop up yet in the tests as all tests
may be using the same segment.
Will double check to add more tests to validate this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]