This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d0e6943 KAFKA-9929: Support backward iterator on SessionStore (#9139) d0e6943 is described below commit d0e6943bdd048aa6e0a4dbbdad3c8da460db16dc Author: Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> AuthorDate: Thu Oct 8 14:08:24 2020 +0100 KAFKA-9929: Support backward iterator on SessionStore (#9139) Implements KIP-617 for `SessionStore` Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, John Roesler <vvcep...@apache.org> --- .../kafka/streams/state/ReadOnlySessionStore.java | 130 +++++++- .../apache/kafka/streams/state/SessionStore.java | 40 --- .../state/internals/CachingSessionStore.java | 148 +++++++-- .../internals/ChangeLoggingSessionBytesStore.java | 24 ++ .../internals/CompositeReadOnlySessionStore.java | 169 +++++++++++ .../state/internals/InMemorySessionStore.java | 123 +++++++- .../MergedSortedCacheSessionStoreIterator.java | 5 +- .../state/internals/MeteredSessionStore.java | 68 +++++ .../state/internals/RocksDBSessionStore.java | 36 +++ .../internals/AbstractSessionBytesStoreTest.java | 187 ++++++++++++ .../state/internals/CacheFlushListenerStub.java | 49 +++ ....java => CachingInMemoryKeyValueStoreTest.java} | 29 +- ...t.java => CachingInMemorySessionStoreTest.java} | 212 ++++++++++--- ...java => CachingPersistentSessionStoreTest.java} | 330 +++++++++++++++------ ....java => CachingPersistentWindowStoreTest.java} | 6 +- .../ChangeLoggingSessionBytesStoreTest.java | 40 +++ ...SortedCacheWrappedSessionStoreIteratorTest.java | 63 +++- .../state/internals/MeteredSessionStoreTest.java | 80 +++++ .../kafka/test/ReadOnlySessionStoreStub.java | 72 ++++- 19 files changed, 1571 insertions(+), 240 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index 230d257..8874908 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -28,31 +28,153 @@ import org.apache.kafka.streams.kstream.Windowed; * @param <AGG> the aggregated value type */ public interface ReadOnlySessionStore<K, AGG> { + /** - * Retrieve all aggregated sessions for the provided key. + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime iterating from earliest to latest. + * <p> + * This iterator must be closed after use. + * + * @param key the key to return sessions for + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends. + * @return iterator of sessions with the matching key and aggregated values, from earliest to latest session time. + * @throws NullPointerException If null is used for key. + */ + default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime iterating from latest to earliest. + * <p> + * This iterator must be closed after use. + * + * @param key the key to return sessions for + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts. + * @return backward iterator of sessions with the matching key and aggregated values, from latest to earliest session time. + * @throws NullPointerException If null is used for key. + */ + default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime iterating from earliest to latest. + * <p> + * This iterator must be closed after use. + * + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends. + * @return iterator of sessions with the matching keys and aggregated values, from earliest to latest session time. + * @throws NullPointerException If null is used for any key. + */ + default KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + + /** + * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime iterating from latest to earliest. + * <p> * This iterator must be closed after use. * + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts. + * @return backward iterator of sessions with the matching keys and aggregated values, from latest to earliest session time. + * @throws NullPointerException If null is used for any key. + */ + default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Get the value of key from a single session. + * + * @param key the key to fetch + * @param startTime start timestamp of the session + * @param endTime end timestamp of the session + * @return The value or {@code null} if no session associated with the key can be found + * @throws NullPointerException If {@code null} is used for any key. + */ + default AGG fetchSession(final K key, final long startTime, final long endTime) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Retrieve all aggregated sessions for the provided key. + * This iterator must be closed after use. + * <p> * For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * * @param key record key to find aggregated session values for - * @return KeyValueIterator containing all sessions for the provided key. + * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session. * @throws NullPointerException If null is used for key. * */ KeyValueIterator<Windowed<K>, AGG> fetch(final K key); /** - * Retrieve all aggregated sessions for the given range of keys. + * Retrieve all aggregated sessions for the provided key. * This iterator must be closed after use. + * <p> + * For each key, the iterator guarantees ordering of sessions, starting from the newest/latest + * available session to the oldest/earliest session. * + * @param key record key to find aggregated session values for + * @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session. + * @throws NullPointerException If null is used for key. + */ + default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K key) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Retrieve all aggregated sessions for the given range of keys. + * This iterator must be closed after use. + * <p> * For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * * @param from first key in the range to find aggregated session values for * @param to last key in the range to find aggregated session values for - * @return KeyValueIterator containing all sessions for the provided key. + * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session. * @throws NullPointerException If null is used for any of the keys. */ KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to); + + /** + * Retrieve all aggregated sessions for the given range of keys. + * This iterator must be closed after use. + * <p> + * For each key, the iterator guarantees ordering of sessions, starting from the newest/latest + * available session to the oldest/earliest session. + * + * @param from first key in the range to find aggregated session values for + * @param to last key in the range to find aggregated session values for + * @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session. + * @throws NullPointerException If null is used for any of the keys. + */ + default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K from, final K to) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index faaa751..47f48d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -35,46 +35,6 @@ import org.apache.kafka.streams.processor.StateStore; public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> { /** - * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime - * - * This iterator must be closed after use. - * - * @param key the key to return sessions for - * @param earliestSessionEndTime the end timestamp of the earliest session to search for - * @param latestSessionStartTime the end timestamp of the latest session to search for - * @return iterator of sessions with the matching key and aggregated values - * @throws NullPointerException If null is used for key. - */ - KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); - - /** - * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime - * - * This iterator must be closed after use. - * - * @param keyFrom The first key that could be in the range - * @param keyTo The last key that could be in the range - * @param earliestSessionEndTime the end timestamp of the earliest session to search for - * @param latestSessionStartTime the end timestamp of the latest session to search for - * @return iterator of sessions with the matching keys and aggregated values - * @throws NullPointerException If null is used for any key. - */ - KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime); - - /** - * Get the value of key from a single session. - * - * @param key the key to fetch - * @param startTime start timestamp of the session - * @param endTime end timestamp of the session - * @return The value or {@code null} if no session associated with the key can be found - * @throws NullPointerException If {@code null} is used for any key. - */ - AGG fetchSession(final K key, final long startTime, final long endTime); - - /** * Remove the session aggregated with provided {@link Windowed} key from the store * @param sessionKey key of the session to remove * @throws NullPointerException If null is used for sessionKey. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index c92123d..d0fe25a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -76,7 +76,6 @@ class CachingSessionStore super.init(context, root); } - @SuppressWarnings("unchecked") private void initInternal(final InternalProcessorContext context) { this.context = context; @@ -159,7 +158,7 @@ class CachingSessionStore validateStoreOpen(); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = wrapped().persistent() ? - new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime) : + new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime, true) : context.cache().range(cacheName, cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)), cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime)) @@ -174,7 +173,38 @@ class CachingSessionStore latestSessionStartTime); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); - return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction); + return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction, true); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + validateStoreOpen(); + + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = wrapped().persistent() ? + new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime, false) : + context.cache().reverseRange( + cacheName, + cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)), + cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime) + ) + ); + + final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = wrapped().backwardFindSessions( + key, + earliestSessionEndTime, + latestSessionStartTime + ); + final HasNextCondition hasNextCondition = keySchema.hasNextCondition( + key, + key, + earliestSessionEndTime, + latestSessionStartTime + ); + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = + new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); + return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction, false); } @Override @@ -205,7 +235,39 @@ class CachingSessionStore latestSessionStartTime); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); - return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction); + return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction, true); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + + validateStoreOpen(); + + final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); + final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo); + + final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = + wrapped().backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + final HasNextCondition hasNextCondition = keySchema.hasNextCondition( + keyFrom, + keyTo, + earliestSessionEndTime, + latestSessionStartTime + ); + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = + new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); + return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction, false); } @Override @@ -233,6 +295,12 @@ class CachingSessionStore } @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) { + Objects.requireNonNull(key, "key cannot be null"); + return backwardFindSessions(key, 0, Long.MAX_VALUE); + } + + @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to) { Objects.requireNonNull(from, "from cannot be null"); @@ -240,6 +308,14 @@ class CachingSessionStore return findSessions(from, to, 0, Long.MAX_VALUE); } + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes from, + final Bytes to) { + Objects.requireNonNull(from, "from cannot be null"); + Objects.requireNonNull(to, "to cannot be null"); + return backwardFindSessions(from, to, 0, Long.MAX_VALUE); + } + public void flush() { context.cache().flush(cacheName); wrapped().flush(); @@ -269,6 +345,8 @@ class CachingSessionStore private final Bytes keyFrom; private final Bytes keyTo; private final long latestSessionStartTime; + private final boolean forward; + private long lastSegmentId; private long currentSegmentId; @@ -279,25 +357,36 @@ class CachingSessionStore private CacheIteratorWrapper(final Bytes key, final long earliestSessionEndTime, - final long latestSessionStartTime) { - this(key, key, earliestSessionEndTime, latestSessionStartTime); + final long latestSessionStartTime, + final boolean forward) { + this(key, key, earliestSessionEndTime, latestSessionStartTime, forward); } private CacheIteratorWrapper(final Bytes keyFrom, final Bytes keyTo, final long earliestSessionEndTime, - final long latestSessionStartTime) { + final long latestSessionStartTime, + final boolean forward) { this.keyFrom = keyFrom; this.keyTo = keyTo; this.latestSessionStartTime = latestSessionStartTime; - this.lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp); this.segmentInterval = cacheFunction.getSegmentInterval(); + this.forward = forward; + - this.currentSegmentId = cacheFunction.segmentId(earliestSessionEndTime); + if (forward) { + this.currentSegmentId = cacheFunction.segmentId(earliestSessionEndTime); + this.lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp); - setCacheKeyRange(earliestSessionEndTime, currentSegmentLastTime()); + setCacheKeyRange(earliestSessionEndTime, currentSegmentLastTime()); + this.current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); + } else { + this.lastSegmentId = cacheFunction.segmentId(earliestSessionEndTime); + this.currentSegmentId = cacheFunction.segmentId(maxObservedTimestamp); - this.current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); + setCacheKeyRange(currentSegmentBeginTime(), Math.min(latestSessionStartTime, maxObservedTimestamp)); + this.current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo); + } } @Override @@ -357,18 +446,35 @@ class CachingSessionStore } private void getNextSegmentIterator() { - ++currentSegmentId; - lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp); + if (forward) { + ++currentSegmentId; + lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp); - if (currentSegmentId > lastSegmentId) { - current = null; - return; - } + if (currentSegmentId > lastSegmentId) { + current = null; + return; + } - setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + + current.close(); + + current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); + } else { + --currentSegmentId; + + if (currentSegmentId < lastSegmentId) { + current = null; + return; + } + + setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + + current.close(); + + current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo); + } - current.close(); - current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); } private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) { @@ -376,7 +482,7 @@ class CachingSessionStore throw new IllegalStateException("Error iterating over segments: segment interval has changed"); } - if (keyFrom == keyTo) { + if (keyFrom.equals(keyTo)) { cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime)); cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime)); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 0d2133d..556a67e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -60,11 +60,25 @@ class ChangeLoggingSessionBytesStore } @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime); + } + + @Override public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFrom, final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); } @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes keyFrom, final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + } + + @Override public void remove(final Windowed<Bytes> sessionKey) { wrapped().remove(sessionKey); context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp()); @@ -82,11 +96,21 @@ class ChangeLoggingSessionBytesStore } @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) { + return wrapped().backwardFetch(key); + } + + @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) { return wrapped().fetch(key); } @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes from, final Bytes to) { + return wrapped().backwardFetch(from, to); + } + + @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to) { return wrapped().fetch(from, to); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index 63d551c..7223312 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -43,6 +43,137 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore } @Override + public KeyValueIterator<Windowed<K>, V> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(key, "key can't be null"); + final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore<K, V> store : stores) { + try { + final KeyValueIterator<Windowed<K>, V> result = + store.findSessions(key, earliestSessionEndTime, latestSessionStartTime); + + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override + public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(key, "key can't be null"); + final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore<K, V> store : stores) { + try { + final KeyValueIterator<Windowed<K>, V> result = store.backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override + public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from can't be null"); + Objects.requireNonNull(keyTo, "to can't be null"); + final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore<K, V> store : stores) { + try { + final KeyValueIterator<Windowed<K>, V> result = + store.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override + public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from can't be null"); + Objects.requireNonNull(keyTo, "to can't be null"); + final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore<K, V> store : stores) { + try { + final KeyValueIterator<Windowed<K>, V> result = + store.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override + public V fetchSession(final K key, final long startTime, final long endTime) { + Objects.requireNonNull(key, "key can't be null"); + final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore<K, V> store : stores) { + try { + return store.fetchSession(key, startTime, endTime); + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return null; + } + + @Override public KeyValueIterator<Windowed<K>, V> fetch(final K key) { Objects.requireNonNull(key, "key can't be null"); final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType); @@ -65,6 +196,30 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore } @Override + public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) { + Objects.requireNonNull(key, "key can't be null"); + final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore<K, V> store : stores) { + try { + final KeyValueIterator<Windowed<K>, V> result = store.backwardFetch(key); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) { Objects.requireNonNull(from, "from can't be null"); Objects.requireNonNull(to, "to can't be null"); @@ -74,4 +229,18 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore storeProvider.stores(storeName, queryableStoreType).iterator(), nextIteratorFunction)); } + + @Override + public KeyValueIterator<Windowed<K>, V> backwardFetch(final K from, final K to) { + Objects.requireNonNull(from, "from can't be null"); + Objects.requireNonNull(to, "to can't be null"); + final NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>> nextIteratorFunction = store -> store.backwardFetch(from, to); + return new DelegatingPeekingKeyValueIterator<>( + storeName, + new CompositeKeyValueIterator<>( + storeProvider.stores(storeName, queryableStoreType).iterator(), + nextIteratorFunction + ) + ); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index 2e45b48..46c4de2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -168,7 +168,25 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { return registerNewIterator(key, key, latestSessionStartTime, - endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator()); + endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator(), + true); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(key, "key cannot be null"); + + removeExpiredSegments(); + + return registerNewIterator( + key, + key, + latestSessionStartTime, + endTimeMap.tailMap(earliestSessionEndTime, true).descendingMap().entrySet().iterator(), + false + ); } @Override @@ -192,7 +210,35 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { return registerNewIterator(keyFrom, keyTo, latestSessionStartTime, - endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator()); + endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator(), + true); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); + + removeExpiredSegments(); + + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + + return registerNewIterator( + keyFrom, + keyTo, + latestSessionStartTime, + endTimeMap.tailMap(earliestSessionEndTime, true).descendingMap().entrySet().iterator(), + false + ); } @Override @@ -202,7 +248,17 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { removeExpiredSegments(); - return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); + return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), true); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) { + + Objects.requireNonNull(key, "key cannot be null"); + + removeExpiredSegments(); + + return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), false); } @Override @@ -214,7 +270,17 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { removeExpiredSegments(); - return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); + return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes from, final Bytes to) { + Objects.requireNonNull(from, "from key cannot be null"); + Objects.requireNonNull(to, "to key cannot be null"); + + removeExpiredSegments(); + + return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true); } @Override @@ -259,8 +325,17 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { private InMemorySessionStoreIterator registerNewIterator(final Bytes keyFrom, final Bytes keyTo, final long latestSessionStartTime, - final Iterator<Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator) { - final InMemorySessionStoreIterator iterator = new InMemorySessionStoreIterator(keyFrom, keyTo, latestSessionStartTime, endTimeIterator, openIterators::remove); + final Iterator<Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator, + final boolean forward) { + final InMemorySessionStoreIterator iterator = + new InMemorySessionStoreIterator( + keyFrom, + keyTo, + latestSessionStartTime, + endTimeIterator, + openIterators::remove, + forward + ); openIterators.add(iterator); return iterator; } @@ -285,17 +360,21 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { private final ClosingCallback callback; + private final boolean forward; + InMemorySessionStoreIterator(final Bytes keyFrom, final Bytes keyTo, final long latestSessionStartTime, final Iterator<Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator, - final ClosingCallback callback) { + final ClosingCallback callback, + final boolean forward) { this.keyFrom = keyFrom; this.keyTo = keyTo; this.latestSessionStartTime = latestSessionStartTime; this.endTimeIterator = endTimeIterator; this.callback = callback; + this.forward = forward; setAllIterators(); } @@ -366,7 +445,18 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { while (endTimeIterator.hasNext()) { final Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>> nextEndTimeEntry = endTimeIterator.next(); currentEndTime = nextEndTimeEntry.getKey(); - keyIterator = nextEndTimeEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator(); + if (forward) { + keyIterator = nextEndTimeEntry.getValue() + .subMap(keyFrom, true, keyTo, true) + .entrySet() + .iterator(); + } else { + keyIterator = nextEndTimeEntry.getValue() + .subMap(keyFrom, true, keyTo, true) + .descendingMap() + .entrySet() + .iterator(); + } if (setInnerIterators()) { return; @@ -383,9 +473,22 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { currentKey = nextKeyEntry.getKey(); if (latestSessionStartTime == Long.MAX_VALUE) { - recordIterator = nextKeyEntry.getValue().entrySet().iterator(); + if (forward) { + recordIterator = nextKeyEntry.getValue().descendingMap().entrySet().iterator(); + } else { + recordIterator = nextKeyEntry.getValue().entrySet().iterator(); + } } else { - recordIterator = nextKeyEntry.getValue().headMap(latestSessionStartTime, true).entrySet().iterator(); + if (forward) { + recordIterator = nextKeyEntry.getValue() + .headMap(latestSessionStartTime, true) + .descendingMap() + .entrySet().iterator(); + } else { + recordIterator = nextKeyEntry.getValue() + .headMap(latestSessionStartTime, true) + .entrySet().iterator(); + } } if (recordIterator.hasNext()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java index ff45a41..cd0c0df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -32,8 +32,9 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheSto MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator, - final SegmentedCacheFunction cacheFunction) { - super(cacheIterator, storeIterator, true); + final SegmentedCacheFunction cacheFunction, + final boolean forward) { + super(cacheIterator, storeIterator, forward); this.cacheFunction = cacheFunction; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 8b9256d..f7f25c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -222,6 +222,18 @@ public class MeteredSessionStore<K, V> } @Override + public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) { + Objects.requireNonNull(key, "key cannot be null"); + return new MeteredWindowedKeyValueIterator<>( + wrapped().backwardFetch(keyBytes(key)), + fetchSensor, + streamsMetrics, + serdes, + time + ); + } + + @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) { Objects.requireNonNull(from, "from cannot be null"); @@ -235,6 +247,20 @@ public class MeteredSessionStore<K, V> } @Override + public KeyValueIterator<Windowed<K>, V> backwardFetch(final K from, + final K to) { + Objects.requireNonNull(from, "from cannot be null"); + Objects.requireNonNull(to, "to cannot be null"); + return new MeteredWindowedKeyValueIterator<>( + wrapped().backwardFetch(keyBytes(from), keyBytes(to)), + fetchSensor, + streamsMetrics, + serdes, + time + ); + } + + @Override public KeyValueIterator<Windowed<K>, V> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { @@ -252,6 +278,25 @@ public class MeteredSessionStore<K, V> } @Override + public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(key, "key cannot be null"); + final Bytes bytesKey = keyBytes(key); + return new MeteredWindowedKeyValueIterator<>( + wrapped().backwardFindSessions( + bytesKey, + earliestSessionEndTime, + latestSessionStartTime + ), + fetchSensor, + streamsMetrics, + serdes, + time + ); + } + + @Override public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, @@ -273,6 +318,29 @@ public class MeteredSessionStore<K, V> } @Override + public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); + final Bytes bytesKeyFrom = keyBytes(keyFrom); + final Bytes bytesKeyTo = keyBytes(keyTo); + return new MeteredWindowedKeyValueIterator<>( + wrapped().backwardFindSessions( + bytesKeyFrom, + bytesKeyTo, + earliestSessionEndTime, + latestSessionStartTime + ), + fetchSensor, + streamsMetrics, + serdes, + time + ); + } + + @Override public void flush() { maybeMeasureLatency(super::flush, time, flushSensor); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 2f7a211..338769a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -43,6 +43,18 @@ public class RocksDBSessionStore } @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetch( + key, + earliestSessionEndTime, + latestSessionStartTime + ); + return new WrappedSessionStoreIterator(bytesIterator); + } + + @Override public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFrom, final Bytes keyTo, final long earliestSessionEndTime, @@ -57,6 +69,20 @@ public class RocksDBSessionStore } @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetch( + keyFrom, + keyTo, + earliestSessionEndTime, + latestSessionStartTime + ); + return new WrappedSessionStoreIterator(bytesIterator); + } + + @Override public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { return wrapped().get(SessionKeySchema.toBinary(key, startTime, endTime)); } @@ -67,11 +93,21 @@ public class RocksDBSessionStore } @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) { + return backwardFindSessions(key, 0, Long.MAX_VALUE); + } + + @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to) { return findSessions(from, to, 0, Long.MAX_VALUE); } @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes from, final Bytes to) { + return backwardFindSessions(from, to, 0, Long.MAX_VALUE); + } + + @Override public void remove(final Windowed<Bytes> key) { wrapped().remove(SessionKeySchema.toBinary(key)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index bb425a9..b355f0e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -135,6 +135,31 @@ public abstract class AbstractSessionBytesStoreTest { } @Test + public void shouldPutAndBackwardFindSessionsInRange() { + final String key = "a"; + final Windowed<String> a1 = new Windowed<>(key, new SessionWindow(10, 10L)); + final Windowed<String> a2 = new Windowed<>(key, new SessionWindow(500L, 1000L)); + sessionStore.put(a1, 1L); + sessionStore.put(a2, 2L); + sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 1L); + sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 2L); + + final List<KeyValue<Windowed<String>, Long>> expected = + asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); + + try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFindSessions(key, 0, 1000L)) { + assertEquals(new HashSet<>(expected), toSet(values)); + } + + final List<KeyValue<Windowed<String>, Long>> expected2 = + Collections.singletonList(KeyValue.pair(a2, 2L)); + + try (final KeyValueIterator<Windowed<String>, Long> values2 = sessionStore.backwardFindSessions(key, 400L, 600L)) { + assertEquals(new HashSet<>(expected2), toSet(values2)); + } + } + + @Test public void shouldFetchAllSessionsWithSameRecordKey() { final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList( KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), @@ -155,6 +180,27 @@ public abstract class AbstractSessionBytesStoreTest { } @Test + public void shouldBackwardFetchAllSessionsWithSameRecordKey() { + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L) + ); + + for (final KeyValue<Windowed<String>, Long> kv : expected) { + sessionStore.put(kv.key, kv.value); + } + + // add one that shouldn't appear in the results + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); + + try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFetch("a")) { + assertEquals(new HashSet<>(expected), toSet(values)); + } + } + + @Test public void shouldFetchAllSessionsWithinKeyRange() { final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList( KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L), @@ -177,6 +223,29 @@ public abstract class AbstractSessionBytesStoreTest { } @Test + public void shouldBackwardFetchAllSessionsWithinKeyRange() { + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L), + + KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L) + ); + + for (final KeyValue<Windowed<String>, Long> kv : expected) { + sessionStore.put(kv.key, kv.value); + } + + // add some that shouldn't appear in the results + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L); + + try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFetch("aa", "bb")) { + assertEquals(new HashSet<>(expected), toSet(values)); + } + } + + @Test public void shouldFetchExactSession() { sessionStore.put(new Windowed<>("a", new SessionWindow(0, 4)), 1L); sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 3)), 2L); @@ -209,6 +278,22 @@ public abstract class AbstractSessionBytesStoreTest { } @Test + public void shouldBackwardFindValuesWithinMergingSessionWindowRange() { + final String key = "a"; + sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L); + sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L); + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L) + ); + + try (final KeyValueIterator<Windowed<String>, Long> results = sessionStore.backwardFindSessions(key, -1, 1000L)) { + assertEquals(new HashSet<>(expected), toSet(results)); + } + } + + @Test public void shouldRemove() { sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1000)), 1L); sessionStore.put(new Windowed<>("a", new SessionWindow(1500, 2500)), 2L); @@ -262,6 +347,27 @@ public abstract class AbstractSessionBytesStoreTest { } @Test + public void shouldBackwardFindSessionsToMerge() { + final Windowed<String> session1 = new Windowed<>("a", new SessionWindow(0, 100)); + final Windowed<String> session2 = new Windowed<>("a", new SessionWindow(101, 200)); + final Windowed<String> session3 = new Windowed<>("a", new SessionWindow(201, 300)); + final Windowed<String> session4 = new Windowed<>("a", new SessionWindow(301, 400)); + final Windowed<String> session5 = new Windowed<>("a", new SessionWindow(401, 500)); + sessionStore.put(session1, 1L); + sessionStore.put(session2, 2L); + sessionStore.put(session3, 3L); + sessionStore.put(session4, 4L); + sessionStore.put(session5, 5L); + + final List<KeyValue<Windowed<String>, Long>> expected = + asList(KeyValue.pair(session2, 2L), KeyValue.pair(session3, 3L)); + + try (final KeyValueIterator<Windowed<String>, Long> results = sessionStore.backwardFindSessions("a", 150, 300)) { + assertEquals(new HashSet<>(expected), toSet(results)); + } + } + + @Test public void shouldFetchExactKeys() { sessionStore = buildSessionStore(0x7a00000000000000L, Serdes.String(), Serdes.Long()); sessionStore.init((StateStoreContext) context, sessionStore); @@ -299,6 +405,43 @@ public abstract class AbstractSessionBytesStoreTest { } @Test + public void shouldBackwardFetchExactKeys() { + sessionStore = buildSessionStore(0x7a00000000000000L, Serdes.String(), Serdes.Long()); + sessionStore.init((StateStoreContext) context, sessionStore); + + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L); + sessionStore.put(new Windowed<>("a", + new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L); + + try (final KeyValueIterator<Windowed<String>, Long> iterator = + sessionStore.backwardFindSessions("a", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 3L, 5L)))); + } + + try (final KeyValueIterator<Windowed<String>, Long> iterator = + sessionStore.backwardFindSessions("aa", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L, 4L)))); + } + + try (final KeyValueIterator<Windowed<String>, Long> iterator = + sessionStore.backwardFindSessions("a", "aa", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); + } + + try (final KeyValueIterator<Windowed<String>, Long> iterator = + sessionStore.backwardFindSessions("a", "aa", 10, 0) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L)))); + } + } + + @Test public void shouldFetchAndIterateOverExactBinaryKeys() { final SessionStore<Bytes, String> sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.Bytes(), Serdes.String()); @@ -328,6 +471,35 @@ public abstract class AbstractSessionBytesStoreTest { } @Test + public void shouldBackwardFetchAndIterateOverExactBinaryKeys() { + final SessionStore<Bytes, String> sessionStore = + buildSessionStore(RETENTION_PERIOD, Serdes.Bytes(), Serdes.String()); + + sessionStore.init((StateStoreContext) context, sessionStore); + + final Bytes key1 = Bytes.wrap(new byte[] {0}); + final Bytes key2 = Bytes.wrap(new byte[] {0, 0}); + final Bytes key3 = Bytes.wrap(new byte[] {0, 0, 0}); + + sessionStore.put(new Windowed<>(key1, new SessionWindow(1, 100)), "1"); + sessionStore.put(new Windowed<>(key2, new SessionWindow(2, 100)), "2"); + sessionStore.put(new Windowed<>(key3, new SessionWindow(3, 100)), "3"); + sessionStore.put(new Windowed<>(key1, new SessionWindow(4, 100)), "4"); + sessionStore.put(new Windowed<>(key2, new SessionWindow(5, 100)), "5"); + sessionStore.put(new Windowed<>(key3, new SessionWindow(6, 100)), "6"); + sessionStore.put(new Windowed<>(key1, new SessionWindow(7, 100)), "7"); + sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8"); + sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9"); + + final Set<String> expectedKey1 = new HashSet<>(asList("1", "4", "7")); + assertThat(valuesToSet(sessionStore.backwardFindSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1)); + final Set<String> expectedKey2 = new HashSet<>(asList("2", "5", "8")); + assertThat(valuesToSet(sessionStore.backwardFindSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2)); + final Set<String> expectedKey3 = new HashSet<>(asList("3", "6", "9")); + assertThat(valuesToSet(sessionStore.backwardFindSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3)); + } + + @Test public void testIteratorPeek() { sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); @@ -343,6 +515,21 @@ public abstract class AbstractSessionBytesStoreTest { } @Test + public void testIteratorPeekBackward() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L); + + final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.backwardFindSessions("a", 0L, 20); + + assertEquals(iterator.peekNextKey(), new Windowed<>("a", new SessionWindow(10L, 20L))); + assertEquals(iterator.peekNextKey(), iterator.next().key); + assertEquals(iterator.peekNextKey(), iterator.next().key); + assertFalse(iterator.hasNext()); + } + + @Test public void shouldRestore() { final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList( KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java new file mode 100644 index 0000000..ea4b147 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.streams.kstream.internals.Change; + +import java.util.HashMap; +import java.util.Map; + +public class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> { + private final Deserializer<K> keyDeserializer; + private final Deserializer<V> valueDeserializer; + final Map<K, Change<V>> forwarded = new HashMap<>(); + + CacheFlushListenerStub(final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer) { + this.keyDeserializer = keyDeserializer; + this.valueDeserializer = valueDeserializer; + } + + @Override + public void apply(final byte[] key, + final byte[] newValue, + final byte[] oldValue, + final long timestamp) { + forwarded.put( + keyDeserializer.deserialize(null, key), + new Change<>( + valueDeserializer.deserialize(null, newValue), + valueDeserializer.deserialize(null, oldValue) + ) + ); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java similarity index 94% rename from streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java rename to streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java index 89e2b0e..a9085a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java @@ -17,14 +17,12 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; -import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; @@ -43,9 +41,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize; import static org.hamcrest.CoreMatchers.equalTo; @@ -57,7 +53,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { +public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { private final static String TOPIC = "topic"; private static final String CACHE_NAMESPACE = "0_0-store-name"; @@ -527,27 +523,4 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { return i; } - public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> { - final Deserializer<K> keyDeserializer; - final Deserializer<V> valueDeserializer; - final Map<K, Change<V>> forwarded = new HashMap<>(); - - CacheFlushListenerStub(final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer) { - this.keyDeserializer = keyDeserializer; - this.valueDeserializer = valueDeserializer; - } - - @Override - public void apply(final byte[] key, - final byte[] newValue, - final byte[] oldValue, - final long timestamp) { - forwarded.put( - keyDeserializer.deserialize(null, key), - new Change<>( - valueDeserializer.deserialize(null, newValue), - valueDeserializer.deserialize(null, oldValue))); - } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java similarity index 71% copy from streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java copy to streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java index 05e97a2..e584e2c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java @@ -46,14 +46,11 @@ import org.junit.Test; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Random; -import java.util.Set; import static java.util.Arrays.asList; -import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; @@ -68,7 +65,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @SuppressWarnings("PointlessArithmeticExpression") -public class CachingSessionStoreTest { +public class CachingInMemorySessionStoreTest { private static final int MAX_CACHE_SIZE_BYTES = 600; private static final Long DEFAULT_TIMESTAMP = 10L; @@ -80,14 +77,14 @@ public class CachingSessionStoreTest { private final Bytes keyAA = Bytes.wrap("aa".getBytes()); private final Bytes keyB = Bytes.wrap("b".getBytes()); - private SessionStore<Bytes, byte[]> underlyingStore = - new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope"); + private SessionStore<Bytes, byte[]> underlyingStore; private InternalMockProcessorContext context; private CachingSessionStore cachingStore; private ThreadCache cache; @Before public void before() { + underlyingStore = new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope"); cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); @@ -158,6 +155,21 @@ public class CachingSessionStoreTest { } @Test + public void shouldPutBackwardFetchAllKeysFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.backwardFindSessions(keyA, keyB, 0, 0); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + @Test public void shouldCloseWrappedStoreAndCacheAfterErrorDuringCacheFlush() { setUpCloseTests(); EasyMock.reset(cache); @@ -211,7 +223,7 @@ public class CachingSessionStoreTest { EasyMock.replay(underlyingStore); cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = EasyMock.niceMock(ThreadCache.class); - context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } @@ -231,6 +243,20 @@ public class CachingSessionStoreTest { } @Test + public void shouldPutBackwardFetchRangeFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.backwardFindSessions(keyAA, keyB, 0, 0); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + @Test public void shouldFetchAllSessionsWithSameRecordKey() { final List<KeyValue<Windowed<Bytes>, byte[]>> expected = asList( KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()), @@ -250,6 +276,26 @@ public class CachingSessionStoreTest { } @Test + public void shouldBackwardFetchAllSessionsWithSameRecordKey() { + final List<KeyValue<Windowed<Bytes>, byte[]>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes()) + ); + for (final KeyValue<Windowed<Bytes>, byte[]> kv : expected) { + cachingStore.put(kv.key, kv.value); + } + + // add one that shouldn't appear in the results + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "5".getBytes()); + + final List<KeyValue<Windowed<Bytes>, byte[]>> results = toList(cachingStore.backwardFetch(keyA)); + Collections.reverse(results); + verifyKeyValueList(expected, results); + } + + @Test public void shouldFlushItemsToStoreOnEviction() { final List<KeyValue<Windowed<Bytes>, byte[]>> added = addSessionsUntilOverflow("a", "b", "c", "d"); assertEquals(added.size() - 1, cache.size()); @@ -292,15 +338,50 @@ public class CachingSessionStoreTest { final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed<Bytes> a4 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 3, SEGMENT_INTERVAL * 3)); + final Windowed<Bytes> a5 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 4, SEGMENT_INTERVAL * 4)); + final Windowed<Bytes> a6 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 5, SEGMENT_INTERVAL * 5)); cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); cachingStore.flush(); + cachingStore.put(a4, "4".getBytes()); + cachingStore.put(a5, "5".getBytes()); + cachingStore.put(a6, "6".getBytes()); final KeyValueIterator<Windowed<Bytes>, byte[]> results = - cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 2); + cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 5); assertEquals(a1, results.next().key); assertEquals(a2, results.next().key); assertEquals(a3, results.next().key); + assertEquals(a4, results.next().key); + assertEquals(a5, results.next().key); + assertEquals(a6, results.next().key); + assertFalse(results.hasNext()); + } + + @Test + public void shouldBackwardFetchCorrectlyAcrossSegments() { + final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); + final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed<Bytes> a4 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 3, SEGMENT_INTERVAL * 3)); + final Windowed<Bytes> a5 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 4, SEGMENT_INTERVAL * 4)); + final Windowed<Bytes> a6 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 5, SEGMENT_INTERVAL * 5)); + cachingStore.put(a1, "1".getBytes()); + cachingStore.put(a2, "2".getBytes()); + cachingStore.put(a3, "3".getBytes()); + cachingStore.flush(); + cachingStore.put(a4, "4".getBytes()); + cachingStore.put(a5, "5".getBytes()); + cachingStore.put(a6, "6".getBytes()); + final KeyValueIterator<Windowed<Bytes>, byte[]> results = + cachingStore.backwardFindSessions(keyA, 0, SEGMENT_INTERVAL * 5); + assertEquals(a6, results.next().key); + assertEquals(a5, results.next().key); + assertEquals(a4, results.next().key); + assertEquals(a3, results.next().key); + assertEquals(a2, results.next().key); + assertEquals(a1, results.next().key); assertFalse(results.hasNext()); } @@ -319,12 +400,35 @@ public class CachingSessionStoreTest { final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2); - final Set<Windowed<Bytes>> keys = new HashSet<>(); + final List<Windowed<Bytes>> keys = new ArrayList<>(); while (rangeResults.hasNext()) { keys.add(rangeResults.next().key); } rangeResults.close(); - assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys); + assertEquals(asList(a1, aa1, a2, a3, aa3), keys); + } + + @Test + public void shouldBackwardFetchRangeCorrectlyAcrossSegments() { + final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); + final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + cachingStore.put(a1, "1".getBytes()); + cachingStore.put(aa1, "1".getBytes()); + cachingStore.put(a2, "2".getBytes()); + cachingStore.put(a3, "3".getBytes()); + cachingStore.put(aa3, "3".getBytes()); + + final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = + cachingStore.backwardFindSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2); + final List<Windowed<Bytes>> keys = new ArrayList<>(); + while (rangeResults.hasNext()) { + keys.add(rangeResults.next().key); + } + rangeResults.close(); + assertEquals(asList(aa3, a3, a2, aa1, a1), keys); } @Test @@ -474,6 +578,24 @@ public class CachingSessionStoreTest { } @Test + public void shouldReturnSameResultsForSingleKeyFindSessionsBackwardsAndEqualKeyRangeFindSessions() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 1)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2, 3)), "2".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4, 5)), "3".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(6, 7)), "4".getBytes()); + + final KeyValueIterator<Windowed<Bytes>, byte[]> singleKeyIterator = + cachingStore.backwardFindSessions(keyAA, 0L, 10L); + final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator = + cachingStore.backwardFindSessions(keyAA, keyAA, 0L, 10L); + + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + + @Test public void shouldClearNamespaceCacheOnClose() { final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0)); cachingStore.put(a1, "1".getBytes()); @@ -482,68 +604,90 @@ public class CachingSessionStoreTest { assertEquals(0, cache.size()); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToFetchFromClosedCachingStore() { cachingStore.close(); - cachingStore.fetch(keyA); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.fetch(keyA)); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() { cachingStore.close(); - cachingStore.findSessions(keyA, 0, Long.MAX_VALUE); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.findSessions(keyA, 0, Long.MAX_VALUE)); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToRemoveFromClosedCachingStore() { cachingStore.close(); - cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0))); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0)))); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToPutIntoClosedCachingStore() { cachingStore.close(); - cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes())); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { - cachingStore.findSessions(null, 1L, 2L); + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, 1L, 2L)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { - cachingStore.findSessions(null, keyA, 1L, 2L); + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, keyA, 1L, 2L)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { - cachingStore.findSessions(keyA, null, 1L, 2L); + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(keyA, null, 1L, 2L)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { - cachingStore.fetch(null, keyA); + assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, keyA)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFetchNullToKey() { - cachingStore.fetch(keyA, null); + assertThrows(NullPointerException.class, () -> cachingStore.fetch(keyA, null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFetchNullKey() { - cachingStore.fetch(null); + assertThrows(NullPointerException.class, () -> cachingStore.fetch(null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnRemoveNullKey() { - cachingStore.remove(null); + assertThrows(NullPointerException.class, () -> cachingStore.remove(null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnPutNullKey() { - cachingStore.put(null, "1".getBytes()); + assertThrows(NullPointerException.class, () -> cachingStore.put(null, "1".getBytes())); + } + + @Test + public void shouldNotThrowInvalidRangeExceptionWhenBackwardWithNegativeFromKey() { + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); + final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(CachingSessionStore.class)) { + final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.backwardFindSessions(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List<String> messages = appender.getMessages(); + assertThat( + messages, + hasItem( + "Returning empty iterator for fetch with invalid key range: from > to." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " Note that the built-in numerical serdes do not follow this for negative numbers" + ) + ); + } } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java similarity index 63% rename from streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java rename to streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java index 05e97a2..d472c7f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.SessionWindow; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; @@ -46,14 +45,11 @@ import org.junit.Test; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Random; -import java.util.Set; import static java.util.Arrays.asList; -import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; @@ -67,8 +63,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -@SuppressWarnings("PointlessArithmeticExpression") -public class CachingSessionStoreTest { +public class CachingPersistentSessionStoreTest { private static final int MAX_CACHE_SIZE_BYTES = 600; private static final Long DEFAULT_TIMESTAMP = 10L; @@ -80,17 +75,24 @@ public class CachingSessionStoreTest { private final Bytes keyAA = Bytes.wrap("aa".getBytes()); private final Bytes keyB = Bytes.wrap("b".getBytes()); - private SessionStore<Bytes, byte[]> underlyingStore = - new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope"); - private InternalMockProcessorContext context; + private SessionStore<Bytes, byte[]> underlyingStore; private CachingSessionStore cachingStore; private ThreadCache cache; @Before public void before() { + final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore( + "store-name", + "metric-scope", + Long.MAX_VALUE, + SEGMENT_INTERVAL, + new SessionKeySchema() + ); + underlyingStore = new RocksDBSessionStore(segmented); cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); - context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + final InternalMockProcessorContext context = + new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } @@ -100,31 +102,6 @@ public class CachingSessionStoreTest { cachingStore.close(); } - @SuppressWarnings("deprecation") - @Test - public void shouldDelegateDeprecatedInit() { - final SessionStore<Bytes, byte[]> inner = EasyMock.mock(InMemorySessionStore.class); - final CachingSessionStore outer = new CachingSessionStore(inner, SEGMENT_INTERVAL); - EasyMock.expect(inner.name()).andStubReturn("store"); - inner.init((ProcessorContext) context, outer); - EasyMock.expectLastCall(); - EasyMock.replay(inner); - outer.init((ProcessorContext) context, outer); - EasyMock.verify(inner); - } - - @Test - public void shouldDelegateInit() { - final SessionStore<Bytes, byte[]> inner = EasyMock.mock(InMemorySessionStore.class); - final CachingSessionStore outer = new CachingSessionStore(inner, SEGMENT_INTERVAL); - EasyMock.expect(inner.name()).andStubReturn("store"); - inner.init((StateStoreContext) context, outer); - EasyMock.expectLastCall(); - EasyMock.replay(inner); - outer.init((StateStoreContext) context, outer); - EasyMock.verify(inner); - } - @Test public void shouldPutFetchFromCache() { cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); @@ -133,8 +110,10 @@ public class CachingSessionStoreTest { assertEquals(3, cache.size()); - final KeyValueIterator<Windowed<Bytes>, byte[]> a = cachingStore.findSessions(keyA, 0, 0); - final KeyValueIterator<Windowed<Bytes>, byte[]> b = cachingStore.findSessions(keyB, 0, 0); + final KeyValueIterator<Windowed<Bytes>, byte[]> a = + cachingStore.findSessions(keyA, 0, 0); + final KeyValueIterator<Windowed<Bytes>, byte[]> b = + cachingStore.findSessions(keyB, 0, 0); verifyWindowedKeyValue(a.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(b.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); @@ -150,7 +129,8 @@ public class CachingSessionStoreTest { assertEquals(3, cache.size()); - final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.findSessions(keyA, keyB, 0, 0); + final KeyValueIterator<Windowed<Bytes>, byte[]> all = + cachingStore.findSessions(keyA, keyB, 0, 0); verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); @@ -158,6 +138,22 @@ public class CachingSessionStoreTest { } @Test + public void shouldPutBackwardFetchAllKeysFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator<Windowed<Bytes>, byte[]> all = + cachingStore.backwardFindSessions(keyA, keyB, 0, 0); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + @Test public void shouldCloseWrappedStoreAndCacheAfterErrorDuringCacheFlush() { setUpCloseTests(); EasyMock.reset(cache); @@ -211,7 +207,8 @@ public class CachingSessionStoreTest { EasyMock.replay(underlyingStore); cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = EasyMock.niceMock(ThreadCache.class); - context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + final InternalMockProcessorContext context = + new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } @@ -224,13 +221,29 @@ public class CachingSessionStoreTest { assertEquals(3, cache.size()); - final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.findSessions(keyAA, keyB, 0, 0); + final KeyValueIterator<Windowed<Bytes>, byte[]> some = + cachingStore.findSessions(keyAA, keyB, 0, 0); verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); assertFalse(some.hasNext()); } @Test + public void shouldPutBackwardFetchRangeFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator<Windowed<Bytes>, byte[]> some = + cachingStore.backwardFindSessions(keyAA, keyB, 0, 0); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + @Test public void shouldFetchAllSessionsWithSameRecordKey() { final List<KeyValue<Windowed<Bytes>, byte[]>> expected = asList( KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()), @@ -250,10 +263,31 @@ public class CachingSessionStoreTest { } @Test + public void shouldBackwardFetchAllSessionsWithSameRecordKey() { + final List<KeyValue<Windowed<Bytes>, byte[]>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes()) + ); + for (final KeyValue<Windowed<Bytes>, byte[]> kv : expected) { + cachingStore.put(kv.key, kv.value); + } + + // add one that shouldn't appear in the results + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "5".getBytes()); + + final List<KeyValue<Windowed<Bytes>, byte[]>> results = toList(cachingStore.backwardFetch(keyA)); + Collections.reverse(results); + verifyKeyValueList(expected, results); + } + + @Test public void shouldFlushItemsToStoreOnEviction() { final List<KeyValue<Windowed<Bytes>, byte[]>> added = addSessionsUntilOverflow("a", "b", "c", "d"); assertEquals(added.size() - 1, cache.size()); - final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.findSessions(added.get(0).key.key(), 0, 0); + final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.findSessions(added.get(0).key.key(), 0, 0); final KeyValue<Windowed<Bytes>, byte[]> next = iterator.next(); assertEquals(added.get(0).key, next.key); assertArrayEquals(added.get(0).value, next.value); @@ -265,7 +299,8 @@ public class CachingSessionStoreTest { final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.findSessions( Bytes.wrap("a".getBytes(StandardCharsets.UTF_8)), 0, - added.size() * 10); + added.size() * 10L + ); final List<KeyValue<Windowed<Bytes>, byte[]>> actual = toList(iterator); verifyKeyValueList(added, actual); } @@ -292,15 +327,50 @@ public class CachingSessionStoreTest { final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed<Bytes> a4 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 3, SEGMENT_INTERVAL * 3)); + final Windowed<Bytes> a5 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 4, SEGMENT_INTERVAL * 4)); + final Windowed<Bytes> a6 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 5, SEGMENT_INTERVAL * 5)); cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); cachingStore.flush(); + cachingStore.put(a4, "4".getBytes()); + cachingStore.put(a5, "5".getBytes()); + cachingStore.put(a6, "6".getBytes()); final KeyValueIterator<Windowed<Bytes>, byte[]> results = - cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 2); + cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 5); assertEquals(a1, results.next().key); assertEquals(a2, results.next().key); assertEquals(a3, results.next().key); + assertEquals(a4, results.next().key); + assertEquals(a5, results.next().key); + assertEquals(a6, results.next().key); + assertFalse(results.hasNext()); + } + + @Test + public void shouldBackwardFetchCorrectlyAcrossSegments() { + final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); + final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed<Bytes> a4 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 3, SEGMENT_INTERVAL * 3)); + final Windowed<Bytes> a5 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 4, SEGMENT_INTERVAL * 4)); + final Windowed<Bytes> a6 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 5, SEGMENT_INTERVAL * 5)); + cachingStore.put(a1, "1".getBytes()); + cachingStore.put(a2, "2".getBytes()); + cachingStore.put(a3, "3".getBytes()); + cachingStore.flush(); + cachingStore.put(a4, "4".getBytes()); + cachingStore.put(a5, "5".getBytes()); + cachingStore.put(a6, "6".getBytes()); + final KeyValueIterator<Windowed<Bytes>, byte[]> results = + cachingStore.backwardFindSessions(keyA, 0, SEGMENT_INTERVAL * 5); + assertEquals(a6, results.next().key); + assertEquals(a5, results.next().key); + assertEquals(a4, results.next().key); + assertEquals(a3, results.next().key); + assertEquals(a2, results.next().key); + assertEquals(a1, results.next().key); assertFalse(results.hasNext()); } @@ -319,12 +389,35 @@ public class CachingSessionStoreTest { final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2); - final Set<Windowed<Bytes>> keys = new HashSet<>(); + final List<Windowed<Bytes>> keys = new ArrayList<>(); + while (rangeResults.hasNext()) { + keys.add(rangeResults.next().key); + } + rangeResults.close(); + assertEquals(asList(a1, aa1, a2, a3, aa3), keys); + } + + @Test + public void shouldBackwardFetchRangeCorrectlyAcrossSegments() { + final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); + final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + cachingStore.put(a1, "1".getBytes()); + cachingStore.put(aa1, "1".getBytes()); + cachingStore.put(a2, "2".getBytes()); + cachingStore.put(a3, "3".getBytes()); + cachingStore.put(aa3, "3".getBytes()); + + final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = + cachingStore.backwardFindSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2); + final List<Windowed<Bytes>> keys = new ArrayList<>(); while (rangeResults.hasNext()) { keys.add(rangeResults.next().key); } rangeResults.close(); - assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys); + assertEquals(asList(aa3, a3, a2, aa1, a1), keys); } @Test @@ -342,7 +435,8 @@ public class CachingSessionStoreTest { final CacheFlushListenerStub<Windowed<String>, String> flushListener = new CacheFlushListenerStub<>( new SessionWindowedDeserializer<>(new StringDeserializer()), - new StringDeserializer()); + new StringDeserializer() + ); cachingStore.setFlushListener(flushListener, true); cachingStore.put(b, "1".getBytes()); @@ -353,7 +447,9 @@ public class CachingSessionStoreTest { new KeyValueTimestamp<>( bDeserialized, new Change<>("1", null), - DEFAULT_TIMESTAMP)), + DEFAULT_TIMESTAMP + ) + ), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -366,7 +462,9 @@ public class CachingSessionStoreTest { new KeyValueTimestamp<>( aDeserialized, new Change<>("1", null), - DEFAULT_TIMESTAMP)), + DEFAULT_TIMESTAMP + ) + ), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -379,7 +477,9 @@ public class CachingSessionStoreTest { new KeyValueTimestamp<>( aDeserialized, new Change<>("2", "1"), - DEFAULT_TIMESTAMP)), + DEFAULT_TIMESTAMP + ) + ), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -392,7 +492,9 @@ public class CachingSessionStoreTest { new KeyValueTimestamp<>( aDeserialized, new Change<>(null, "2"), - DEFAULT_TIMESTAMP)), + DEFAULT_TIMESTAMP + ) + ), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -429,18 +531,23 @@ public class CachingSessionStoreTest { cachingStore.flush(); assertEquals( - asList(new KeyValueTimestamp<>( + asList( + new KeyValueTimestamp<>( aDeserialized, new Change<>("1", null), - DEFAULT_TIMESTAMP), + DEFAULT_TIMESTAMP + ), new KeyValueTimestamp<>( aDeserialized, new Change<>("2", null), - DEFAULT_TIMESTAMP), + DEFAULT_TIMESTAMP + ), new KeyValueTimestamp<>( aDeserialized, new Change<>(null, null), - DEFAULT_TIMESTAMP)), + DEFAULT_TIMESTAMP + ) + ), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -464,8 +571,28 @@ public class CachingSessionStoreTest { cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4, 5)), "3".getBytes()); cachingStore.put(new Windowed<>(keyB, new SessionWindow(6, 7)), "4".getBytes()); - final KeyValueIterator<Windowed<Bytes>, byte[]> singleKeyIterator = cachingStore.findSessions(keyAA, 0L, 10L); - final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator = cachingStore.findSessions(keyAA, keyAA, 0L, 10L); + final KeyValueIterator<Windowed<Bytes>, byte[]> singleKeyIterator = + cachingStore.findSessions(keyAA, 0L, 10L); + final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator = + cachingStore.findSessions(keyAA, keyAA, 0L, 10L); + + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + + @Test + public void shouldReturnSameResultsForSingleKeyFindSessionsBackwardsAndEqualKeyRangeFindSessions() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 1)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2, 3)), "2".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4, 5)), "3".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(6, 7)), "4".getBytes()); + + final KeyValueIterator<Windowed<Bytes>, byte[]> singleKeyIterator = + cachingStore.backwardFindSessions(keyAA, 0L, 10L); + final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator = + cachingStore.backwardFindSessions(keyAA, keyAA, 0L, 10L); assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); @@ -482,68 +609,91 @@ public class CachingSessionStoreTest { assertEquals(0, cache.size()); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToFetchFromClosedCachingStore() { cachingStore.close(); - cachingStore.fetch(keyA); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.fetch(keyA)); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() { cachingStore.close(); - cachingStore.findSessions(keyA, 0, Long.MAX_VALUE); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.findSessions(keyA, 0, Long.MAX_VALUE)); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToRemoveFromClosedCachingStore() { cachingStore.close(); - cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0))); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0)))); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToPutIntoClosedCachingStore() { cachingStore.close(); - cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes())); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { - cachingStore.findSessions(null, 1L, 2L); + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, 1L, 2L)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { - cachingStore.findSessions(null, keyA, 1L, 2L); + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, keyA, 1L, 2L)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { - cachingStore.findSessions(keyA, null, 1L, 2L); + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(keyA, null, 1L, 2L)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { - cachingStore.fetch(null, keyA); + assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, keyA)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFetchNullToKey() { - cachingStore.fetch(keyA, null); + assertThrows(NullPointerException.class, () -> cachingStore.fetch(keyA, null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFetchNullKey() { - cachingStore.fetch(null); + assertThrows(NullPointerException.class, () -> cachingStore.fetch(null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnRemoveNullKey() { - cachingStore.remove(null); + assertThrows(NullPointerException.class, () -> cachingStore.remove(null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnPutNullKey() { - cachingStore.put(null, "1".getBytes()); + assertThrows(NullPointerException.class, () -> cachingStore.put(null, "1".getBytes())); + } + + @Test + public void shouldNotThrowInvalidRangeExceptionWhenBackwardWithNegativeFromKey() { + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); + final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(CachingSessionStore.class)) { + final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = + cachingStore.backwardFindSessions(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List<String> messages = appender.getMessages(); + assertThat( + messages, + hasItem( + "Returning empty iterator for fetch with invalid key range: from > to." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " Note that the built-in numerical serdes do not follow this for negative numbers" + ) + ); + } } @Test @@ -558,10 +708,12 @@ public class CachingSessionStoreTest { final List<String> messages = appender.getMessages(); assertThat( messages, - hasItem("Returning empty iterator for fetch with invalid key range: from > to." + - " This may be due to range arguments set in the wrong order, " + - "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + - " Note that the built-in numerical serdes do not follow this for negative numbers") + hasItem( + "Returning empty iterator for fetch with invalid key range: from > to." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " Note that the built-in numerical serdes do not follow this for negative numbers" + ) ); } } @@ -585,9 +737,9 @@ public class CachingSessionStoreTest { } public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> { - final Deserializer<K> keyDeserializer; - final Deserializer<V> valueDesializer; - final List<KeyValueTimestamp<K, Change<V>>> forwarded = new LinkedList<>(); + private final Deserializer<K> keyDeserializer; + private final Deserializer<V> valueDesializer; + private final List<KeyValueTimestamp<K, Change<V>>> forwarded = new LinkedList<>(); CacheFlushListenerStub(final Deserializer<K> keyDeserializer, final Deserializer<V> valueDesializer) { @@ -606,7 +758,9 @@ public class CachingSessionStoreTest { new Change<>( valueDesializer.deserialize(null, newValue), valueDesializer.deserialize(null, oldValue)), - timestamp)); + timestamp + ) + ); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java rename to streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 2a04c48..86ee164 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -75,7 +75,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -public class CachingWindowStoreTest { +public class CachingPersistentWindowStoreTest { private static final int MAX_CACHE_SIZE_BYTES = 150; private static final long DEFAULT_TIMESTAMP = 10L; @@ -88,7 +88,7 @@ public class CachingWindowStoreTest { private RocksDBSegmentedBytesStore bytesStore; private WindowStore<Bytes, byte[]> underlyingStore; private CachingWindowStore cachingStore; - private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> cacheListener; + private CacheFlushListenerStub<Windowed<String>, String> cacheListener; private ThreadCache cache; private WindowKeySchema keySchema; @@ -99,7 +99,7 @@ public class CachingWindowStoreTest { underlyingStore = new RocksDBWindowStore(bytesStore, false, WINDOW_SIZE); final TimeWindowedDeserializer<String> keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE); keyDeserializer.setIsChangelogTopic(true); - cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>(keyDeserializer, new StringDeserializer()); + cacheListener = new CacheFlushListenerStub<>(keyDeserializer, new StringDeserializer()); cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL); cachingStore.setFlushListener(cacheListener, false); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index c55c4e15..8fdbd33 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -131,6 +131,16 @@ public class ChangeLoggingSessionBytesStoreTest { } @Test + public void shouldDelegateToUnderlyingStoreWhenBackwardFetching() { + EasyMock.expect(inner.backwardFetch(bytesKey)).andReturn(KeyValueIterators.emptyIterator()); + + init(); + + store.backwardFetch(bytesKey); + EasyMock.verify(inner); + } + + @Test public void shouldDelegateToUnderlyingStoreWhenFetchingRange() { EasyMock.expect(inner.fetch(bytesKey, bytesKey)).andReturn(KeyValueIterators.emptyIterator()); @@ -141,6 +151,16 @@ public class ChangeLoggingSessionBytesStoreTest { } @Test + public void shouldDelegateToUnderlyingStoreWhenBackwardFetchingRange() { + EasyMock.expect(inner.backwardFetch(bytesKey, bytesKey)).andReturn(KeyValueIterators.emptyIterator()); + + init(); + + store.backwardFetch(bytesKey, bytesKey); + EasyMock.verify(inner); + } + + @Test public void shouldDelegateToUnderlyingStoreWhenFindingSessions() { EasyMock.expect(inner.findSessions(bytesKey, 0, 1)).andReturn(KeyValueIterators.emptyIterator()); @@ -151,6 +171,16 @@ public class ChangeLoggingSessionBytesStoreTest { } @Test + public void shouldDelegateToUnderlyingStoreWhenBackwardFindingSessions() { + EasyMock.expect(inner.backwardFindSessions(bytesKey, 0, 1)).andReturn(KeyValueIterators.emptyIterator()); + + init(); + + store.backwardFindSessions(bytesKey, 0, 1); + EasyMock.verify(inner); + } + + @Test public void shouldDelegateToUnderlyingStoreWhenFindingSessionRange() { EasyMock.expect(inner.findSessions(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.emptyIterator()); @@ -161,6 +191,16 @@ public class ChangeLoggingSessionBytesStoreTest { } @Test + public void shouldDelegateToUnderlyingStoreWhenBackwardFindingSessionRange() { + EasyMock.expect(inner.backwardFindSessions(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.emptyIterator()); + + init(); + + store.backwardFindSessions(bytesKey, bytesKey, 0, 1); + EasyMock.verify(inner); + } + + @Test public void shouldFlushUnderlyingStore() { inner.flush(); EasyMock.expectLastCall(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java index 617ff36..4bd125a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java @@ -55,56 +55,101 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest { @Test public void shouldHaveNextFromStore() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator()); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), false); + assertTrue(mergeIterator.hasNext()); + } + + @Test + public void shouldHaveNextFromReverseStore() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), true); assertTrue(mergeIterator.hasNext()); } @Test public void shouldGetNextFromStore() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator()); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), false); + assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey.get()))); + } + + @Test + public void shouldGetNextFromReverseStore() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), true); assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey.get()))); } @Test public void shouldPeekNextKeyFromStore() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator()); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), false); + assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow))); + } + + @Test + public void shouldPeekNextKeyFromReverseStore() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), true); assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow))); } @Test public void shouldHaveNextFromCache() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, false); + assertTrue(mergeIterator.hasNext()); + } + + @Test + public void shouldHaveNextFromReverseCache() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, true); assertTrue(mergeIterator.hasNext()); } @Test public void shouldGetNextFromCache() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, false); + assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey.get()))); + } + + @Test + public void shouldGetNextFromReverseCache() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, true); assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey.get()))); } @Test public void shouldPeekNextKeyFromCache() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, false); + assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow))); + } + + @Test + public void shouldPeekNextKeyFromReverseCache() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, true); assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow))); } @Test public void shouldIterateBothStoreAndCache() { - final MergedSortedCacheSessionStoreIterator iterator = createIterator(storeKvs, cacheKvs); + final MergedSortedCacheSessionStoreIterator iterator = createIterator(storeKvs, cacheKvs, true); assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey.get()))); assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey.get()))); assertFalse(iterator.hasNext()); } + @Test + public void shouldReverseIterateBothStoreAndCache() { + final MergedSortedCacheSessionStoreIterator iterator = createIterator(storeKvs, cacheKvs, false); + assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey.get()))); + assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey.get()))); + assertFalse(iterator.hasNext()); + } + private MergedSortedCacheSessionStoreIterator createIterator(final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs, - final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs) { + final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs, + final boolean forward) { final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs)); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs)); - return new MergedSortedCacheSessionStoreIterator(cacheIterator, storeIterator, SINGLE_SEGMENT_CACHE_FUNCTION); + return new MergedSortedCacheSessionStoreIterator(cacheIterator, storeIterator, SINGLE_SEGMENT_CACHE_FUNCTION, forward); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index a77dd07..6b1889a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -303,6 +303,26 @@ public class MeteredSessionStoreTest { } @Test + public void shouldBackwardFindSessionsFromStoreAndRecordFetchMetric() { + expect(innerStore.backwardFindSessions(KEY_BYTES, 0, 0)) + .andReturn( + new KeyValueIteratorStub<>( + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() + ) + ); + init(); + + final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFindSessions(KEY, 0, 0); + assertThat(iterator.next().value, equalTo(VALUE)); + assertFalse(iterator.hasNext()); + iterator.close(); + + final KafkaMetric metric = metric("fetch-rate"); + assertTrue((Double) metric.metricValue() > 0); + verify(innerStore); + } + + @Test public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() { expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0)) .andReturn(new KeyValueIteratorStub<>( @@ -320,6 +340,26 @@ public class MeteredSessionStoreTest { } @Test + public void shouldBackwardFindSessionRangeFromStoreAndRecordFetchMetric() { + expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 0, 0)) + .andReturn( + new KeyValueIteratorStub<>( + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() + ) + ); + init(); + + final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFindSessions(KEY, KEY, 0, 0); + assertThat(iterator.next().value, equalTo(VALUE)); + assertFalse(iterator.hasNext()); + iterator.close(); + + final KafkaMetric metric = metric("fetch-rate"); + assertTrue((Double) metric.metricValue() > 0); + verify(innerStore); + } + + @Test public void shouldRemoveFromStoreAndRecordRemoveMetric() { innerStore.remove(WINDOWED_KEY_BYTES); expectLastCall(); @@ -351,6 +391,26 @@ public class MeteredSessionStoreTest { } @Test + public void shouldBackwardFetchForKeyAndRecordFetchMetric() { + expect(innerStore.backwardFetch(KEY_BYTES)) + .andReturn( + new KeyValueIteratorStub<>( + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() + ) + ); + init(); + + final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFetch(KEY); + assertThat(iterator.next().value, equalTo(VALUE)); + assertFalse(iterator.hasNext()); + iterator.close(); + + final KafkaMetric metric = metric("fetch-rate"); + assertTrue((Double) metric.metricValue() > 0); + verify(innerStore); + } + + @Test public void shouldFetchRangeFromStoreAndRecordFetchMetric() { expect(innerStore.fetch(KEY_BYTES, KEY_BYTES)) .andReturn(new KeyValueIteratorStub<>( @@ -368,6 +428,26 @@ public class MeteredSessionStoreTest { } @Test + public void shouldBackwardFetchRangeFromStoreAndRecordFetchMetric() { + expect(innerStore.backwardFetch(KEY_BYTES, KEY_BYTES)) + .andReturn( + new KeyValueIteratorStub<>( + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() + ) + ); + init(); + + final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFetch(KEY, KEY); + assertThat(iterator.next().value, equalTo(VALUE)); + assertFalse(iterator.hasNext()); + iterator.close(); + + final KafkaMetric metric = metric("fetch-rate"); + assertTrue((Double) metric.metricValue() > 0); + verify(innerStore); + } + + @Test public void shouldRecordRestoreTimeOnInit() { init(); final KafkaMetric metric = metric("restore-rate"); diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index a2924fc..ff37e25 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -32,7 +32,7 @@ import java.util.NavigableMap; import java.util.TreeMap; public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V>, StateStore { - private NavigableMap<K, List<KeyValue<Windowed<K>, V>>> sessions = new TreeMap<>(); + private final NavigableMap<K, List<KeyValue<Windowed<K>, V>>> sessions = new TreeMap<>(); private boolean open = true; public void put(final Windowed<K> sessionKey, final V value) { @@ -43,6 +43,31 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V } @Override + public KeyValueIterator<Windowed<K>, V> findSessions(K key, long earliestSessionEndTime, long latestSessionStartTime) { + throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); + } + + @Override + public KeyValueIterator<Windowed<K>, V> backwardFindSessions(K key, long earliestSessionEndTime, long latestSessionStartTime) { + throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); + } + + @Override + public KeyValueIterator<Windowed<K>, V> findSessions(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime) { + throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); + } + + @Override + public KeyValueIterator<Windowed<K>, V> backwardFindSessions(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime) { + throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); + } + + @Override + public V fetchSession(K key, long startTime, long endTime) { + throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); + } + + @Override public KeyValueIterator<Windowed<K>, V> fetch(final K key) { if (!open) { throw new InvalidStateStoreException("not open"); @@ -54,6 +79,17 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V } @Override + public KeyValueIterator<Windowed<K>, V> backwardFetch(K key) { + if (!open) { + throw new InvalidStateStoreException("not open"); + } + if (!sessions.containsKey(key)) { + return new KeyValueIteratorStub<>(Collections.emptyIterator()); + } + return new KeyValueIteratorStub<>(sessions.descendingMap().get(key).iterator()); + } + + @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) { if (!open) { throw new InvalidStateStoreException("not open"); @@ -88,6 +124,40 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V } @Override + public KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to) { + if (!open) { + throw new InvalidStateStoreException("not open"); + } + if (sessions.subMap(from, true, to, true).isEmpty()) { + return new KeyValueIteratorStub<>(Collections.emptyIterator()); + } + final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator = + sessions.subMap(from, true, to, true).descendingMap().values().iterator(); + return new KeyValueIteratorStub<>( + new Iterator<KeyValue<Windowed<K>, V>>() { + + Iterator<KeyValue<Windowed<K>, V>> it; + + @Override + public boolean hasNext() { + while (it == null || !it.hasNext()) { + if (!keysIterator.hasNext()) { + return false; + } + it = keysIterator.next().iterator(); + } + return true; + } + + @Override + public KeyValue<Windowed<K>, V> next() { + return it.next(); + } + } + ); + } + + @Override public String name() { return ""; }