This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 70a3359 KAFKA-12925: adding prefixScan operation for missed
implementations (#10877)
70a3359 is described below
commit 70a3359fbd31b7f7a0cad01caf1fc3b7644d7516
Author: vamossagar12 <[email protected]>
AuthorDate: Thu Jul 15 04:25:50 2021 +0530
KAFKA-12925: adding prefixScan operation for missed implementations
(#10877)
The new prefixScan API may still throw UnsupportedOperationException
due to some missing implementations in the vast store hierarchy of Streams.
This PR adds those missing overrides and expands the test coverage.
Reviewers: Matthias J. Sax <[email protected]>, Anna Sophie
Blee-Goldman <[email protected]>
---
.../internals/AbstractReadOnlyDecorator.java | 7 +
.../internals/AbstractReadWriteDecorator.java | 7 +
.../internals/CompositeReadOnlyKeyValueStore.java | 21 +
...ValueToTimestampedKeyValueByteStoreAdapter.java | 7 +
.../streams/state/internals/MemoryLRUCache.java | 10 +
.../state/internals/MemoryNavigableLRUCache.java | 15 +
.../state/internals/MeteredKeyValueStore.java | 2 +-
.../internals/ReadOnlyKeyValueStoreFacade.java | 7 +
.../streams/state/internals/RocksDBStore.java | 2 +
.../internals/TimestampedKeyValueStoreBuilder.java | 7 +
.../integration/QueryableStateIntegrationTest.java | 58 ++
.../processor/internals/ProcessorTopologyTest.java | 1000 ++++++++++++++++++++
.../CompositeReadOnlyKeyValueStoreTest.java | 86 ++
.../internals/ReadOnlyKeyValueStoreFacadeTest.java | 16 +
.../org/apache/kafka/test/MockKeyValueStore.java | 6 +
.../org/apache/kafka/test/NoOpReadOnlyStore.java | 6 +
16 files changed, 1256 insertions(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
index be47e89..3ec8d7f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -116,6 +117,12 @@ abstract class AbstractReadOnlyDecorator<T extends
StateStore, K, V> extends Wra
}
@Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScan(final P prefix,
+
final PS prefixKeySerializer) {
+ return wrapped().prefixScan(prefix, prefixKeySerializer);
+ }
+
+ @Override
public long approximateNumEntries() {
return wrapped().approximateNumEntries();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 60fa8f4..aff099a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -110,6 +111,12 @@ abstract class AbstractReadWriteDecorator<T extends
StateStore, K, V> extends Wr
}
@Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScan(final P prefix,
+
final PS prefixKeySerializer) {
+ return wrapped().prefixScan(prefix, prefixKeySerializer);
+ }
+
+ @Override
public long approximateNumEntries() {
return wrapped().approximateNumEntries();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
index 54e5f1e..4da9e5c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -105,6 +106,26 @@ public class CompositeReadOnlyKeyValueStore<K, V>
implements ReadOnlyKeyValueSto
}
@Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScan(final P prefix, final PS prefixKeySerializer) {
+ Objects.requireNonNull(prefix);
+ Objects.requireNonNull(prefixKeySerializer);
+ final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>
nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K,
V>>() {
+ @Override
+ public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K,
V> store) {
+ try {
+ return store.prefixScan(prefix, prefixKeySerializer);
+ } catch (final InvalidStateStoreException e) {
+ throw new InvalidStateStoreException("State store is not
available anymore and may have been migrated to another instance; please
re-discover its location from the state metadata.");
+ }
+ }
+ };
+ final List<ReadOnlyKeyValueStore<K, V>> stores =
storeProvider.stores(storeName, storeType);
+ return new DelegatingPeekingKeyValueIterator<>(
+ storeName,
+ new CompositeKeyValueIterator<>(stores.iterator(),
nextIteratorFunction));
+ }
+
+ @Override
public KeyValueIterator<K, V> all() {
final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>
nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K,
V>>() {
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index 6bb0950..d9b42c2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -142,6 +143,12 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter
implements KeyValueSt
}
@Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix,
+
final PS prefixKeySerializer) {
+ return new
KeyValueToTimestampedKeyValueIteratorAdapter<>(store.prefixScan(prefix,
prefixKeySerializer));
+ }
+
+ @Override
public long approximateNumEntries() {
return store.approximateNumEntries();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 236fedc..22f1215 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -178,6 +179,15 @@ public class MemoryLRUCache implements
KeyValueStore<Bytes, byte[]> {
throw new UnsupportedOperationException("MemoryLRUCache does not
support reverseAll() function.");
}
+ /**
+ * @throws UnsupportedOperationException at every invocation
+ */
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix,
+
final PS prefixKeySerializer) {
+ throw new UnsupportedOperationException("MemoryLRUCache does not
support prefixScan() function.");
+ }
+
@Override
public long approximateNumEntries() {
return this.map.size();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 9ad885d..fd636ec 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -67,6 +68,20 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache {
}
@Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+ final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null,
prefix));
+ final Bytes to = Bytes.increment(from);
+
+ final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
+
+ return new DelegatingPeekingKeyValueIterator<>(
+ name(),
+ new MemoryNavigableLRUCache.CacheIterator(treeMap.subMap(from,
true, to, false).keySet().iterator(), treeMap)
+ );
+ }
+
+ @Override
public KeyValueIterator<Bytes, byte[]> all() {
final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
return new
MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet().iterator(),
treeMap);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index f26132d..e64cdf4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -259,7 +259,7 @@ public class MeteredKeyValueStore<K, V>
@Override
public <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScan(final P prefix, final PS prefixKeySerializer) {
- Objects.requireNonNull(prefix, "key cannot be null");
+ Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer
cannot be null");
return new MeteredKeyValueIterator(wrapped().prefixScan(prefix,
prefixKeySerializer), prefixScanSensor);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
index 2ffa3f3..7a03f72 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -47,6 +48,12 @@ public class ReadOnlyKeyValueStoreFacade<K, V> implements
ReadOnlyKeyValueStore<
}
@Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScan(final P prefix,
+
final PS prefixKeySerializer) {
+ return new KeyValueIteratorFacade<>(inner.prefixScan(prefix,
prefixKeySerializer));
+ }
+
+ @Override
public KeyValueIterator<K, V> all() {
return new KeyValueIteratorFacade<>(inner.all());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index f501d13..86f2a83 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -308,6 +308,8 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix,
final PS prefixKeySerializer) {
validateStoreOpen();
+ Objects.requireNonNull(prefix, "prefix cannot be null");
+ Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer
cannot be null");
final Bytes prefixBytes =
Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
final KeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator =
dbAccessor.prefixScan(prefixBytes);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index 444d005..a249a14 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
@@ -158,6 +159,12 @@ public class TimestampedKeyValueStoreBuilder<K, V>
}
@Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix,
+
final PS prefixKeySerializer) {
+ return wrapped.prefixScan(prefix, prefixKeySerializer);
+ }
+
+ @Override
public long approximateNumEntries() {
return wrapped.approximateNumEntries();
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index d07648b..af954e64 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -845,6 +845,64 @@ public class QueryableStateIntegrationTest {
for (final KeyValue<String, String> batchEntry : batch1) {
assertEquals(Long.valueOf(batchEntry.value),
myMapStore.get(batchEntry.key));
}
+
+ final KeyValueIterator<String, Long> range = myMapStore.range("hello",
"kafka");
+ while (range.hasNext()) {
+ System.out.println(range.next());
+ }
+ }
+
+ @Test
+ public void shouldBeAbleToQueryKeysWithGivenPrefix() throws Exception {
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+ final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+ Arrays.asList(
+ new KeyValue<>(keys[0], "1"),
+ new KeyValue<>(keys[1], "1"),
+ new KeyValue<>(keys[2], "3"),
+ new KeyValue<>(keys[3], "5"),
+ new KeyValue<>(keys[4], "2"))
+ );
+
+ final List<KeyValue<String, Long>> expectedPrefixScanResult =
Arrays.asList(
+ new KeyValue<>(keys[3], 5L),
+ new KeyValue<>(keys[1], 1L)
+ );
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ streamOne,
+ batch1,
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ mockTime);
+
+ final KTable<String, String> t1 = builder.table(streamOne);
+ t1
+ .mapValues(
+ (ValueMapper<String, Long>) Long::valueOf,
+ Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
+ .toStream()
+ .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+ startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+ waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
+
+ final ReadOnlyKeyValueStore<String, Long> myMapStore =
+ IntegrationTestUtils.getStore("queryMapValues", kafkaStreams,
keyValueStore());
+
+ int index = 0;
+ final KeyValueIterator<String, Long> range =
myMapStore.prefixScan("go", Serdes.String().serializer());
+ while (range.hasNext()) {
+ assertEquals(expectedPrefixScanResult.get(index++), range.next());
+ }
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index b92a468..a0dc50e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Processor;
@@ -42,6 +43,7 @@ import
org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
@@ -56,6 +58,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
+import java.util.ArrayList;
+import java.util.List;
import java.util.function.Supplier;
import static java.util.Arrays.asList;
@@ -383,6 +387,1002 @@ public class ProcessorTopologyTest {
assertNull(store.get("key4"));
}
+ @Test
+ public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingDisabled()
+ .withLoggingDisabled();
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingEnabled()
+ .withLoggingDisabled();
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingEnabled()
+ .withLoggingEnabled(Collections.emptyMap());
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanPersistentStoreNoCachingNoLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingDisabled()
+ .withLoggingDisabled();
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanPersistentStoreWithCachingNoLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingEnabled()
+ .withLoggingDisabled();
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanPersistentStoreWithCachingWithLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingEnabled()
+ .withLoggingEnabled(Collections.emptyMap());
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+ @Test
+ public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {
+ final String storeName = "prefixScanStore";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName),
Serdes.String(), Serdes.String())
+ .withCachingDisabled()
+ .withLoggingDisabled();
+ topology
+ .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
+ .addProcessor("processor1", defineWithStores(() -> new
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+ .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+ driver = new TopologyTestDriver(topology, props);
+
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+ final TestOutputTopic<Integer, String> outputTopic1 =
+ driver.createOutputTopic(OUTPUT_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ inputTopic.pipeInput("key2", "value2");
+ inputTopic.pipeInput("key3", "value3");
+ inputTopic.pipeInput("key1", "value4");
+ assertTrue(outputTopic1.isEmpty());
+
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("prefixScanStore");
+ final KeyValueIterator<String, String> prefixScan =
store.prefixScan("key", Serdes.String().serializer());
+ final List<KeyValue<String, String>> results = new ArrayList<>();
+ while (prefixScan.hasNext()) {
+ final KeyValue<String, String> next = prefixScan.next();
+ results.add(next);
+ }
+
+ assertEquals("key1", results.get(0).key);
+ assertEquals("value4", results.get(0).value);
+ assertEquals("key2", results.get(1).key);
+ assertEquals("value2", results.get(1).value);
+ assertEquals("key3", results.get(2).key);
+ assertEquals("value3", results.get(2).value);
+
+ }
+
+    // prefixScan on a persistent timestamped store, caching enabled, logging disabled, fed via StatefulProcessor.
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on a persistent timestamped store, caching enabled, logging enabled, fed via StatefulProcessor.
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on an LRU-map store (max 100 entries), caching disabled, logging disabled, fed via StatefulProcessor.
+    @Test
+    public void testPrefixScanLruMapNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on an LRU-map store (max 100 entries), caching enabled, logging disabled, fed via StatefulProcessor.
+    @Test
+    public void testPrefixScanLruMapWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on an LRU-map store (max 100 entries), caching enabled, logging enabled, fed via StatefulProcessor.
+    @Test
+    public void testPrefixScanLruMapWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on an in-memory store, caching disabled, logging disabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanInMemoryStoreNoCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on an in-memory store, caching enabled, logging disabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on an in-memory store, caching enabled, logging enabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingWithLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on a persistent store, caching disabled, logging disabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentStoreNoCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on a persistent store, caching enabled, logging disabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on a persistent store, caching enabled, logging enabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingWithLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on a persistent timestamped store, caching disabled, logging disabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on a persistent timestamped store, caching enabled, logging disabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreWithCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on a persistent timestamped store, caching enabled, logging enabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreWithCachingWithLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on an LRU-map store (max 100 entries), caching disabled, logging disabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanLruMapNoCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on an LRU-map store (max 100 entries), caching enabled, logging disabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanLruMapWithCachingNoLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
+    // prefixScan on an LRU-map store (max 100 entries), caching enabled, logging enabled, fed via the old-API OldAPIStatefulProcessor.
+    @Deprecated // testing old PAPI
+    @Test
+    public void testPrefixScanLruMapWithCachingWithLoggingOldProcessor() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.lruMap(storeName, 100), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor(
+                "processor1",
+                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
+                "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        // key1 is piped twice; the assertions below expect the later value to win.
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        // KeyValueIterator is Closeable: close it so the store is not left with an open iterator.
+        try (final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer())) {
+            while (prefixScan.hasNext()) {
+                results.add(prefixScan.next());
+            }
+        }
+
+        // Check the size first so a short result fails with a clear message instead of IndexOutOfBounds.
+        assertEquals(3, results.size());
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+    }
+
@Deprecated // testing old PAPI
@Test
public void shouldDriveGlobalStore() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index ca8468b..652694b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -113,6 +114,16 @@ public class CompositeReadOnlyKeyValueStoreTest {
}
@Test
+ public void shouldThrowNullPointerExceptionOnPrefixScanNullPrefix() { // prefixScan must reject a null prefix with a NullPointerException
+ assertThrows(NullPointerException.class, () ->
theStore.prefixScan(null, new StringSerializer()));
+ }
+
+ @Test
+ public void
shouldThrowNullPointerExceptionOnPrefixScanNullPrefixKeySerializer() { // prefixScan must reject a null prefix-key serializer with a NullPointerException
+ assertThrows(NullPointerException.class, () ->
theStore.prefixScan("aa", null));
+ }
+
+ @Test
public void shouldThrowNullPointerExceptionOnReverseRangeNullFromKey() {
assertThrows(NullPointerException.class, () ->
theStore.reverseRange(null, "to"));
}
@@ -151,6 +162,22 @@ public class CompositeReadOnlyKeyValueStoreTest {
}
@Test
+ public void shouldThrowNoSuchElementExceptionWhileNextForPrefixScan() { // calling next() on an exhausted prefixScan iterator must throw NoSuchElementException
+ stubOneUnderlying.put("a", "1");
+ final KeyValueIterator<String, String> keyValueIterator =
theStore.prefixScan("a", new StringSerializer());
+ keyValueIterator.next(); // consumes the single entry that was put above
+ assertThrows(NoSuchElementException.class, keyValueIterator::next);
+ }
+
+ @Test
+ public void shouldThrowNoSuchElementExceptionWhilePeekNextForPrefixScan() { // calling peekNextKey() on an exhausted prefixScan iterator must throw NoSuchElementException
+ stubOneUnderlying.put("a", "1");
+ final KeyValueIterator<String, String> keyValueIterator =
theStore.prefixScan("a", new StringSerializer());
+ keyValueIterator.next(); // consumes the single entry that was put above
+ assertThrows(NoSuchElementException.class,
keyValueIterator::peekNextKey);
+ }
+
+ @Test
public void shouldThrowUnsupportedOperationExceptionWhileRemove() {
final KeyValueIterator<String, String> keyValueIterator =
theStore.all();
assertThrows(UnsupportedOperationException.class,
keyValueIterator::remove);
@@ -173,6 +200,14 @@ public class CompositeReadOnlyKeyValueStoreTest {
}
@Test
+ public void shouldThrowUnsupportedOperationExceptionWhilePrefixScan() { // despite the name, the call expected to throw is remove() on the prefixScan iterator
+ stubOneUnderlying.put("a", "1");
+ stubOneUnderlying.put("b", "1");
+ final KeyValueIterator<String, String> keyValueIterator =
theStore.prefixScan("a", new StringSerializer());
+ assertThrows(UnsupportedOperationException.class,
keyValueIterator::remove);
+ }
+
+ @Test
public void shouldFindValueForKeyWhenMultiStores() {
final KeyValueStore<String, String> cache = newStoreInstance();
stubProviderTwo.addStore(storeName, cache);
@@ -212,6 +247,29 @@ public class CompositeReadOnlyKeyValueStoreTest {
}
@Test
+ public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() { // a scan for "abcd" must return only "abcd", not the shorter "abc" or the sibling "abce"
+ stubOneUnderlying.put("abc", "a");
+ stubOneUnderlying.put("abcd", "b");
+ stubOneUnderlying.put("abce", "c");
+
+ final List<KeyValue<String, String>> results =
toList(theStore.prefixScan("abcd", new StringSerializer()));
+ assertTrue(results.contains(new KeyValue<>("abcd", "b")));
+ assertEquals(1, results.size()); // exactly one match: neither "abc" nor "abce" may be included
+ }
+
+ @Test
+ public void shouldSupportPrefixScan() { // a scan for "a" must return the keys "a" and "aa" but not "b"
+ stubOneUnderlying.put("a", "a");
+ stubOneUnderlying.put("aa", "b");
+ stubOneUnderlying.put("b", "c");
+
+ final List<KeyValue<String, String>> results =
toList(theStore.prefixScan("a", new StringSerializer()));
+ assertTrue(results.contains(new KeyValue<>("a", "a")));
+ assertTrue(results.contains(new KeyValue<>("aa", "b")));
+ assertEquals(2, results.size()); // the entry under "b" does not share the prefix and must be absent
+ }
+
+ @Test
public void shouldSupportRangeAcrossMultipleKVStores() {
final KeyValueStore<String, String> cache = newStoreInstance();
stubProviderTwo.addStore(storeName, cache);
@@ -236,6 +294,29 @@ public class CompositeReadOnlyKeyValueStoreTest {
}
@Test
+ public void shouldSupportPrefixScanAcrossMultipleKVStores() { // a prefix scan must combine matches from both registered stores
+ final KeyValueStore<String, String> cache = newStoreInstance();
+ stubProviderTwo.addStore(storeName, cache); // second store registered under the same store name
+
+ stubOneUnderlying.put("a", "a");
+ stubOneUnderlying.put("b", "b");
+ stubOneUnderlying.put("z", "z");
+
+ cache.put("aa", "c");
+ cache.put("ab", "d");
+ cache.put("x", "x");
+
+ final List<KeyValue<String, String>> results =
toList(theStore.prefixScan("a", new StringSerializer()));
+ assertArrayEquals( // array comparison, so both the contents and the order a, aa, ab are checked
+ asList(
+ new KeyValue<>("a", "a"),
+ new KeyValue<>("aa", "c"),
+ new KeyValue<>("ab", "d")
+ ).toArray(),
+ results.toArray());
+ }
+
+ @Test
public void shouldSupportReverseRangeAcrossMultipleKVStores() {
final KeyValueStore<String, String> cache = newStoreInstance();
stubProviderTwo.addStore(storeName, cache);
@@ -323,6 +404,11 @@ public class CompositeReadOnlyKeyValueStoreTest {
}
@Test
+ public void shouldThrowInvalidStoreExceptionOnPrefixScanDuringRebalance() { // prefixScan on the store returned by rebalancing() must throw InvalidStateStoreException
+ assertThrows(InvalidStateStoreException.class, () ->
rebalancing().prefixScan("anything", new StringSerializer()));
+ }
+
+ @Test
public void shouldThrowInvalidStoreExceptionOnAllDuringRebalance() {
assertThrows(InvalidStateStoreException.class, () ->
rebalancing().all());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
index 4b0ca79..ffb5ab3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -75,6 +76,21 @@ public class ReadOnlyKeyValueStoreFacadeTest {
}
@Test
+ public void shouldReturnPlainKeyValuePairsForPrefixScan() { // the facade's prefixScan must expose plain values, dropping the timestamp wrapper
+ final StringSerializer stringSerializer = new StringSerializer();
+ expect(mockedKeyValueTimestampIterator.next()) // the mocked iterator yields values wrapped in ValueAndTimestamp
+ .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
+ .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
+ expect(mockedKeyValueTimestampStore.prefixScan("key",
stringSerializer)).andReturn(mockedKeyValueTimestampIterator);
+ replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+
+ final KeyValueIterator<String, String> iterator =
readOnlyKeyValueStoreFacade.prefixScan("key", stringSerializer);
+ assertThat(iterator.next(), is(KeyValue.pair("key1", "value1"))); // value only, without the 21L timestamp
+ assertThat(iterator.next(), is(KeyValue.pair("key2", "value2"))); // value only, without the 42L timestamp
+ verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+ }
+
+ @Test
public void shouldReturnPlainKeyValuePairsForAllIterator() {
expect(mockedKeyValueTimestampIterator.next())
.andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
index d9e4acf..91f8a6f 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.test;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -126,6 +127,11 @@ public class MockKeyValueStore implements
KeyValueStore<Object, Object> {
}
@Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<Object, Object>
prefixScan(P prefix, PS prefixKeySerializer) { // stub: both arguments are ignored
+ return null; // no iterator is produced by this mock
+ }
+
+ @Override
public KeyValueIterator<Object, Object> all() {
return null;
}
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index c6b5eee..7234231 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -55,6 +56,11 @@ public class NoOpReadOnlyStore<K, V> implements
ReadOnlyKeyValueStore<K, V>, Sta
}
@Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P
prefix, PS prefixKeySerializer) { // no-op stub: both arguments are ignored
+ return null; // no iterator is produced by this no-op store
+ }
+
+ @Override
public KeyValueIterator<K, V> all() {
return null;
}