This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 21b7963 KAFKA-7918: Inline generic parameters Pt. I: in-memory
key-value store (#6293)
21b7963 is described below
commit 21b79635474209fcd67d3bf33a70c25f6827c6ef
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Mon Feb 25 19:59:14 2019 -0800
KAFKA-7918: Inline generic parameters Pt. I: in-memory key-value store
(#6293)
First PR in a series to inline the generic parameters of the following bytes
stores:
[x] InMemoryKeyValueStore
[ ] RocksDBWindowStore
[ ] RocksDBSessionStore
[ ] MemoryLRUCache
[ ] MemoryNavigableLRUCache
[ ] (awaiting merge) InMemoryWindowStore
A number of tests took advantage of the generic InMemoryKeyValueStore and
had to be reworked to use the new test-only GenericInMemoryKeyValueStore (or
a store built through Stores) instead. This PR covers everything related to
the in-memory key-value store.
Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax
<[email protected]>, John Roesler <[email protected]>, Bill Bejeck
<[email protected]>
---
.../org/apache/kafka/streams/state/Stores.java | 2 +-
.../DelegatingPeekingKeyValueIterator.java | 2 +-
.../state/internals/InMemoryKeyValueStore.java | 71 ++++++++-------------
.../kstream/internals/KTableReduceTest.java | 7 +--
.../internals/KeyValueStoreMaterializerTest.java | 3 +-
.../state/internals/CachingKeyValueStoreTest.java | 4 +-
.../ChangeLoggingKeyValueBytesStoreTest.java | 2 +-
.../DelegatingPeekingKeyValueIteratorTest.java | 7 ++-
.../state/internals/FilteredCacheIteratorTest.java | 4 +-
...dSortedCacheKeyValueBytesStoreIteratorTest.java | 4 +-
.../kafka/test/GenericInMemoryKeyValueStore.java} | 73 ++++++++--------------
.../kafka/streams/MockProcessorContextTest.java | 15 ++++-
12 files changed, 82 insertions(+), 112 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index d8b19fd..46a9d45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -104,7 +104,7 @@ public class Stores {
@Override
public KeyValueStore<Bytes, byte[]> get() {
- return new InMemoryKeyValueStore<>(name, Serdes.Bytes(),
Serdes.ByteArray());
+ return new InMemoryKeyValueStore(name);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
index 673a7c9..20a434a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
@@ -25,7 +25,7 @@ import java.util.NoSuchElementException;
/**
* Optimized {@link KeyValueIterator} used when the same element could be
peeked multiple times.
*/
-class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K,
V>, PeekingKeyValueIterator<K, V> {
+public class DelegatingPeekingKeyValueIterator<K, V> implements
KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> {
private final KeyValueIterator<K, V> underlying;
private final String storeName;
private KeyValue<K, V> next;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index d6dd42a..cc28d64 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -16,39 +16,27 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serde;
+import java.util.List;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
-public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
+public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
- private final Serde<K> keySerde;
- private final Serde<V> valueSerde;
- private final NavigableMap<K, V> map;
+ private final NavigableMap<Bytes, byte[]> map;
private volatile boolean open = false;
- private StateSerdes<K, V> serdes;
-
- public InMemoryKeyValueStore(final String name,
- final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ public InMemoryKeyValueStore(final String name) {
this.name = name;
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
- // TODO: when we have serde associated with class types, we can
- // improve this situation by passing the comparator here.
this.map = new TreeMap<>();
}
@@ -61,20 +49,15 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context,
final StateStore root) {
- // construct the serde
- this.serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
if (root != null) {
// register the store
context.register(root, (key, value) -> {
// this is a delete
if (value == null) {
- delete(serdes.keyFrom(key));
+ delete(Bytes.wrap(key));
} else {
- put(serdes.keyFrom(key), serdes.valueFrom(value));
+ put(Bytes.wrap(key), value);
}
});
}
@@ -93,13 +76,12 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
}
@Override
- public synchronized V get(final K key) {
+ public synchronized byte[] get(final Bytes key) {
return this.map.get(key);
}
@Override
- public synchronized void put(final K key,
- final V value) {
+ public synchronized void put(final Bytes key, final byte[] value) {
if (value == null) {
this.map.remove(key);
} else {
@@ -108,9 +90,8 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
}
@Override
- public synchronized V putIfAbsent(final K key,
- final V value) {
- final V originalValue = get(key);
+ public synchronized byte[] putIfAbsent(final Bytes key, final byte[]
value) {
+ final byte[] originalValue = get(key);
if (originalValue == null) {
put(key, value);
}
@@ -118,29 +99,29 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
}
@Override
- public synchronized void putAll(final List<KeyValue<K, V>> entries) {
- for (final KeyValue<K, V> entry : entries) {
+ public synchronized void putAll(final List<KeyValue<Bytes, byte[]>>
entries) {
+ for (final KeyValue<Bytes, byte[]> entry : entries) {
put(entry.key, entry.value);
}
}
@Override
- public synchronized V delete(final K key) {
+ public synchronized byte[] delete(final Bytes key) {
return this.map.remove(key);
}
@Override
- public synchronized KeyValueIterator<K, V> range(final K from,
- final K to) {
+ public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ final Bytes to) {
return new DelegatingPeekingKeyValueIterator<>(
name,
- new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to,
true).entrySet().iterator()));
+ new InMemoryKeyValueIterator(this.map.subMap(from, true, to,
true).entrySet().iterator()));
}
@Override
- public synchronized KeyValueIterator<K, V> all() {
- final TreeMap<K, V> copy = new TreeMap<>(this.map);
- return new DelegatingPeekingKeyValueIterator<>(name, new
InMemoryKeyValueIterator<>(copy.entrySet().iterator()));
+ public synchronized KeyValueIterator<Bytes, byte[]> all() {
+ final TreeMap<Bytes, byte[]> copy = new TreeMap<>(this.map);
+ return new DelegatingPeekingKeyValueIterator<>(name, new
InMemoryKeyValueIterator(copy.entrySet().iterator()));
}
@Override
@@ -159,10 +140,10 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
this.open = false;
}
- private static class InMemoryKeyValueIterator<K, V> implements
KeyValueIterator<K, V> {
- private final Iterator<Map.Entry<K, V>> iter;
+ private static class InMemoryKeyValueIterator implements
KeyValueIterator<Bytes, byte[]> {
+ private final Iterator<Map.Entry<Bytes, byte[]>> iter;
- private InMemoryKeyValueIterator(final Iterator<Map.Entry<K, V>> iter)
{
+ private InMemoryKeyValueIterator(final Iterator<Map.Entry<Bytes,
byte[]>> iter) {
this.iter = iter;
}
@@ -172,8 +153,8 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
}
@Override
- public KeyValue<K, V> next() {
- final Map.Entry<K, V> entry = iter.next();
+ public KeyValue<Bytes, byte[]> next() {
+ final Map.Entry<Bytes, byte[]> entry = iter.next();
return new KeyValue<>(entry.getKey(), entry.getValue());
}
@@ -188,7 +169,7 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
}
@Override
- public K peekNextKey() {
+ public Bytes peekNextKey() {
throw new UnsupportedOperationException("peekNextKey() not
supported in " + getClass().getName());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index 05b74dc..afb2cc1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -19,7 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.junit.Test;
@@ -44,9 +45,7 @@ public class KTableReduceTest {
this::differenceNotNullArgs
).get();
-
- final InMemoryKeyValueStore<String, Set<String>> myStore =
- new InMemoryKeyValueStore<>("myStore", null, null);
+ final KeyValueStore<String, Set<String>> myStore = new
GenericInMemoryKeyValueStore<>("myStore");
context.register(myStore, null);
reduceProcessor.init(context);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index bb1dec2..30080c3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
@@ -107,7 +106,7 @@ public class KeyValueStoreMaterializerTest {
@Test
public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() {
final KeyValueBytesStoreSupplier supplier =
EasyMock.createNiceMock(KeyValueBytesStoreSupplier.class);
- final InMemoryKeyValueStore<Bytes, byte[]> store = new
InMemoryKeyValueStore<>("name", Serdes.Bytes(), Serdes.ByteArray());
+ final InMemoryKeyValueStore store = new InMemoryKeyValueStore("name");
EasyMock.expect(supplier.name()).andReturn("name").anyTimes();
EasyMock.expect(supplier.get()).andReturn(store);
EasyMock.replay(supplier);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index b1a6401..6c2b7cf 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -58,7 +58,7 @@ public class CachingKeyValueStoreTest extends
AbstractKeyValueStoreTest {
private final int maxCacheSizeBytes = 150;
private InternalMockProcessorContext context;
private CachingKeyValueStore<String, String> store;
- private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
+ private InMemoryKeyValueStore underlyingStore;
private ThreadCache cache;
private CacheFlushListenerStub<String, String> cacheFlushListener;
private String topic;
@@ -66,7 +66,7 @@ public class CachingKeyValueStoreTest extends
AbstractKeyValueStoreTest {
@Before
public void setUp() {
final String storeName = "store";
- underlyingStore = new InMemoryKeyValueStore<>(storeName,
Serdes.Bytes(), Serdes.ByteArray());
+ underlyingStore = new InMemoryKeyValueStore(storeName);
cacheFlushListener = new CacheFlushListenerStub<>();
store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(),
Serdes.String());
store.setFlushListener(cacheFlushListener, false);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 5fdfd46..5645b8b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -43,7 +43,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
public class ChangeLoggingKeyValueBytesStoreTest {
private InternalMockProcessorContext context;
- private final InMemoryKeyValueStore<Bytes, byte[]> inner = new
InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray());
+ private final InMemoryKeyValueStore inner = new
InMemoryKeyValueStore("kv");
private final ChangeLoggingKeyValueBytesStore store = new
ChangeLoggingKeyValueBytesStore(inner);
private final Map<Object, Object> sent = new HashMap<>();
private final Bytes hi = Bytes.wrap("hi".getBytes());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
index 8b6fc95..593b265 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
@@ -16,8 +16,9 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.junit.Before;
import org.junit.Test;
@@ -29,11 +30,11 @@ import static org.junit.Assert.assertTrue;
public class DelegatingPeekingKeyValueIteratorTest {
private final String name = "name";
- private InMemoryKeyValueStore<String, String> store;
+ private KeyValueStore<String, String> store;
@Before
public void setUp() {
- store = new InMemoryKeyValueStore<>(name, Serdes.String(),
Serdes.String());
+ store = new GenericInMemoryKeyValueStore<>(name);
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
index 4a0796d..bf54786 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
@@ -20,6 +20,8 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.junit.Before;
import org.junit.Test;
@@ -47,7 +49,7 @@ public class FilteredCacheIteratorTest {
};
@SuppressWarnings("unchecked")
- private final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new
InMemoryKeyValueStore("name", null, null);
+ private final KeyValueStore<Bytes, LRUCacheEntry> store = new
GenericInMemoryKeyValueStore<>("my-store");
private final KeyValue<Bytes, LRUCacheEntry> firstEntry =
KeyValue.pair(Bytes.wrap("a".getBytes()),
new LRUCacheEntry("1".getBytes()));
private final List<KeyValue<Bytes, LRUCacheEntry>> entries = asList(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
index d7f164c..4028b0c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
@@ -39,7 +39,7 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest {
@Before
public void setUp() throws Exception {
- store = new InMemoryKeyValueStore<>(namespace, Serdes.Bytes(),
Serdes.ByteArray());
+ store = new InMemoryKeyValueStore(namespace);
cache = new ThreadCache(new LogContext("testCache "), 10000L, new
MockStreamsMetrics(new Metrics()));
}
@@ -146,7 +146,7 @@ public class
MergedSortedCacheKeyValueBytesStoreIteratorTest {
@Test
public void shouldPeekNextKey() throws Exception {
- final KeyValueStore<Bytes, byte[]> kv = new
InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray());
+ final KeyValueStore<Bytes, byte[]> kv = new
InMemoryKeyValueStore("one");
final ThreadCache cache = new ThreadCache(new LogContext("testCache
"), 1000000L, new MockStreamsMetrics(new Metrics()));
final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8},
{9}, {10}};
for (int i = 0; i < bytes.length - 1; i += 2) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
similarity index 64%
copy from
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
copy to
streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
index d6dd42a..e9d20f1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
@@ -14,41 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.test;
-import org.apache.kafka.common.serialization.Serde;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
+import
org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
+/**
+ * This class is a generic version of the in-memory key-value store that is
useful for testing when you
+ * need a basic KeyValueStore for arbitrary types and don't have/want to
write a serde
+ */
+public class GenericInMemoryKeyValueStore<K extends Comparable, V> implements
KeyValueStore<K, V> {
-public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
private final String name;
- private final Serde<K> keySerde;
- private final Serde<V> valueSerde;
private final NavigableMap<K, V> map;
private volatile boolean open = false;
- private StateSerdes<K, V> serdes;
-
- public InMemoryKeyValueStore(final String name,
- final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ public GenericInMemoryKeyValueStore(final String name) {
this.name = name;
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
- // TODO: when we have serde associated with class types, we can
- // improve this situation by passing the comparator here.
this.map = new TreeMap<>();
}
@@ -59,24 +52,10 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
@Override
@SuppressWarnings("unchecked")
- public void init(final ProcessorContext context,
- final StateStore root) {
- // construct the serde
- this.serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-
+ /* This is a "dummy" store used for testing and does not support restoring
from changelog since we allow it to be serde-ignorant */
+ public void init(final ProcessorContext context, final StateStore root) {
if (root != null) {
- // register the store
- context.register(root, (key, value) -> {
- // this is a delete
- if (value == null) {
- delete(serdes.keyFrom(key));
- } else {
- put(serdes.keyFrom(key), serdes.valueFrom(value));
- }
- });
+ context.register(root, null);
}
this.open = true;
@@ -99,7 +78,7 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
@Override
public synchronized void put(final K key,
- final V value) {
+ final V value) {
if (value == null) {
this.map.remove(key);
} else {
@@ -109,7 +88,7 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
@Override
public synchronized V putIfAbsent(final K key,
- final V value) {
+ final V value) {
final V originalValue = get(key);
if (originalValue == null) {
put(key, value);
@@ -131,16 +110,16 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
@Override
public synchronized KeyValueIterator<K, V> range(final K from,
- final K to) {
+ final K to) {
return new DelegatingPeekingKeyValueIterator<>(
name,
- new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to,
true).entrySet().iterator()));
+ new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true,
to, true).entrySet().iterator()));
}
@Override
public synchronized KeyValueIterator<K, V> all() {
final TreeMap<K, V> copy = new TreeMap<>(this.map);
- return new DelegatingPeekingKeyValueIterator<>(name, new
InMemoryKeyValueIterator<>(copy.entrySet().iterator()));
+ return new DelegatingPeekingKeyValueIterator<>(name, new
GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
}
@Override
@@ -159,10 +138,10 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
this.open = false;
}
- private static class InMemoryKeyValueIterator<K, V> implements
KeyValueIterator<K, V> {
- private final Iterator<Map.Entry<K, V>> iter;
+ private static class GenericInMemoryKeyValueIterator<K, V> implements
KeyValueIterator<K, V> {
+ private final Iterator<Entry<K, V>> iter;
- private InMemoryKeyValueIterator(final Iterator<Map.Entry<K, V>> iter)
{
+ private GenericInMemoryKeyValueIterator(final Iterator<Map.Entry<K,
V>> iter) {
this.iter = iter;
}
@@ -192,4 +171,4 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
throw new UnsupportedOperationException("peekNextKey() not
supported in " + getClass().getName());
}
}
-}
+}
\ No newline at end of file
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 41b62f9..32c479c 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -27,7 +27,9 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
import java.io.File;
@@ -230,6 +232,7 @@ public class MockProcessorContextTest {
assertFalse(context.committed());
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldStoreAndReturnStateStores() {
final AbstractProcessor<String, Long> processor = new
AbstractProcessor<String, Long>() {
@@ -243,10 +246,16 @@ public class MockProcessorContextTest {
};
final MockProcessorContext context = new MockProcessorContext();
- final KeyValueStore<String, Long> store = new
InMemoryKeyValueStore<>("my-state", Serdes.String(), Serdes.Long());
- context.register(store, null);
+
+ final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("my-state"),
+ Serdes.String(),
+ Serdes.Long()).withLoggingDisabled();
+
+ final KeyValueStore<String, Long> store = (KeyValueStore<String,
Long>) storeBuilder.build();
store.init(context, store);
+
processor.init(context);
processor.process("foo", 5L);