This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 70a3359  KAFKA-12925: adding prefixScan operation for missed 
implementations (#10877)
70a3359 is described below

commit 70a3359fbd31b7f7a0cad01caf1fc3b7644d7516
Author: vamossagar12 <[email protected]>
AuthorDate: Thu Jul 15 04:25:50 2021 +0530

    KAFKA-12925: adding prefixScan operation for missed implementations 
(#10877)
    
    The new prefixScan API may still throw UnsupportedOperationException 
due to some missing implementations in the vast store hierarchy of Streams. 
This PR adds those missing overrides and expands the test coverage.
    
    Reviewers: Matthias J. Sax <[email protected]>, Anna Sophie 
Blee-Goldman <[email protected]>
---
 .../internals/AbstractReadOnlyDecorator.java       |    7 +
 .../internals/AbstractReadWriteDecorator.java      |    7 +
 .../internals/CompositeReadOnlyKeyValueStore.java  |   21 +
 ...ValueToTimestampedKeyValueByteStoreAdapter.java |    7 +
 .../streams/state/internals/MemoryLRUCache.java    |   10 +
 .../state/internals/MemoryNavigableLRUCache.java   |   15 +
 .../state/internals/MeteredKeyValueStore.java      |    2 +-
 .../internals/ReadOnlyKeyValueStoreFacade.java     |    7 +
 .../streams/state/internals/RocksDBStore.java      |    2 +
 .../internals/TimestampedKeyValueStoreBuilder.java |    7 +
 .../integration/QueryableStateIntegrationTest.java |   58 ++
 .../processor/internals/ProcessorTopologyTest.java | 1000 ++++++++++++++++++++
 .../CompositeReadOnlyKeyValueStoreTest.java        |   86 ++
 .../internals/ReadOnlyKeyValueStoreFacadeTest.java |   16 +
 .../org/apache/kafka/test/MockKeyValueStore.java   |    6 +
 .../org/apache/kafka/test/NoOpReadOnlyStore.java   |    6 +
 16 files changed, 1256 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
index be47e89..3ec8d7f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -116,6 +117,12 @@ abstract class AbstractReadOnlyDecorator<T extends 
StateStore, K, V> extends Wra
         }
 
         @Override
+        public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix,
+                                                                               
final PS prefixKeySerializer) {
+            return wrapped().prefixScan(prefix, prefixKeySerializer);
+        }
+
+        @Override
         public long approximateNumEntries() {
             return wrapped().approximateNumEntries();
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 60fa8f4..aff099a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -110,6 +111,12 @@ abstract class AbstractReadWriteDecorator<T extends 
StateStore, K, V> extends Wr
         }
 
         @Override
+        public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix,
+                                                                               
final PS prefixKeySerializer) {
+            return wrapped().prefixScan(prefix, prefixKeySerializer);
+        }
+
+        @Override
         public long approximateNumEntries() {
             return wrapped().approximateNumEntries();
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
index 54e5f1e..4da9e5c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -105,6 +106,26 @@ public class CompositeReadOnlyKeyValueStore<K, V> 
implements ReadOnlyKeyValueSto
     }
 
     @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
