This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a8f2307  KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes 
Store and Memory LRU Caches (#6327)
a8f2307 is described below

commit a8f2307164ce0f1f47c458eee8f54173f7218a16
Author: A. Sophie Blee-Goldman <ableegold...@gmail.com>
AuthorDate: Wed Feb 27 07:08:08 2019 -0800

    KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes Store and 
Memory LRU Caches (#6327)
    
    Second PR in series to inline the generic parameters of the following bytes 
stores
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck 
<bbej...@gmail.com>
---
 .../org/apache/kafka/streams/state/Stores.java     |  3 +-
 .../internals/ChangeLoggingKeyValueBytesStore.java |  2 +-
 .../streams/state/internals/MemoryLRUCache.java    | 76 +++++++---------------
 .../state/internals/MemoryNavigableLRUCache.java   | 39 +++++------
 .../state/internals/RocksDBSessionStore.java       | 68 +++++++------------
 .../state/internals/RocksDBWindowStore.java        | 53 ++++++---------
 .../RocksDbSessionBytesStoreSupplier.java          |  3 +-
 .../internals/RocksDbWindowBytesStoreSupplier.java | 11 ++--
 .../internals/WindowStoreIteratorWrapper.java      | 47 ++++---------
 .../internals/WrappedSessionStoreIterator.java     | 17 ++---
 .../state/internals/CachingSessionStoreTest.java   |  2 +-
 .../state/internals/CachingWindowStoreTest.java    |  4 +-
 .../state/internals/RocksDBSessionStoreTest.java   | 35 ++++------
 .../state/internals/RocksDBWindowStoreTest.java    | 18 -----
 14 files changed, 126 insertions(+), 252 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 46a9d45..113e531 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.internals.ApiUtils;
@@ -134,7 +133,7 @@ public class Stores {
 
             @Override
             public KeyValueStore<Bytes, byte[]> get() {
-                return new MemoryNavigableLRUCache<>(name, maxCacheSize, 
Serdes.Bytes(), Serdes.ByteArray());
+                return new MemoryNavigableLRUCache(name, maxCacheSize);
             }
 
             @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 7567e78..aa931bf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -44,7 +44,7 @@ public class ChangeLoggingKeyValueBytesStore extends 
WrappedStateStore<KeyValueS
 
         // if the inner store is an LRU cache, add the eviction listener to 
log removed record
         if (wrapped() instanceof MemoryLRUCache) {
-            ((MemoryLRUCache<Bytes, byte[]>) 
wrapped()).setWhenEldestRemoved((key, value) -> {
+            ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
                 // pass null to indicate removal
                 changeLogger.logChange(key, null);
             });
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index f0c3c8c..d69df13 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -16,14 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -32,46 +30,31 @@ import java.util.Objects;
 
 /**
  * An in-memory LRU cache store based on HashSet and HashMap.
- *
- *  * Note that the use of array-typed keys is discouraged because they result 
in incorrect ordering behavior.
- * If you intend to work on byte arrays as key, for example, you may want to 
wrap them with the {@code Bytes} class,
- * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code 
RocksDBStore<byte[], ...>}.
- *
- * @param <K> The key type
- * @param <V> The value type
  */
-public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
 
-    public interface EldestEntryRemovalListener<K, V> {
-        void apply(K key, V value);
+    public interface EldestEntryRemovalListener {
+        void apply(Bytes key, byte[] value);
     }
 
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
     private final String name;
-    protected final Map<K, V> map;
+    protected final Map<Bytes, byte[]> map;
 
-    private StateSerdes<K, V> serdes;
     private boolean restoring = false; // TODO: this is a sub-optimal solution 
to avoid logging during restoration.
                                        // in the future we should augment the 
StateRestoreCallback with onComplete etc to better resolve this.
     private volatile boolean open = true;
 
-    private EldestEntryRemovalListener<K, V> listener;
+    private EldestEntryRemovalListener listener;
 
-    MemoryLRUCache(final String name,
-                   final int maxCacheSize,
-                   final Serde<K> keySerde,
-                   final Serde<V> valueSerde) {
+    MemoryLRUCache(final String name, final int maxCacheSize) {
         this.name = name;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
 
         // leave room for one extra entry to handle adding an entry before the 
oldest can be removed
-        this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+        this.map = new LinkedHashMap<Bytes, byte[]>(maxCacheSize + 1, 1.01f, 
true) {
             private static final long serialVersionUID = 1L;
 
             @Override
-            protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
+            protected boolean removeEldestEntry(final Map.Entry<Bytes, byte[]> 
eldest) {
                 final boolean evict = super.size() > maxCacheSize;
                 if (evict && !restoring && listener != null) {
                     listener.apply(eldest.getKey(), eldest.getValue());
@@ -81,7 +64,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, 
V> {
         };
     }
 
-    void setWhenEldestRemoved(final EldestEntryRemovalListener<K, V> listener) 
{
+    void setWhenEldestRemoved(final EldestEntryRemovalListener listener) {
         this.listener = listener;
     }
 
@@ -91,24 +74,12 @@ public class MemoryLRUCache<K, V> implements 
KeyValueStore<K, V> {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public void init(final ProcessorContext context,
-                     final StateStore root) {
-        // construct the serde
-        this.serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+    public void init(final ProcessorContext context, final StateStore root) {
 
         // register the store
         context.register(root, (key, value) -> {
             restoring = true;
-            // check value for null, to avoid  deserialization error.
-            if (value == null) {
-                delete(serdes.keyFrom(key));
-            } else {
-                put(serdes.keyFrom(key), serdes.valueFrom(value));
-            }
+            put(Bytes.wrap(key), value);
             restoring = false;
         });
     }
@@ -124,28 +95,26 @@ public class MemoryLRUCache<K, V> implements 
KeyValueStore<K, V> {
     }
 
     @Override
-    public synchronized V get(final K key) {
+    public synchronized byte[] get(final Bytes key) {
         Objects.requireNonNull(key);
 
         return this.map.get(key);
     }
 
     @Override
-    public synchronized void put(final K key,
-                                 final V value) {
+    public synchronized void put(final Bytes key, final byte[] value) {
         Objects.requireNonNull(key);
         if (value == null) {
-            this.map.remove(key);
+            delete(key);
         } else {
             this.map.put(key, value);
         }
     }
 
     @Override
-    public synchronized V putIfAbsent(final K key,
-                                      final V value) {
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] 
value) {
         Objects.requireNonNull(key);
-        final V originalValue = get(key);
+        final byte[] originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
         }
@@ -153,14 +122,14 @@ public class MemoryLRUCache<K, V> implements 
KeyValueStore<K, V> {
     }
 
     @Override
-    public void putAll(final List<KeyValue<K, V>> entries) {
-        for (final KeyValue<K, V> entry : entries) {
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
             put(entry.key, entry.value);
         }
     }
 
     @Override
-    public synchronized V delete(final K key) {
+    public synchronized byte[] delete(final Bytes key) {
         Objects.requireNonNull(key);
         return this.map.remove(key);
     }
@@ -169,8 +138,7 @@ public class MemoryLRUCache<K, V> implements 
KeyValueStore<K, V> {
      * @throws UnsupportedOperationException at every invocation
      */
     @Override
-    public KeyValueIterator<K, V> range(final K from,
-                                        final K to) {
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes 
to) {
         throw new UnsupportedOperationException("MemoryLRUCache does not 
support range() function.");
     }
 
@@ -178,7 +146,7 @@ public class MemoryLRUCache<K, V> implements 
KeyValueStore<K, V> {
      * @throws UnsupportedOperationException at every invocation
      */
     @Override
-    public KeyValueIterator<K, V> all() {
+    public KeyValueIterator<Bytes, byte[]> all() {
         throw new UnsupportedOperationException("MemoryLRUCache does not 
support all() function.");
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index d7b7b11..c3cc834 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
@@ -24,36 +24,37 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
+public class MemoryNavigableLRUCache extends MemoryLRUCache {
 
-
-    public MemoryNavigableLRUCache(final String name, final int maxCacheSize, 
final Serde<K> keySerde, final Serde<V> valueSerde) {
-        super(name, maxCacheSize, keySerde, valueSerde);
+    public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
+        super(name, maxCacheSize);
     }
 
     @Override
-    public KeyValueIterator<K, V> range(final K from, final K to) {
-        final TreeMap<K, V> treeMap = toTreeMap();
-        return new DelegatingPeekingKeyValueIterator<>(name(), new 
MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().subSet(from, 
true, to, true).iterator(), treeMap));
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes 
to) {
+        final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
+        return new DelegatingPeekingKeyValueIterator<>(name(),
+            new MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet()
+                .subSet(from, true, to, true).iterator(), treeMap));
     }
 
     @Override
-    public  KeyValueIterator<K, V> all() {
-        final TreeMap<K, V> treeMap = toTreeMap();
-        return new 
MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().iterator(), 
treeMap);
+    public  KeyValueIterator<Bytes, byte[]> all() {
+        final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
+        return new 
MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet().iterator(), 
treeMap);
     }
 
-    private synchronized TreeMap<K, V> toTreeMap() {
+    private synchronized TreeMap<Bytes, byte[]> toTreeMap() {
         return new TreeMap<>(this.map);
     }
 
 
-    private static class CacheIterator<K, V> implements KeyValueIterator<K, V> 
{
-        private final Iterator<K> keys;
-        private final Map<K, V> entries;
-        private K lastKey;
+    private static class CacheIterator implements KeyValueIterator<Bytes, 
byte[]> {
+        private final Iterator<Bytes> keys;
+        private final Map<Bytes, byte[]> entries;
+        private Bytes lastKey;
 
-        public CacheIterator(final Iterator<K> keys, final Map<K, V> entries) {
+        private CacheIterator(final Iterator<Bytes> keys, final Map<Bytes, 
byte[]> entries) {
             this.keys = keys;
             this.entries = entries;
         }
@@ -64,7 +65,7 @@ public class MemoryNavigableLRUCache<K, V> extends 
MemoryLRUCache<K, V> {
         }
 
         @Override
-        public KeyValue<K, V> next() {
+        public KeyValue<Bytes, byte[]> next() {
             lastKey = keys.next();
             return new KeyValue<>(lastKey, entries.get(lastKey));
         }
@@ -80,7 +81,7 @@ public class MemoryNavigableLRUCache<K, V> extends 
MemoryLRUCache<K, V> {
         }
 
         @Override
-        public K peekNextKey() {
+        public Bytes peekNextKey() {
             throw new UnsupportedOperationException("peekNextKey not 
supported");
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index d855442..c9ca423 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -16,90 +16,66 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.StateSerdes;
 
 
-public class RocksDBSessionStore<K, AGG> extends 
WrappedStateStore<SegmentedBytesStore> implements SessionStore<K, AGG> {
+public class RocksDBSessionStore extends 
WrappedStateStore<SegmentedBytesStore> implements SessionStore<Bytes, byte[]> {
 
-    private final Serde<K> keySerde;
-    private final Serde<AGG> aggSerde;
-
-    private StateSerdes<K, AGG> serdes;
-    private String topic;
-
-    RocksDBSessionStore(final SegmentedBytesStore bytesStore,
-                        final Serde<K> keySerde,
-                        final Serde<AGG> aggSerde) {
+    RocksDBSessionStore(final SegmentedBytesStore bytesStore) {
         super(bytesStore);
-        this.keySerde = keySerde;
-        this.aggSerde = aggSerde;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void init(final ProcessorContext context, final StateStore root) {
-        final String storeName = name();
-        topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
-
-        serdes = new StateSerdes<>(
-            topic,
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
-
-        super.init(context, root);
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final 
long earliestSessionEndTime, final long latestSessionStartTime) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
key,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(
-            Bytes.wrap(serdes.rawKey(key)),
+            key,
             earliestSessionEndTime,
             latestSessionStartTime
         );
-        return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
+        return new WrappedSessionStoreIterator(bytesIterator);
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, 
final K keyTo, final long earliestSessionEndTime, final long 
latestSessionStartTime) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
keyFrom,
+                                                                  final Bytes 
keyTo,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(
-            Bytes.wrap(serdes.rawKey(keyFrom)),
-            Bytes.wrap(serdes.rawKey(keyTo)),
+            keyFrom,
+            keyTo,
             earliestSessionEndTime,
             latestSessionStartTime
         );
-        return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
+        return new WrappedSessionStoreIterator(bytesIterator);
     }
 
     @Override
-    public AGG fetchSession(final K key, final long startTime, final long 
endTime) {
-        return 
serdes.valueFrom(wrapped().get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)),
 startTime, endTime)));
+    public byte[] fetchSession(final Bytes key, final long startTime, final 
long endTime) {
+        return wrapped().get(SessionKeySchema.toBinary(key, startTime, 
endTime));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
         return findSessions(key, 0, Long.MAX_VALUE);
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, 
final Bytes to) {
         return findSessions(from, to, 0, Long.MAX_VALUE);
     }
 
     @Override
-    public void remove(final Windowed<K> key) {
-        wrapped().remove(Bytes.wrap(SessionKeySchema.toBinary(key, 
serdes.keySerializer(), topic)));
+    public void remove(final Windowed<Bytes> key) {
+        wrapped().remove(SessionKeySchema.toBinary(key));
     }
 
     @Override
-    public void put(final Windowed<K> sessionKey, final AGG aggregate) {
-        wrapped().put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey, 
serdes.keySerializer(), topic)), serdes.rawValue(aggregate));
+    public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
+        wrapped().put(SessionKeySchema.toBinary(sessionKey), aggregate);
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index c22ca52..44c9f79 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -16,98 +16,85 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-public class RocksDBWindowStore<K, V> extends 
WrappedStateStore<SegmentedBytesStore> implements WindowStore<K, V> {
+public class RocksDBWindowStore extends WrappedStateStore<SegmentedBytesStore> 
implements WindowStore<Bytes, byte[]> {
 
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
     private final boolean retainDuplicates;
     private final long windowSize;
 
     private ProcessorContext context;
-    private StateSerdes<K, V> serdes;
     private int seqnum = 0;
 
     RocksDBWindowStore(final SegmentedBytesStore bytesStore,
-                       final Serde<K> keySerde,
-                       final Serde<V> valueSerde,
                        final boolean retainDuplicates,
                        final long windowSize) {
         super(bytesStore);
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
         this.retainDuplicates = retainDuplicates;
         this.windowSize = windowSize;
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
-        // construct the serde
-        serdes = new 
StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
 name()),
-                                   keySerde == null ? (Serde<K>) 
context.keySerde() : keySerde,
-                                   valueSerde == null ? (Serde<V>) 
context.valueSerde() : valueSerde);
-
         super.init(context, root);
     }
 
     @Override
-    public void put(final K key, final V value) {
+    public void put(final Bytes key, final byte[] value) {
         put(key, value, context.timestamp());
     }
 
     @Override
-    public void put(final K key, final V value, final long 
windowStartTimestamp) {
+    public void put(final Bytes key, final byte[] value, final long 
windowStartTimestamp) {
         maybeUpdateSeqnumForDups();
 
-        wrapped().put(WindowKeySchema.toStoreKeyBinary(key, 
windowStartTimestamp, seqnum, serdes), serdes.rawValue(value));
+        wrapped().put(WindowKeySchema.toStoreKeyBinary(key, 
windowStartTimestamp, seqnum), value);
     }
 
     @Override
-    public V fetch(final K key, final long timestamp) {
-        final byte[] bytesValue = 
wrapped().get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes));
+    public byte[] fetch(final Bytes key, final long timestamp) {
+        final byte[] bytesValue = 
wrapped().get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum));
         if (bytesValue == null) {
             return null;
         }
-        return serdes.valueFrom(bytesValue);
+        return bytesValue;
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, 
final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
wrapped().fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
-        return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).valuesIterator();
+    public WindowStoreIterator<byte[]> fetch(final Bytes key, final long 
timeFrom, final long timeTo) {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
wrapped().fetch(key, timeFrom, timeTo);
+        return new WindowStoreIteratorWrapper(bytesIterator, 
windowSize).valuesIterator();
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, 
final long timeFrom, final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
wrapped().fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), 
timeFrom, timeTo);
-        return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).keyValueIterator();
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+                                                           final Bytes to,
+                                                           final long timeFrom,
+                                                           final long timeTo) {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
wrapped().fetch(from, to, timeFrom, timeTo);
+        return new WindowStoreIteratorWrapper(bytesIterator, 
windowSize).keyValueIterator();
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> all() {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().all();
-        return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).keyValueIterator();
+        return new WindowStoreIteratorWrapper(bytesIterator, 
windowSize).keyValueIterator();
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, 
final long timeTo) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom, final long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = 
wrapped().fetchAll(timeFrom, timeTo);
-        return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).keyValueIterator();
+        return new WindowStoreIteratorWrapper(bytesIterator, 
windowSize).keyValueIterator();
     }
 
     private void maybeUpdateSeqnumForDups() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index e88755b..8f305db 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
@@ -44,7 +43,7 @@ public class RocksDbSessionBytesStoreSupplier implements 
SessionBytesStoreSuppli
             retentionPeriod,
             segmentIntervalMs(),
             new SessionKeySchema());
-        return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), 
Serdes.ByteArray());
+        return new RocksDBSessionStore(segmented);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index b9b7279..ecdfad2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
@@ -54,12 +53,10 @@ public class RocksDbWindowBytesStoreSupplier implements 
WindowBytesStoreSupplier
                 segmentInterval,
                 new WindowKeySchema()
         );
-        return new RocksDBWindowStore<>(segmentedBytesStore,
-                Serdes.Bytes(),
-                Serdes.ByteArray(),
-                retainDuplicates,
-                windowSize);
-
+        return new RocksDBWindowStore(
+            segmentedBytesStore,
+            retainDuplicates,
+            windowSize);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
index 1feab8f..4095445 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
@@ -20,39 +20,33 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-class WindowStoreIteratorWrapper<K, V> {
+class WindowStoreIteratorWrapper {
 
     private final KeyValueIterator<Bytes, byte[]> bytesIterator;
-    private final StateSerdes<K, V> serdes;
     private final long windowSize;
 
     WindowStoreIteratorWrapper(final KeyValueIterator<Bytes, byte[]> 
bytesIterator,
-                               final StateSerdes<K, V> serdes,
                                final long windowSize) {
         this.bytesIterator = bytesIterator;
-        this.serdes = serdes;
         this.windowSize = windowSize;
     }
 
-    public WindowStoreIterator<V> valuesIterator() {
-        return new WrappedWindowStoreIterator<>(bytesIterator, serdes);
+    public WindowStoreIterator<byte[]> valuesIterator() {
+        return new WrappedWindowStoreIterator(bytesIterator);
     }
 
-    public KeyValueIterator<Windowed<K>, V> keyValueIterator() {
-        return new WrappedKeyValueIterator<>(bytesIterator, serdes, 
windowSize);
+    public KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator() {
+        return new WrappedKeyValueIterator(bytesIterator, windowSize);
     }
 
-    private static class WrappedWindowStoreIterator<V> implements 
WindowStoreIterator<V> {
+    private static class WrappedWindowStoreIterator implements 
WindowStoreIterator<byte[]> {
         final KeyValueIterator<Bytes, byte[]> bytesIterator;
-        final StateSerdes<?, V> serdes;
 
         WrappedWindowStoreIterator(
-            final KeyValueIterator<Bytes, byte[]> bytesIterator, final 
StateSerdes<?, V> serdes) {
+            final KeyValueIterator<Bytes, byte[]> bytesIterator) {
             this.bytesIterator = bytesIterator;
-            this.serdes = serdes;
         }
 
         @Override
@@ -66,11 +60,10 @@ class WindowStoreIteratorWrapper<K, V> {
         }
 
         @Override
-        public KeyValue<Long, V> next() {
+        public KeyValue<Long, byte[]> next() {
             final KeyValue<Bytes, byte[]> next = bytesIterator.next();
             final long timestamp = 
WindowKeySchema.extractStoreTimestamp(next.key.get());
-            final V value = serdes.valueFrom(next.value);
-            return KeyValue.pair(timestamp, value);
+            return KeyValue.pair(timestamp, next.value);
         }
 
         @Override
@@ -84,25 +77,20 @@ class WindowStoreIteratorWrapper<K, V> {
         }
     }
 
-    private static class WrappedKeyValueIterator<K, V> implements 
KeyValueIterator<Windowed<K>, V> {
+    private static class WrappedKeyValueIterator implements 
KeyValueIterator<Windowed<Bytes>, byte[]> {
         final KeyValueIterator<Bytes, byte[]> bytesIterator;
-        final StateSerdes<K, V> serdes;
         final long windowSize;
 
         WrappedKeyValueIterator(final KeyValueIterator<Bytes, byte[]> 
bytesIterator,
-                                final StateSerdes<K, V> serdes,
                                 final long windowSize) {
             this.bytesIterator = bytesIterator;
-            this.serdes = serdes;
             this.windowSize = windowSize;
         }
 
         @Override
-        public Windowed<K> peekNextKey() {
+        public Windowed<Bytes> peekNextKey() {
             final byte[] nextKey = bytesIterator.peekNextKey().get();
-            final long timestamp = 
WindowKeySchema.extractStoreTimestamp(nextKey);
-            final K key = WindowKeySchema.extractStoreKey(nextKey, serdes);
-            return new Windowed<>(key, 
WindowKeySchema.timeWindowForSize(timestamp, windowSize));
+            return WindowKeySchema.fromStoreBytesKey(nextKey, windowSize);
         }
 
         @Override
@@ -111,16 +99,9 @@ class WindowStoreIteratorWrapper<K, V> {
         }
 
         @Override
-        public KeyValue<Windowed<K>, V> next() {
+        public KeyValue<Windowed<Bytes>, byte[]> next() {
             final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-            final long timestamp = 
WindowKeySchema.extractStoreTimestamp(next.key.get());
-            final K key = WindowKeySchema.extractStoreKey(next.key.get(), 
serdes);
-            final V value = serdes.valueFrom(next.value);
-            return KeyValue.pair(
-                new Windowed<>(key, 
WindowKeySchema.timeWindowForSize(timestamp, windowSize)),
-                value
-            );
-
+            return 
KeyValue.pair(WindowKeySchema.fromStoreBytesKey(next.key.get(), windowSize), 
next.value);
         }
 
         @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
index ce27457..281297c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
@@ -20,17 +20,13 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
 
-class WrappedSessionStoreIterator<K, V> implements 
KeyValueIterator<Windowed<K>, V> {
+class WrappedSessionStoreIterator implements KeyValueIterator<Windowed<Bytes>, 
byte[]> {
 
     private final KeyValueIterator<Bytes, byte[]> bytesIterator;
-    private final StateSerdes<K, V> serdes;
 
-    WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]> 
bytesIterator,
-                                final StateSerdes<K, V> serdes) {
+    WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]> 
bytesIterator) {
         this.bytesIterator = bytesIterator;
-        this.serdes = serdes;
     }
 
     @Override
@@ -39,9 +35,8 @@ class WrappedSessionStoreIterator<K, V> implements 
KeyValueIterator<Windowed<K>,
     }
 
     @Override
-    public Windowed<K> peekNextKey() {
-        final Bytes bytes = bytesIterator.peekNextKey();
-        return SessionKeySchema.from(bytes.get(), serdes.keyDeserializer(), 
serdes.topic());
+    public Windowed<Bytes> peekNextKey() {
+        return SessionKeySchema.from(bytesIterator.peekNextKey());
     }
 
     @Override
@@ -50,9 +45,9 @@ class WrappedSessionStoreIterator<K, V> implements 
KeyValueIterator<Windowed<K>,
     }
 
     @Override
-    public KeyValue<Windowed<K>, V> next() {
+    public KeyValue<Windowed<Bytes>, byte[]> next() {
         final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-        return KeyValue.pair(SessionKeySchema.from(next.key.get(), 
serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value));
+        return KeyValue.pair(SessionKeySchema.from(next.key), next.value);
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index a6f0a71..1cfdcd7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -71,7 +71,7 @@ public class CachingSessionStoreTest {
     public void setUp() {
         final SessionKeySchema schema = new SessionKeySchema();
         final RocksDBSegmentedBytesStore underlying = new 
RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, 
schema);
-        final RocksDBSessionStore<Bytes, byte[]> sessionStore = new 
RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
+        final RocksDBSessionStore sessionStore = new 
RocksDBSessionStore(underlying);
         cachingStore = new CachingSessionStore<>(sessionStore, 
Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), 
MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         final InternalMockProcessorContext context = new 
InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, 
cache);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index cb3fcd4..2bb758e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -83,10 +83,8 @@ public class CachingWindowStoreTest {
     public void setUp() {
         keySchema = new WindowKeySchema();
         underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 
0, SEGMENT_INTERVAL, keySchema);
-        final RocksDBWindowStore<Bytes, byte[]> windowStore = new 
RocksDBWindowStore<>(
+        final RocksDBWindowStore windowStore = new RocksDBWindowStore(
             underlying,
-            Serdes.Bytes(),
-            Serdes.ByteArray(),
             false,
             WINDOW_SIZE);
         cacheListener = new 
CachingKeyValueStoreTest.CacheFlushListenerStub<>();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 3653e7e..0786c37 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
@@ -36,6 +37,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static java.time.Duration.ofMillis;
 import static org.apache.kafka.test.StreamsTestUtils.toList;
 import static org.apache.kafka.test.StreamsTestUtils.valuesToList;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -51,19 +53,12 @@ public class RocksDBSessionStoreTest {
 
     @Before
     public void before() {
-        final SessionKeySchema schema = new SessionKeySchema();
-
-        final RocksDBSegmentedBytesStore bytesStore = new 
RocksDBSegmentedBytesStore(
-            "session-store",
-            "metrics-scope",
-            10_000L,
-            60_000L,
-            schema);
-
-        sessionStore = new RocksDBSessionStore<>(
-            bytesStore,
+        sessionStore = Stores.sessionStoreBuilder(
+            Stores.persistentSessionStore(
+                "session-store",
+                ofMillis(10_000L)),
             Serdes.String(),
-            Serdes.Long());
+            Serdes.Long()).build();
 
         context = new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
@@ -74,6 +69,7 @@ public class RocksDBSessionStoreTest {
                 new LogContext("testCache "),
                 0,
                 new MockStreamsMetrics(new Metrics())));
+
         sessionStore.init(context, sessionStore);
     }
 
@@ -188,17 +184,12 @@ public class RocksDBSessionStoreTest {
 
     @Test
     public void shouldFetchExactKeys() {
-        final RocksDBSegmentedBytesStore bytesStore = new 
RocksDBSegmentedBytesStore(
-            "session-store",
-            "metrics-scope",
-            0x7a00000000000000L,
-            0x7a00000000000000L,
-            new SessionKeySchema());
-
-        sessionStore = new RocksDBSessionStore<>(
-            bytesStore,
+        sessionStore = Stores.sessionStoreBuilder(
+            Stores.persistentSessionStore(
+                "session-store",
+                ofMillis(0x7a00000000000000L)),
             Serdes.String(),
-            Serdes.Long());
+            Serdes.Long()).build();
 
         sessionStore.init(context, sessionStore);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 32fa0f7..42b1b8c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -1369,24 +1369,6 @@ public class RocksDBWindowStoreTest {
     }
 
     @Test
-    public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() {
-        windowStore = new RocksDBWindowStore<>(
-            new RocksDBSegmentedBytesStore(
-                windowName,
-                "metrics-scope",
-                retentionPeriod,
-                segmentInterval,
-                new WindowKeySchema()),
-            Serdes.Integer(),
-            new SerdeThatDoesntHandleNull(),
-            false,
-            windowSize);
-        windowStore.init(context, windowStore);
-
-        assertNull(windowStore.fetch(1, 0));
-    }
-
-    @Test
     public void shouldFetchAndIterateOverExactBinaryKeys() {
         final WindowStore<Bytes, String> windowStore = 
Stores.windowStoreBuilder(
             Stores.persistentWindowStore(windowName, ofMillis(60_000L), 
ofMillis(60_000L), true),

Reply via email to