ableegoldman commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r468843465
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
##########
@@ -58,7 +61,8 @@ public synchronized boolean hasNext() {
return allDone();
} else {
next = getKeyValue();
- iter.next();
+ if (reverse) iter.prev();
Review comment:
separate lines 🙂
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -29,32 +29,41 @@
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
private final Comparator<byte[]> comparator =
Bytes.BYTES_LEXICO_COMPARATOR;
- private final byte[] rawToKey;
+ private final byte[] rawLastKey;
+ private final boolean reverse;
RocksDBRangeIterator(final String storeName,
final RocksIterator iter,
final Set<KeyValueIterator<Bytes, byte[]>>
openIterators,
final Bytes from,
- final Bytes to) {
- super(storeName, iter, openIterators);
- iter.seek(from.get());
- rawToKey = to.get();
- if (rawToKey == null) {
+ final Bytes to,
+ final boolean reverse) {
+ super(storeName, iter, openIterators, reverse);
+ this.reverse = reverse;
+ if (reverse) {
+ iter.seekForPrev(to.get());
+ rawLastKey = from.get();
+ } else {
+ iter.seek(from.get());
+ rawLastKey = to.get();
+ }
+ if (rawLastKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawToKey is
null for key " + to);
Review comment:
nit: `RawToKey` --> `RawLastKey`
##########
File path:
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -294,13 +305,27 @@ public void shouldIterateOverRange() {
assertEquals(items, results.size());
}
+ @Test
+ public void shouldReverseIterateOverRange() {
+ final int items = addItemsToCache();
+ final KeyValueIterator<Bytes, byte[]> range =
+ store.reverseRange(bytesKey(String.valueOf(0)),
bytesKey(String.valueOf(items)));
+ final List<Bytes> results = new ArrayList<>();
+ while (range.hasNext()) {
+ results.add(range.next().key);
+ }
+ assertEquals(items, results.size());
Review comment:
Can we add some tests that verify the actual contents + order of the
reverse range?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -150,8 +166,9 @@ public void close() {
private class InMemoryKeyValueIterator implements KeyValueIterator<Bytes,
byte[]> {
private final Iterator<Bytes> iter;
- private InMemoryKeyValueIterator(final Set<Bytes> keySet) {
- this.iter = new TreeSet<>(keySet).iterator();
+ private InMemoryKeyValueIterator(final Set<Bytes> keySet, final
boolean reverse) {
+ if (reverse) this.iter = new
TreeSet<>(keySet).descendingIterator();
+ else this.iter = new TreeSet<>(keySet).iterator();
Review comment:
nit: Use braces & separate lines
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -110,7 +111,15 @@ public void putAll(final List<KeyValue<Bytes, byte[]>>
entries) {
@Override
public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes
from, final Bytes to) {
+ return range(from, to, false);
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
final Bytes to) {
+ return range(from, to, true);
+ }
+ KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to,
final boolean reverse) {
Review comment:
Should be private
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
##########
@@ -38,35 +38,68 @@
*
* @param key The key to fetch
* @return The value or null if no value is found.
- * @throws NullPointerException If null is used for key.
+ * @throws NullPointerException If null is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
V get(K key);
/**
* Get an iterator over a given range of keys. This iterator must be
closed after use.
* The returned iterator must be safe from {@link
java.util.ConcurrentModificationException}s
- * and must not return null values. No ordering guarantees are provided.
- * @param from The first key that could be in the range
- * @param to The last key that could be in the range
- * @return The iterator for this range.
- * @throws NullPointerException If null is used for from or to.
+ * and must not return null values.
+ * Order is not guaranteed as bytes lexicographical ordering might not
represent key order.
+ *
+ * @param from The first key that could be in the range, where iteration
starts from.
+ * @param to The last key that could be in the range, where iteration
ends.
+ * @return The iterator for this range, from smallest to largest bytes.
+ * @throws NullPointerException If null is used for from or to.
* @throws InvalidStateStoreException if the store is not initialized
*/
KeyValueIterator<K, V> range(K from, K to);
+ /**
+ * Get a reverse iterator over a given range of keys. This iterator must
be closed after use.
+ * The returned iterator must be safe from {@link
java.util.ConcurrentModificationException}s
+ * and must not return null values.
+ * Order is not guaranteed as bytes lexicographical ordering might not
represent key order.
+ *
+ * @param from The first key that could be in the range, where iteration
ends.
+ * @param to The last key that could be in the range, where iteration
starts from.
Review comment:
Seems a bit tricky to say that _to_ is the variable where iteration
starts _from_ 😉 But I can see it both ways, so being clear in the javadocs is
good enough for me
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
##########
@@ -281,15 +281,24 @@ public boolean isEmpty() {
}
synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
- return keySetIterator(cache.navigableKeySet().subSet(from, true, to,
true));
+ return keySetIterator(cache.navigableKeySet().subSet(from, true, to,
true), false);
}
- private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) {
- return new TreeSet<>(keySet).iterator();
+ synchronized Iterator<Bytes> reverseKeyRange(final Bytes from, final Bytes
to) {
+ return keySetIterator(cache.navigableKeySet().subSet(from, true, to,
true), true);
+ }
+
+ private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet, final
boolean reverse) {
+ if (reverse) return new TreeSet<>(keySet).descendingIterator();
Review comment:
nit: braces + separate lines
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -29,32 +29,41 @@
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
private final Comparator<byte[]> comparator =
Bytes.BYTES_LEXICO_COMPARATOR;
- private final byte[] rawToKey;
+ private final byte[] rawLastKey;
+ private final boolean reverse;
RocksDBRangeIterator(final String storeName,
final RocksIterator iter,
final Set<KeyValueIterator<Bytes, byte[]>>
openIterators,
final Bytes from,
- final Bytes to) {
- super(storeName, iter, openIterators);
- iter.seek(from.get());
- rawToKey = to.get();
- if (rawToKey == null) {
+ final Bytes to,
+ final boolean reverse) {
+ super(storeName, iter, openIterators, reverse);
+ this.reverse = reverse;
+ if (reverse) {
+ iter.seekForPrev(to.get());
+ rawLastKey = from.get();
+ } else {
+ iter.seek(from.get());
+ rawLastKey = to.get();
+ }
+ if (rawLastKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawToKey is
null for key " + to);
}
}
@Override
public KeyValue<Bytes, byte[]> makeNext() {
final KeyValue<Bytes, byte[]> next = super.makeNext();
-
if (next == null) {
return allDone();
} else {
- if (comparator.compare(next.key.get(), rawToKey) <= 0) {
- return next;
+ if (!reverse) {
+ if (comparator.compare(next.key.get(), rawLastKey) <= 0)
return next;
+ else return allDone();
Review comment:
nit: braces & separate lines
##########
File path:
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##########
@@ -422,6 +503,21 @@ public void
shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
" Note that the built-in numerical serdes do not follow
this for negative numbers")
);
}
+ }
+
+ @Test
+ public void
shouldNotThrowInvalidReverseRangeExceptionWithNegativeFromKey() {
Review comment:
Can we add tests for some other invalid range cases? For example with
both bounds positive but from > to
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -306,26 +313,29 @@ public synchronized boolean hasNext() {
} else {
next = KeyValue.pair(new Bytes(nextWithTimestamp),
iterWithTimestamp.value());
nextWithTimestamp = null;
- iterWithTimestamp.next();
+ if (reverse) iterWithTimestamp.prev();
Review comment:
braces & separate lines here and below
##########
File path:
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -339,12 +366,24 @@ public void
shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() {
store.range(bytesKey("a"), bytesKey("b"));
}
+ @Test(expected = InvalidStateStoreException.class)
Review comment:
Use `assertThrows` -- we've been (slowly) migrating away from
`@Test(expected)` in the Streams tests
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -193,22 +193,26 @@ public void prepareBatch(final List<KeyValue<Bytes,
byte[]>> entries,
@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
- final Bytes to) {
+ final Bytes to,
+ final boolean reverse) {
return new RocksDBDualCFRangeIterator(
name,
db.newIterator(newColumnFamily),
db.newIterator(oldColumnFamily),
from,
- to);
+ to,
+ reverse);
}
@Override
- public KeyValueIterator<Bytes, byte[]> all() {
+ public KeyValueIterator<Bytes, byte[]> all(final boolean reverse) {
final RocksIterator innerIterWithTimestamp =
db.newIterator(newColumnFamily);
- innerIterWithTimestamp.seekToFirst();
+ if (reverse) innerIterWithTimestamp.seekToLast();
Review comment:
braces & separate lines
##########
File path:
streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
##########
@@ -150,6 +161,17 @@ public void
shouldThrowUnsupportedOperationExceptionWhileRemove() {
} catch (final UnsupportedOperationException e) { }
}
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionWhileReverseRange() {
+ stubOneUnderlying.put("a", "1");
+ stubOneUnderlying.put("b", "1");
+ final KeyValueIterator<String, String> keyValueIterator =
theStore.reverseRange("a", "b");
+ try {
+ keyValueIterator.remove();
Review comment:
use `assertThrows` here as well
##########
File path:
streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
##########
@@ -199,11 +236,44 @@ public void shouldSupportRangeAcrossMultipleKVStores() {
cache.put("x", "x");
final List<KeyValue<String, String>> results =
toList(theStore.range("a", "e"));
+ assertArrayEquals(
+ asList(
+ new KeyValue<>("a", "a"),
+ new KeyValue<>("b", "b"),
+ new KeyValue<>("c", "c"),
+ new KeyValue<>("d", "d")
+ ).toArray(),
+ results.toArray());
+ }
+
+ @Test
+ public void shouldSupportReverseRangeAcrossMultipleKVStores() {
+ final KeyValueStore<String, String> cache = newStoreInstance();
+ stubProviderTwo.addStore(storeName, cache);
+
+ stubOneUnderlying.put("a", "a");
+ stubOneUnderlying.put("b", "b");
+ stubOneUnderlying.put("z", "z");
+
+ cache.put("c", "c");
+ cache.put("d", "d");
+ cache.put("x", "x");
+
+ final List<KeyValue<String, String>> results =
toList(theStore.reverseRange("a", "e"));
assertTrue(results.contains(new KeyValue<>("a", "a")));
assertTrue(results.contains(new KeyValue<>("b", "b")));
assertTrue(results.contains(new KeyValue<>("c", "c")));
assertTrue(results.contains(new KeyValue<>("d", "d")));
assertEquals(4, results.size());
+ //FIXME: order does not hold between stores, how to validate order
here?
Review comment:
I think the best we can do is just make sure that order is correct
within a store, i.e. if `a` and `m` are both in `stubOneUnderlying` then make sure the
reverse range returns `m` before `a`.
I also think it would be fine to just make sure all the expected values are
returned without checking the order, since there are other tests to verify that
the order within a store is correct
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -29,32 +29,41 @@
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
private final Comparator<byte[]> comparator =
Bytes.BYTES_LEXICO_COMPARATOR;
- private final byte[] rawToKey;
+ private final byte[] rawLastKey;
+ private final boolean reverse;
RocksDBRangeIterator(final String storeName,
final RocksIterator iter,
final Set<KeyValueIterator<Bytes, byte[]>>
openIterators,
final Bytes from,
- final Bytes to) {
- super(storeName, iter, openIterators);
- iter.seek(from.get());
- rawToKey = to.get();
- if (rawToKey == null) {
+ final Bytes to,
+ final boolean reverse) {
+ super(storeName, iter, openIterators, reverse);
+ this.reverse = reverse;
+ if (reverse) {
+ iter.seekForPrev(to.get());
+ rawLastKey = from.get();
+ } else {
+ iter.seek(from.get());
+ rawLastKey = to.get();
+ }
+ if (rawLastKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawToKey is
null for key " + to);
Review comment:
Also it should be `from` for the reverse case, right?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org