+        Objects.requireNonNull(prefix);
+        Objects.requireNonNull(prefixKeySerializer);
+        final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> 
nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, 
V>>() {
+            @Override
+            public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, 
V> store) {
+                try {
+                    return store.prefixScan(prefix, prefixKeySerializer);
+                } catch (final InvalidStateStoreException e) {
+                    throw new InvalidStateStoreException("State store is not 
available anymore and may have been migrated to another instance; please 
re-discover its location from the state metadata.");
+                }
+            }
+        };
+        final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);
+        return new DelegatingPeekingKeyValueIterator<>(
+            storeName,
+            new CompositeKeyValueIterator<>(stores.iterator(), 
nextIteratorFunction));
+    }
+
+    @Override
     public KeyValueIterator<K, V> all() {
         final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> 
nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, 
V>>() {
             @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index 6bb0950..d9b42c2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -142,6 +143,12 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter 
implements KeyValueSt
     }
 
     @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix,
+                                                                               
     final PS prefixKeySerializer) {
+        return new 
KeyValueToTimestampedKeyValueIteratorAdapter<>(store.prefixScan(prefix, 
prefixKeySerializer));
+    }
+
+    @Override
     public long approximateNumEntries() {
         return store.approximateNumEntries();
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 236fedc..22f1215 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -178,6 +179,15 @@ public class MemoryLRUCache implements 
KeyValueStore<Bytes, byte[]> {
         throw new UnsupportedOperationException("MemoryLRUCache does not 
support reverseAll() function.");
     }
 
+    /**
+     * @throws UnsupportedOperationException at every invocation
+     */
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix,
+                                                                               
     final PS prefixKeySerializer) {
+        throw new UnsupportedOperationException("MemoryLRUCache does not 
support prefixScan() function.");
+    }
+
     @Override
     public long approximateNumEntries() {
         return this.map.size();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 9ad885d..fd636ec 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -67,6 +68,20 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache {
     }
 
     @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+        final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, 
prefix));
+        final Bytes to = Bytes.increment(from);
+
+        final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
+
+        return new DelegatingPeekingKeyValueIterator<>(
+            name(),
+            new MemoryNavigableLRUCache.CacheIterator(treeMap.subMap(from, 
true, to, false).keySet().iterator(), treeMap)
+        );
+    }
+
+    @Override
     public  KeyValueIterator<Bytes, byte[]> all() {
         final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
         return new 
MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet().iterator(), 
treeMap);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index f26132d..e64cdf4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -259,7 +259,7 @@ public class MeteredKeyValueStore<K, V>
 
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
-        Objects.requireNonNull(prefix, "key cannot be null");
+        Objects.requireNonNull(prefix, "prefix cannot be null");
         Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
         return new MeteredKeyValueIterator(wrapped().prefixScan(prefix, 
prefixKeySerializer), prefixScanSensor);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
index 2ffa3f3..7a03f72 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -47,6 +48,12 @@ public class ReadOnlyKeyValueStoreFacade<K, V> implements 
ReadOnlyKeyValueStore<
     }
 
     @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix,
+                                                                           
final PS prefixKeySerializer) {
+        return new KeyValueIteratorFacade<>(inner.prefixScan(prefix, 
prefixKeySerializer));
+    }
+
+    @Override
     public KeyValueIterator<K, V> all() {
         return new KeyValueIteratorFacade<>(inner.all());
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index f501d13..86f2a83 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -308,6 +308,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
     public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix,
                                                                                
     final PS prefixKeySerializer) {
         validateStoreOpen();
+        Objects.requireNonNull(prefix, "prefix cannot be null");
+        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
         final Bytes prefixBytes = 
Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
 
         final KeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = 
dbAccessor.prefixScan(prefixBytes);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index 444d005..a249a14 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
@@ -158,6 +159,12 @@ public class TimestampedKeyValueStoreBuilder<K, V>
         }
 
         @Override
+        public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix,
+                                                                               
         final PS prefixKeySerializer) {
+            return wrapped.prefixScan(prefix, prefixKeySerializer);
+        }
+
+        @Override
         public long approximateNumEntries() {
             return wrapped.approximateNumEntries();
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index d07648b..af954e64 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -845,6 +845,64 @@ public class QueryableStateIntegrationTest {
         for (final KeyValue<String, String> batchEntry : batch1) {
             assertEquals(Long.valueOf(batchEntry.value), 
myMapStore.get(batchEntry.key));
         }
+
+        final KeyValueIterator<String, Long> range = myMapStore.range("hello", 
"kafka");
+        while (range.hasNext()) {
+            System.out.println(range.next());
+        }
+    }
+
+    @Test
+    public void shouldBeAbleToQueryKeysWithGivenPrefix() throws Exception {
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], "1"),
+                new KeyValue<>(keys[1], "1"),
+                new KeyValue<>(keys[2], "3"),
+                new KeyValue<>(keys[3], "5"),
+                new KeyValue<>(keys[4], "2"))
+        );
+
+        final List<KeyValue<String, Long>> expectedPrefixScanResult = 
Arrays.asList(
+            new KeyValue<>(keys[3], 5L),
+            new KeyValue<>(keys[1], 1L)
+        );
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOne,
+            batch1,
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            mockTime);
+
+        final KTable<String, String> t1 = builder.table(streamOne);
+        t1
+            .mapValues(
+                (ValueMapper<String, Long>) Long::valueOf,
+                Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
+            .toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+        waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
+
+        final ReadOnlyKeyValueStore<String, Long> myMapStore =
+            IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, 
keyValueStore());
+
+        int index = 0;
+        final KeyValueIterator<String, Long> range = 
myMapStore.prefixScan("go", Serdes.String().serializer());
+        while (range.hasNext()) {
+            assertEquals(expectedPrefixScanResult.get(index++), range.next());
+        }
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index b92a468..a0dc50e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.api.Processor;
@@ -42,6 +43,7 @@ import 
org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
@@ -56,6 +58,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Properties;
 import java.util.Set;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.function.Supplier;
 
 import static java.util.Arrays.asList;
