cadonna commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r527583614



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -103,6 +105,19 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
+        Objects.requireNonNull(prefix, "prefix cannot be null");
+        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
+
+        final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
+        final Bytes to = Bytes.increment(from);
+
+        return new DelegatingPeekingKeyValueIterator<>(
+            name,
+            new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true));

Review comment:
       nit: The last parenthesis should go on a new line. I know that we are not 
consistent throughout the code base (that is why this comment is prefixed with 
"nit"), but that is actually the code style we agreed upon. If you need to push 
another commit, you can fix this; otherwise it's fine.
   ```suggestion
           return new DelegatingPeekingKeyValueIterator<>(
               name,
               new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true)
           );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
##########
@@ -359,6 +361,31 @@ public void shouldReverseIterateOverRange() {
         ), results);
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(bytesKey("k1"), bytesValue("1")));
+        entries.add(new KeyValue<>(bytesKey("k2"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p2"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p1"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p0"), bytesValue("2")));
+        store.putAll(entries);
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan("p", new StringSerializer());
+        final List<String> keys = new ArrayList<>();
+        final List<String> values = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            keys.add(next.key.toString());
+            values.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(3));
+        assertThat(keys, is(Arrays.asList("p0", "p1", "p2")));
+        assertThat(values, is(Arrays.asList("2", "2", "2")));
+    }

Review comment:
       The following request is not a requirement to get the PR approved, but 
rather optional extra work to improve the code base. Could you rewrite this 
test with a mock for the inner state store as in `MeteredKeyValueStoreTest`?  
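
       For illustration, a rough sketch of what the mock-based variant might 
look like (assuming EasyMock, as used elsewhere in the Streams test suite; the 
`underlyingStore` mock and its setup are hypothetical):
   ```java
    // Hypothetical sketch only: verifies that prefixScan() is delegated to the
    // mocked inner store rather than exercising a real store end-to-end.
    @SuppressWarnings("unchecked")
    @Test
    public void shouldDelegatePrefixScanToInnerStore() {
        final KeyValueIterator<Bytes, byte[]> innerIterator = EasyMock.mock(KeyValueIterator.class);
        EasyMock.expect(underlyingStore.prefixScan(
                EasyMock.eq("p"), EasyMock.anyObject(StringSerializer.class)))
            .andReturn(innerIterator);
        EasyMock.replay(underlyingStore);

        store.prefixScan("p", new StringSerializer());

        EasyMock.verify(underlyingStore);
    }
   ```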

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -229,6 +230,15 @@ public V delete(final K key) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+        return new MeteredKeyValueIterator(
+                wrapped().prefixScan(prefix, prefixKeySerializer),
+                rangeSensor

Review comment:
       I am not sure whether we should use the `rangeSensor` here or introduce 
a new `prefixScanSensor`. I looked into the KIP discussion but could not find 
any reference to metrics. Either I missed it or we missed it in the KIP 
discussion. What do the reviewers of the KIP think @ableegoldman @vvcephei 
@guozhangwang?
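
       If a dedicated sensor were preferred, the wiring could mirror the 
existing `rangeSensor` (a hedged sketch; `prefixScanSensor` is an assumed 
factory method and field, not an existing `StateStoreMetrics` API):
   ```java
    // Hypothetical sketch: a dedicated prefix-scan sensor, created in init()
    // alongside rangeSensor, would be passed to the iterator instead.
    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
        return new MeteredKeyValueIterator(
            wrapped().prefixScan(prefix, prefixKeySerializer),
            prefixScanSensor // assumed field, initialized like rangeSensor
        );
    }
   ```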

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
##########
@@ -284,6 +284,12 @@ public boolean isEmpty() {
         return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), true);
     }
 
+    synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to, final boolean toInclusive) {
+        if (toInclusive)
+            keyRange(from, to);
+        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, false), true);
+    }

Review comment:
       Why not simply:
   ```suggestion
       synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to, final boolean toInclusive) {
           return keySetIterator(cache.navigableKeySet().subSet(from, true, to, toInclusive), true);
       }
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -360,6 +361,115 @@ public void shouldPutAll() {
                 rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
     }
 
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k1")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_3")),
+            stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k2")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_2")),
+            stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k3")),
+            stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_1")),
+            stringSerializer.serialize(null, "f")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer);
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        // Since there are 3 keys prefixed with prefix, the count should be 3
+        assertThat(numberOfKeysReturned, is(3));
+        // The order might seem inverted to the order in which keys were inserted, but since Rocksdb stores keys
+        // lexicographically, prefix_1 would still be the first key that is returned.
+        assertThat(valuesWithPrefix.get(0), is("f"));
+        assertThat(valuesWithPrefix.get(1), is("d"));
+        assertThat(valuesWithPrefix.get(2), is("b"));
+
+        // Lastly, simple key value lookups should still work :)
+        assertThat("c", is(stringDeserializer.deserialize(null,
+            rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "k2"))))));

Review comment:
       Could you please remove the inline comments? They do not add much 
information.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {
+    private final byte[] rawPrefix;
+
+    RocksDBPrefixIterator(final String name,
+                          final RocksIterator newIterator,
+                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
+                          final Bytes prefix) {
+        super(name, newIterator, openIterators, true);
+        this.rawPrefix = prefix.get();
+        newIterator.seek(rawPrefix);
+    }
+
+    private boolean prefixEquals(final byte[] prefix1, final byte[] prefix2) {
+        final int min = Math.min(prefix1.length, prefix2.length);
+        final ByteBuffer prefix1Slice = ByteBuffer.wrap(prefix1, 0, min);
+        final ByteBuffer prefix2Slice = ByteBuffer.wrap(prefix2, 0, min);
+        return prefix1Slice.equals(prefix2Slice);

Review comment:
       With this code `abcd` would be a prefix of `abc`. Is this intended? 
Those cases should be tested in the missing unit tests for this class. 
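
       For reference, an illustrative fix (a sketch, not part of this PR) that 
only treats a key as matching when it is at least as long as the prefix:
   ```java
    // Illustrative sketch: a key matches only if it is at least as long as the
    // raw prefix and starts with exactly those bytes, so a key shorter than
    // the prefix (e.g. key "abc" vs. prefix "abcd") is rejected.
    private boolean hasPrefix(final byte[] prefix, final byte[] key) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
   ```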

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {
+    private final byte[] rawPrefix;
+
+    RocksDBPrefixIterator(final String name,
+                          final RocksIterator newIterator,
+                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
+                          final Bytes prefix) {
+        super(name, newIterator, openIterators, true);
+        this.rawPrefix = prefix.get();
+        newIterator.seek(rawPrefix);
+    }
+
+    private boolean prefixEquals(final byte[] prefix1, final byte[] prefix2) {
+        final int min = Math.min(prefix1.length, prefix2.length);
+        final ByteBuffer prefix1Slice = ByteBuffer.wrap(prefix1, 0, min);
+        final ByteBuffer prefix2Slice = ByteBuffer.wrap(prefix2, 0, min);
+        return prefix1Slice.equals(prefix2Slice);
+    }
+
+    @Override
+    public KeyValue<Bytes, byte[]> makeNext() {
+        final KeyValue<Bytes, byte[]> next = super.makeNext();
+        if (next == null) return allDone();
+        else {
+            if (prefixEquals(this.rawPrefix, next.key.get())) return next;
+            else return allDone();
+        }

Review comment:
       ```suggestion
           if (next == null || !prefixEquals(this.rawPrefix, next.key.get())) {
               return allDone();
           } else {
               return next;
           }
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -360,6 +361,115 @@ public void shouldPutAll() {
                 rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
     }
 
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k1")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_3")),
+            stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k2")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_2")),
+            stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k3")),
+            stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_1")),
+            stringSerializer.serialize(null, "f")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer);
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        // Since there are 3 keys prefixed with prefix, the count should be 3
+        assertThat(numberOfKeysReturned, is(3));
+        // The order might seem inverted to the order in which keys were inserted, but since Rocksdb stores keys
+        // lexicographically, prefix_1 would still be the first key that is returned.
+        assertThat(valuesWithPrefix.get(0), is("f"));
+        assertThat(valuesWithPrefix.get(1), is("d"));
+        assertThat(valuesWithPrefix.get(2), is("b"));
+
+        // Lastly, simple key value lookups should still work :)
+        assertThat("c", is(stringDeserializer.deserialize(null,
+            rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "k2"))))));
+    }
+
+    @Test
+    public void shouldReturnUUIDsWithStringPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        final Serializer<UUID> uuidSerializer = Serdes.UUID().serializer();
+        final UUID uuid1 = UUID.randomUUID();
+        final UUID uuid2 = UUID.randomUUID();
+        final String prefix = uuid1.toString().substring(0, 4);
+        entries.add(new KeyValue<>(
+            new Bytes(uuidSerializer.serialize(null, uuid1)),
+            stringSerializer.serialize(null, "a")));
+
+        entries.add(new KeyValue<>(
+            new Bytes(uuidSerializer.serialize(null, uuid2)),
+            stringSerializer.serialize(null, "b")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan(prefix, stringSerializer);
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+
+        assertThat(numberOfKeysReturned, is(1));
+        assertThat(valuesWithPrefix.get(0), is("a"));
+    }
+
+    @Test
+    public void shouldReturnNoKeys() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "a")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "b")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "c")),
+            stringSerializer.serialize(null, "e")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan("d", stringSerializer);
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            keysWithPrefix.next();
+            numberOfKeysReturned++;
+        }
+        // Since there are no keys prefixed with d, the count should be 0

Review comment:
       Please remove this comment.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -360,6 +361,115 @@ public void shouldPutAll() {
                 rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
     }
 
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k1")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_3")),
+            stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k2")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_2")),
+            stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k3")),
+            stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_1")),
+            stringSerializer.serialize(null, "f")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer);
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        // Since there are 3 keys prefixed with prefix, the count should be 3
+        assertThat(numberOfKeysReturned, is(3));
+        // The order might seem inverted to the order in which keys were inserted, but since Rocksdb stores keys
+        // lexicographically, prefix_1 would still be the first key that is returned.
+        assertThat(valuesWithPrefix.get(0), is("f"));
+        assertThat(valuesWithPrefix.get(1), is("d"));
+        assertThat(valuesWithPrefix.get(2), is("b"));
+
+        // Lastly, simple key value lookups should still work :)
+        assertThat("c", is(stringDeserializer.deserialize(null,
+            rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "k2"))))));

Review comment:
       Do we need this verification? This is covered by other unit test 
methods, isn't it? Could you please remove it from here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -229,6 +230,15 @@ public V delete(final K key) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+        return new MeteredKeyValueIterator(
+                wrapped().prefixScan(prefix, prefixKeySerializer),
+                rangeSensor
+        );

Review comment:
       nit: Sorry, I overlooked this last time. We actually use 4 spaces, not 8.
   ```suggestion
           return new MeteredKeyValueIterator(
               wrapped().prefixScan(prefix, prefixKeySerializer),
               rangeSensor
           );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
##########
@@ -196,6 +200,26 @@ public void shouldReturnValueOnGetWhenExists() {
         assertThat(store.get(hello), equalTo(world));
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        store.put(hi, there);
+        store.put(hello, world);
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan("h", new StringSerializer());
+        final List<String> keys = new ArrayList<>();
+        final List<String> values = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            keys.add(next.key.toString());
+            values.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(2));
+        assertThat(keys, is(Arrays.asList("hello", "hi")));
+        assertThat(values, is(Arrays.asList("world", "there")));
+    }
+

Review comment:
       See my comment in `CachingInMemoryKeyValueStoreTest`. Also, this is not 
required.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -201,6 +201,14 @@ public MemoryLRUCacheBytesIterator reverseAll(final String namespace) {
         return new MemoryLRUCacheBytesIterator(cache.reverseAllKeys(), cache);
     }
 
+    public MemoryLRUCacheBytesIterator prefixScan(final String namespace, final Bytes from, final Bytes to) {

Review comment:
       This method needs unit testing. I realized that here, too, we missed 
covering all methods with unit tests. For example, there are also no unit tests 
for `all()`. Could you please add the ones for `prefixScan()`? You are also 
very welcome to add unit tests for the other methods in a separate PR, but that 
is not a requirement for the approval of this PR. A sketch of such a test 
follows below.
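
       A rough sketch of such a test (assuming the `ThreadCache` constructor 
and the `put(namespace, key, LRUCacheEntry)` signature used by the existing 
`ThreadCacheTest`; `streamsMetrics` and `dirtyEntry()` are assumed fixtures 
from that class):
   ```java
    // Rough sketch, not verified against the actual harness: exercises
    // prefixScan() with a [from, to) range built the same way the stores do.
    @Test
    public void shouldReturnOnlyKeysMatchingPrefixOnPrefixScan() {
        final ThreadCache cache = new ThreadCache(new LogContext("test "), 10000L, streamsMetrics);
        cache.put("ns", Bytes.wrap(new byte[]{(byte) 0xAA, 1}), dirtyEntry(new byte[]{1}));
        cache.put("ns", Bytes.wrap(new byte[]{(byte) 0xAB, 2}), dirtyEntry(new byte[]{2}));

        final Bytes from = Bytes.wrap(new byte[]{(byte) 0xAA});
        final Bytes to = Bytes.increment(from); // exclusive upper bound, as in the stores

        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.prefixScan("ns", from, to);

        assertTrue(iterator.hasNext());
        assertArrayEquals(new byte[]{(byte) 0xAA, 1}, iterator.next().key.get());
        assertFalse(iterator.hasNext());
    }
   ```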

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -103,6 +105,19 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       This method still needs unit testing.
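
       A minimal sketch of what such a test could look like (the `store` 
fixture and its initialization are assumed to follow the existing test classes 
for this store):
   ```java
    // Minimal sketch, assuming an initialized InMemoryKeyValueStore fixture
    // named `store`; setup (init/context) is elided here.
    @Test
    public void shouldReturnKeysWithPrefix() {
        final StringSerializer serializer = new StringSerializer();
        store.put(Bytes.wrap(serializer.serialize(null, "k1")), serializer.serialize(null, "1"));
        store.put(Bytes.wrap(serializer.serialize(null, "p1")), serializer.serialize(null, "2"));
        store.put(Bytes.wrap(serializer.serialize(null, "p2")), serializer.serialize(null, "3"));

        try (final KeyValueIterator<Bytes, byte[]> iterator = store.prefixScan("p", serializer)) {
            assertThat(iterator.next().key, is(Bytes.wrap(serializer.serialize(null, "p1"))));
            assertThat(iterator.next().key, is(Bytes.wrap(serializer.serialize(null, "p2"))));
            assertThat(iterator.hasNext(), is(false));
        }
    }
   ```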

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
       Yes, you need explicit unit tests. If there are no unit tests for 
`RocksDbRangeIterator`, then that is a hole in our test coverage. You do not 
need to add the tests for `RocksDbRangeIterator`, but could you please create a 
ticket to document the missing unit tests? Of course, you are very welcome to 
assign the ticket to yourself and add the missing tests in a separate PR. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -431,4 +441,36 @@ public Bytes peekNextKey() {
             }
         }
     }
+
+    private class RocksDBDualCFPrefixIterator extends RocksDBDualCFIterator {
+        private final byte[] rawPrefix;
+
+        // In Prefix scan mode, we always move in forward direction.

Review comment:
       Please remove comment.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
##########
@@ -130,6 +134,8 @@ public void shouldMigrateDataFromDefaultToTimestampColumnFamily() throws Excepti
         // approx: 7 entries on old CF, 0 in new CF
         assertThat(rocksDBStore.approximateNumEntries(), is(7L));
 
+        // prefix scan should return 7 keys with prefix "key"
+        assertThat(runPrefixScan("key"), is(7));

Review comment:
       I guess it would be more consistent to add your verifications to 
`iteratorsShouldNotMigrateData()`, where all the other iterators are tested.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