@@ -383,6 +387,1002 @@ public class ProcessorTopologyTest {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = 
store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    /**
+     * Pipes four records (key1 is written twice) through a topology backed by a persistent
+     * key-value store with caching and change-logging enabled, then checks that
+     * {@code prefixScan("key", ...)} yields key1, key2, key3 in that order with key1 holding
+     * its latest value, and that nothing was forwarded to the output topic.
+     */
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Pipes four records (key1 is written twice) through a topology whose plain key-value store
+     * builder is given a persistent timestamped store supplier, with caching and logging disabled,
+     * then checks that {@code prefixScan("key", ...)} yields key1, key2, key3 in that order with
+     * key1 holding its latest value, and that nothing was forwarded to the output topic.
+     */
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Pipes four records (key1 is written twice) through a topology whose plain key-value store
+     * builder is given a persistent timestamped store supplier, with caching enabled and logging
+     * disabled, then checks that {@code prefixScan("key", ...)} yields key1, key2, key3 in that
+     * order with key1 holding its latest value, and that nothing was forwarded to the output topic.
+     */
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Pipes four records (key1 is written twice) through a topology whose plain key-value store
+     * builder is given a persistent timestamped store supplier, with caching and change-logging
+     * enabled, then checks that {@code prefixScan("key", ...)} yields key1, key2, key3 in that
+     * order with key1 holding its latest value, and that nothing was forwarded to the output topic.
+     */
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Pipes four records (key1 is written twice) through a topology backed by an LRU-map store
+     * (max 100 entries) with caching and logging disabled, then checks that
+     * {@code prefixScan("key", ...)} yields key1, key2, key3 in that order with key1 holding
+     * its latest value, and that nothing was forwarded to the output topic.
+     */
+    @Test
+    public void testPrefixScanLruMapNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Pipes four records (key1 is written twice) through a topology backed by an LRU-map store
+     * (max 100 entries) with caching enabled and logging disabled, then checks that
+     * {@code prefixScan("key", ...)} yields key1, key2, key3 in that order with key1 holding
+     * its latest value, and that nothing was forwarded to the output topic.
+     */
+    @Test
+    public void testPrefixScanLruMapWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Pipes four records (key1 is written twice) through a topology backed by an LRU-map store
+     * (max 100 entries) with caching and change-logging enabled, then checks that
+     * {@code prefixScan("key", ...)} yields key1, key2, key3 in that order with key1 holding
+     * its latest value, and that nothing was forwarded to the output topic.
+     */
+    @Test
+    public void testPrefixScanLruMapWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Old Processor API variant: pipes four records (key1 is written twice) through a topology
+     * wired with {@code defineWithStoresOldAPI} and an in-memory key-value store with caching and
+     * logging disabled, then checks that {@code prefixScan("key", ...)} yields key1, key2, key3
+     * in that order with key1 holding its latest value, and that nothing reached the output topic.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanInMemoryStoreNoCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Old Processor API variant: pipes four records (key1 is written twice) through a topology
+     * wired with {@code defineWithStoresOldAPI} and an in-memory key-value store with caching
+     * enabled and logging disabled, then checks that {@code prefixScan("key", ...)} yields key1,
+     * key2, key3 in that order with key1 holding its latest value, and that nothing reached the
+     * output topic.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Old Processor API variant: pipes four records (key1 is written twice) through a topology
+     * wired with {@code defineWithStoresOldAPI} and an in-memory key-value store with caching and
+     * change-logging enabled, then checks that {@code prefixScan("key", ...)} yields key1, key2,
+     * key3 in that order with key1 holding its latest value, and that nothing reached the output
+     * topic.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingWithLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Old Processor API variant: pipes four records (key1 is written twice) through a topology
+     * wired with {@code defineWithStoresOldAPI} and a persistent key-value store with caching and
+     * logging disabled, then checks that {@code prefixScan("key", ...)} yields key1, key2, key3
+     * in that order with key1 holding its latest value, and that nothing reached the output topic.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentStoreNoCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Old Processor API variant: pipes four records (key1 is written twice) through a topology
+     * wired with {@code defineWithStoresOldAPI} and a persistent key-value store with caching
+     * enabled and logging disabled, then checks that {@code prefixScan("key", ...)} yields key1,
+     * key2, key3 in that order with key1 holding its latest value, and that nothing reached the
+     * output topic.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Old Processor API variant: pipes four records (key1 is written twice) through a topology
+     * wired with {@code defineWithStoresOldAPI} and a persistent key-value store with caching and
+     * change-logging enabled, then checks that {@code prefixScan("key", ...)} yields key1, key2,
+     * key3 in that order with key1 holding its latest value, and that nothing reached the output
+     * topic.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingWithLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Old Processor API variant: pipes four records (key1 is written twice) through a topology
+     * wired with {@code defineWithStoresOldAPI} whose plain key-value store builder is given a
+     * persistent timestamped store supplier, with caching and logging disabled, then checks that
+     * {@code prefixScan("key", ...)} yields key1, key2, key3 in that order with key1 holding its
+     * latest value, and that nothing reached the output topic.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Old Processor API variant: pipes four records (key1 is written twice) through a topology
+     * wired with {@code defineWithStoresOldAPI} whose plain key-value store builder is given a
+     * persistent timestamped store supplier, with caching enabled and logging disabled, then
+     * checks that {@code prefixScan("key", ...)} yields key1, key2, key3 in that order with key1
+     * holding its latest value, and that nothing reached the output topic.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreWithCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        // Second write to key1: the scan below must report this newer value.
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable; close it so the underlying store resources are released.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Guard the indexed lookups and make sure no unexpected entries matched the prefix.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Pipes four records (the last one overwriting "key1") into a topology whose old-PAPI processor is
+     * connected to a persistent timestamped key-value store with caching and logging enabled, then
+     * verifies that a prefix scan for "key" yields key1..key3 in order with the latest value per key.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreWithCachingWithLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable and must be closed after use; try-with-resources guarantees that.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Fail with a clear message on missing or extra entries instead of an IndexOutOfBoundsException.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Pipes four records (the last one overwriting "key1") into a topology whose old-PAPI processor is
+     * connected to an LRU-map store (max 100 entries) with caching and logging disabled, then verifies
+     * that a prefix scan for "key" yields key1..key3 in order with the latest value per key.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanLruMapNoCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable and must be closed after use; try-with-resources guarantees that.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Fail with a clear message on missing or extra entries instead of an IndexOutOfBoundsException.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Pipes four records (the last one overwriting "key1") into a topology whose old-PAPI processor is
+     * connected to an LRU-map store (max 100 entries) with caching enabled and logging disabled, then
+     * verifies that a prefix scan for "key" yields key1..key3 in order with the latest value per key.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanLruMapWithCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable and must be closed after use; try-with-resources guarantees that.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Fail with a clear message on missing or extra entries instead of an IndexOutOfBoundsException.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    /**
+     * Pipes four records (the last one overwriting "key1") into a topology whose old-PAPI processor is
+     * connected to an LRU-map store (max 100 entries) with caching and logging enabled, then verifies
+     * that a prefix scan for "key" yields key1..key3 in order with the latest value per key.
+     */
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanLruMapWithCachingWithLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable and must be closed after use; try-with-resources guarantees that.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Fail with a clear message on missing or extra entries instead of an IndexOutOfBoundsException.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
     @Deprecated // testing old PAPI
     @Test
     public void shouldDriveGlobalStore() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index ca8468b..652694b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -113,6 +114,16 @@ public class CompositeReadOnlyKeyValueStoreTest {
     }
 
     @Test
+    public void shouldThrowNullPointerExceptionOnPrefixScanNullPrefix() {
+        // A null prefix must be rejected even when a valid serializer is supplied.
+        final StringSerializer prefixSerializer = new StringSerializer();
+        assertThrows(NullPointerException.class, () -> theStore.prefixScan(null, prefixSerializer));
+    }
+
+    @Test
+    public void shouldThrowNullPointerExceptionOnPrefixScanNullPrefixKeySerializer() {
+        // A null serializer must be rejected even when the prefix itself is valid.
+        final String prefix = "aa";
+        assertThrows(NullPointerException.class, () -> theStore.prefixScan(prefix, null));
+    }
+
+    @Test
     public void shouldThrowNullPointerExceptionOnReverseRangeNullFromKey() {
         assertThrows(NullPointerException.class, () -> 
theStore.reverseRange(null, "to"));
     }
@@ -151,6 +162,22 @@ public class CompositeReadOnlyKeyValueStoreTest {
     }
 
     @Test
+    public void shouldThrowNoSuchElementExceptionWhileNextForPrefixScan() {
+        stubOneUnderlying.put("a", "1");
+        // KeyValueIterator is Closeable; release it even though the scan is exhausted on purpose.
+        try (final KeyValueIterator<String, String> keyValueIterator = theStore.prefixScan("a", new StringSerializer())) {
+            // Consume the single matching entry so that the next call runs past the end.
+            keyValueIterator.next();
+            assertThrows(NoSuchElementException.class, keyValueIterator::next);
+        }
+    }
+
+    @Test
+    public void shouldThrowNoSuchElementExceptionWhilePeekNextForPrefixScan() {
+        stubOneUnderlying.put("a", "1");
+        // KeyValueIterator is Closeable; release it even though the scan is exhausted on purpose.
+        try (final KeyValueIterator<String, String> keyValueIterator = theStore.prefixScan("a", new StringSerializer())) {
+            // Consume the single matching entry so that peeking has nothing left to return.
+            keyValueIterator.next();
+            assertThrows(NoSuchElementException.class, keyValueIterator::peekNextKey);
+        }
+    }
+
+    @Test
     public void shouldThrowUnsupportedOperationExceptionWhileRemove() {
         final KeyValueIterator<String, String> keyValueIterator = 
theStore.all();
         assertThrows(UnsupportedOperationException.class, 
keyValueIterator::remove);
@@ -173,6 +200,14 @@ public class CompositeReadOnlyKeyValueStoreTest {
     }
 
     @Test
+    public void shouldThrowUnsupportedOperationExceptionWhilePrefixScan() {
+        stubOneUnderlying.put("a", "1");
+        stubOneUnderlying.put("b", "1");
+        // KeyValueIterator is Closeable; close it once the remove() check is done.
+        try (final KeyValueIterator<String, String> keyValueIterator = theStore.prefixScan("a", new StringSerializer())) {
+            // remove() on the prefix-scan iterator is expected to be unsupported.
+            assertThrows(UnsupportedOperationException.class, keyValueIterator::remove);
+        }
+    }
+
+    @Test
     public void shouldFindValueForKeyWhenMultiStores() {
         final KeyValueStore<String, String> cache = newStoreInstance();
         stubProviderTwo.addStore(storeName, cache);
@@ -212,6 +247,29 @@ public class CompositeReadOnlyKeyValueStoreTest {
     }
 
     @Test
+    public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() {
+        stubOneUnderlying.put("abc", "a");
+        stubOneUnderlying.put("abcd", "b");
+        stubOneUnderlying.put("abce", "c");
+
+        // Only "abcd" starts with the prefix; "abc" is shorter and "abce" is the next key after it.
+        final StringSerializer prefixSerializer = new StringSerializer();
+        final List<KeyValue<String, String>> matches = toList(theStore.prefixScan("abcd", prefixSerializer));
+        assertEquals(1, matches.size());
+        assertTrue(matches.contains(new KeyValue<>("abcd", "b")));
+    }
+
+    @Test
+    public void shouldSupportPrefixScan() {
+        stubOneUnderlying.put("a", "a");
+        stubOneUnderlying.put("aa", "b");
+        stubOneUnderlying.put("b", "c");
+
+        // "a" and "aa" share the prefix; "b" must be left out.
+        final StringSerializer prefixSerializer = new StringSerializer();
+        final List<KeyValue<String, String>> matches = toList(theStore.prefixScan("a", prefixSerializer));
+        assertEquals(2, matches.size());
+        assertTrue(matches.contains(new KeyValue<>("a", "a")));
+        assertTrue(matches.contains(new KeyValue<>("aa", "b")));
+    }
+
+    @Test
     public void shouldSupportRangeAcrossMultipleKVStores() {
         final KeyValueStore<String, String> cache = newStoreInstance();
         stubProviderTwo.addStore(storeName, cache);
@@ -236,6 +294,29 @@ public class CompositeReadOnlyKeyValueStoreTest {
     }
 
     @Test
+    public void shouldSupportPrefixScanAcrossMultipleKVStores() {
+        // Register a second underlying store so the composite has to combine results from both.
+        final KeyValueStore<String, String> cache = newStoreInstance();
+        stubProviderTwo.addStore(storeName, cache);
+
+        stubOneUnderlying.put("a", "a");
+        stubOneUnderlying.put("b", "b");
+        stubOneUnderlying.put("z", "z");
+
+        cache.put("aa", "c");
+        cache.put("ab", "d");
+        cache.put("x", "x");
+
+        final Object[] actual = toList(theStore.prefixScan("a", new StringSerializer())).toArray();
+        final Object[] expected = asList(
+            new KeyValue<>("a", "a"),
+            new KeyValue<>("aa", "c"),
+            new KeyValue<>("ab", "d")
+        ).toArray();
+        assertArrayEquals(expected, actual);
+    }
+
+    @Test
     public void shouldSupportReverseRangeAcrossMultipleKVStores() {
         final KeyValueStore<String, String> cache = newStoreInstance();
         stubProviderTwo.addStore(storeName, cache);
@@ -323,6 +404,11 @@ public class CompositeReadOnlyKeyValueStoreTest {
     }
 
     @Test
+    public void shouldThrowInvalidStoreExceptionOnPrefixScanDuringRebalance() {
+        // The store lookup stays inside the lambda so that any exception it raises is seen by assertThrows.
+        final StringSerializer prefixSerializer = new StringSerializer();
+        assertThrows(InvalidStateStoreException.class, () -> rebalancing().prefixScan("anything", prefixSerializer));
+    }
+
+    @Test
     public void shouldThrowInvalidStoreExceptionOnAllDuringRebalance() {
         assertThrows(InvalidStateStoreException.class, () -> 
rebalancing().all());
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
index 4b0ca79..ffb5ab3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -75,6 +76,21 @@ public class ReadOnlyKeyValueStoreFacadeTest {
     }
 
     @Test
+    public void shouldReturnPlainKeyValuePairsForPrefixScan() {
+        final StringSerializer prefixSerializer = new StringSerializer();
+        // The mocked timestamped store hands out the mocked iterator for the "key" prefix ...
+        expect(mockedKeyValueTimestampStore.prefixScan("key", prefixSerializer)).andReturn(mockedKeyValueTimestampIterator);
+        // ... and that iterator yields two entries whose values carry timestamps.
+        expect(mockedKeyValueTimestampIterator.next())
+            .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L)))
+            .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L)));
+        replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+
+        // The facade is expected to return the same keys with plain values (no timestamp wrapper).
+        final KeyValueIterator<String, String> plainIterator = readOnlyKeyValueStoreFacade.prefixScan("key", prefixSerializer);
+        final KeyValue<String, String> first = plainIterator.next();
+        assertThat(first, is(KeyValue.pair("key1", "value1")));
+        final KeyValue<String, String> second = plainIterator.next();
+        assertThat(second, is(KeyValue.pair("key2", "value2")));
+        verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+    }
+
+    @Test
     public void shouldReturnPlainKeyValuePairsForAllIterator() {
         expect(mockedKeyValueTimestampIterator.next())
             .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java 
b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
index d9e4acf..91f8a6f 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -126,6 +127,11 @@ public class MockKeyValueStore implements 
KeyValueStore<Object, Object> {
     }
 
     @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Object, Object> 
prefixScan(P prefix, PS prefixKeySerializer) {
+        // Stub implementation: both arguments are ignored and no iterator is produced.
+        return null;
+    }
+
+    @Override
     public KeyValueIterator<Object, Object> all() {
         return null;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java 
b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index c6b5eee..7234231 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -55,6 +56,11 @@ public class NoOpReadOnlyStore<K, V> implements 
ReadOnlyKeyValueStore<K, V>, Sta
     }
 
     @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P 
prefix, PS prefixKeySerializer) {
+        // No-op implementation: both arguments are ignored and no iterator is produced.
+        return null;
+    }
+
+    @Override
     public KeyValueIterator<K, V> all() {
         return null;
     }

Reply via email to