KAFKA-3452 Follow-up: Refactoring StateStore hierarchies

This is a follow up of https://github.com/apache/kafka/pull/2166 - refactoring 
the store hierarchies as requested

Author: Damian Guy <damian....@gmail.com>

Reviewers: Guozhang Wang <wangg...@gmail.com>

Closes #2360 from dguy/state-store-refactor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73b7ae00
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73b7ae00
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73b7ae00

Branch: refs/heads/trunk
Commit: 73b7ae0019d387407375f3865e263225c986a6ce
Parents: 825f225
Author: Damian Guy <damian....@gmail.com>
Authored: Tue Jan 17 14:13:46 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Tue Jan 17 14:13:46 2017 -0800

----------------------------------------------------------------------
 .../kstream/internals/SessionKeySerde.java      |  17 ++
 .../streams/state/WindowStoreIterator.java      |   3 +-
 .../AbstractMergedSortedCacheStoreIterator.java | 166 +++++++++++++++
 .../state/internals/CachingKeyValueStore.java   |  10 +-
 .../state/internals/CachingSessionStore.java    |  63 ++----
 .../state/internals/CachingWindowStore.java     |  64 +++---
 .../ChangeLoggingKeyValueBytesStore.java        |  93 +++++++++
 .../internals/ChangeLoggingKeyValueStore.java   | 127 ++++++++++++
 .../ChangeLoggingSegmentedBytesStore.java       |  28 +--
 .../internals/CompositeReadOnlyWindowStore.java |   5 +
 .../DelegatingPeekingKeyValueIterator.java      |  10 +-
 .../MergedSortedCacheKeyValueStoreIterator.java | 130 ++----------
 .../MergedSortedCacheSessionStoreIterator.java  |  71 +++++++
 .../MergedSortedCacheWindowStoreIterator.java   |  58 ++++++
 .../MergedSortedCachedWindowStoreIterator.java  | 107 ----------
 .../state/internals/MeteredKeyValueStore.java   |  27 +--
 .../internals/MeteredSegmentedBytesStore.java   |  27 +--
 .../state/internals/MeteredWindowStore.java     | 180 ----------------
 .../internals/RocksDBKeyValueStoreSupplier.java |  54 +++--
 .../internals/RocksDBSessionStoreSupplier.java  |  54 +++--
 .../streams/state/internals/RocksDBStore.java   |  28 +--
 .../state/internals/RocksDBWindowStore.java     |  25 ++-
 .../internals/RocksDBWindowStoreSupplier.java   |  37 ++--
 .../internals/SerializedKeyValueIterator.java   |  70 +++++++
 .../state/internals/WindowStoreUtils.java       |   3 +
 .../state/internals/WrappedStateStore.java      |  90 ++++++++
 .../internals/KGroupedStreamImplTest.java       |  42 +++-
 .../internals/CachingSessionStoreTest.java      |   3 +-
 .../state/internals/CachingWindowStoreTest.java |   5 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java    | 165 +++++++++++++++
 .../ChangeLoggingKeyValueStoreTest.java         | 207 +++++++++++++++++++
 .../DelegatingPeekingKeyValueIteratorTest.java  |  12 +-
 ...rgedSortedCacheSessionStoreIteratorTest.java | 113 ++++++++++
 ...ergedSortedCacheWindowStoreIteratorTest.java |  35 +++-
 .../internals/ReadOnlyWindowStoreStub.java      |   5 +
 .../RocksDBKeyValueStoreSupplierTest.java       | 155 ++++++++++++++
 .../RocksDBSessionStoreSupplierTest.java        | 169 +++++++++++++++
 .../RocksDBWindowStoreSupplierTest.java         | 168 +++++++++++++++
 .../state/internals/RocksDBWindowStoreTest.java |   2 -
 .../SerializedKeyValueIteratorTest.java         |  95 +++++++++
 40 files changed, 2081 insertions(+), 642 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index 48213d6..d9a3528 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 
 import java.nio.ByteBuffer;
@@ -146,4 +147,20 @@ public class SessionKeySerde<K> implements 
Serde<Windowed<K>> {
         buf.putLong(sessionKey.window().start());
         return new Bytes(buf.array());
     }
+
+    public static Bytes bytesToBinary(final Windowed<Bytes> sessionKey) {
+        final byte[] bytes = sessionKey.key().get();
+        ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * 
TIMESTAMP_SIZE);
+        buf.put(bytes);
+        buf.putLong(sessionKey.window().end());
+        buf.putLong(sessionKey.window().start());
+        return new Bytes(buf.array());
+    }
+
+    public static Window extractWindow(final byte [] binaryKey) {
+        final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
+        final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
+        final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
+        return new TimeWindow(start, end);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java 
b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index b6e6d0c..958b778 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -22,7 +22,6 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.streams.KeyValue;
 
 import java.io.Closeable;
-import java.util.Iterator;
 
 /**
  * Iterator interface of {@link KeyValue} with key typed {@link Long} used for 
{@link WindowStore#fetch(Object, long, long)}.
@@ -32,7 +31,7 @@ import java.util.Iterator;
  *
  * @param <E> Type of values
  */
-public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>>, 
Closeable {
+public interface WindowStoreIterator<V> extends KeyValueIterator<Long, V>, 
Closeable {
 
     @Override
     void close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
new file mode 100644
index 0000000..009dad0
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.NoSuchElementException;
+
+/**
+ * Merges two iterators. Assumes each of them is sorted by key
+ *
+ * @param <K>
+ * @param <V>
+ */
+abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements 
KeyValueIterator<K, V> {
+    private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
+    private final KeyValueIterator<KS, byte[]> storeIterator;
+    protected final StateSerdes<K, V> serdes;
+
+    AbstractMergedSortedCacheStoreIterator(final 
PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
+                                           final KeyValueIterator<KS, byte[]> 
storeIterator,
+                                           final StateSerdes<K, V> serdes) {
+        this.cacheIterator = cacheIterator;
+        this.storeIterator = storeIterator;
+        this.serdes = serdes;
+    }
+
+    abstract int compare(final Bytes cacheKey, final KS storeKey);
+
+    abstract K deserializeStoreKey(final KS key);
+
+    abstract KeyValue<K, V> deserializeStorePair(final KeyValue<KS, byte[]> 
pair);
+
+    abstract K deserializeCacheKey(final Bytes cacheKey);
+
+    private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> 
nextFromCache) {
+        return nextFromCache.value.value == null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        // skip over items deleted from cache, and corresponding store items 
if they have the same key
+        while (cacheIterator.hasNext() && 
isDeletedCacheEntry(cacheIterator.peekNext())) {
+            if (storeIterator.hasNext()) {
+                final KS nextStoreKey = storeIterator.peekNextKey();
+                // advance the store iterator if the key is the same as the 
deleted cache key
+                if (compare(cacheIterator.peekNextKey(), nextStoreKey) == 0) {
+                    storeIterator.next();
+                }
+            }
+            cacheIterator.next();
+        }
+
+        return cacheIterator.hasNext() || storeIterator.hasNext();
+    }
+
+    @Override
+    public KeyValue<K, V> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        final Bytes nextCacheKey = cacheIterator.hasNext() ? 
cacheIterator.peekNextKey() : null;
+        final KS nextStoreKey = storeIterator.hasNext() ? 
storeIterator.peekNextKey() : null;
+
+        if (nextCacheKey == null) {
+            return nextStoreValue(nextStoreKey);
+        }
+
+        if (nextStoreKey == null) {
+            return nextCacheValue(nextCacheKey);
+        }
+
+        final int comparison = compare(nextCacheKey, nextStoreKey);
+        if (comparison > 0) {
+            return nextStoreValue(nextStoreKey);
+        } else if (comparison < 0) {
+            return nextCacheValue(nextCacheKey);
+        } else {
+            // skip the same keyed element
+            storeIterator.next();
+            return nextCacheValue(nextCacheKey);
+        }
+    }
+
+    private KeyValue<K, V> nextStoreValue(KS nextStoreKey) {
+        final KeyValue<KS, byte[]> next = storeIterator.next();
+
+        if (!next.key.equals(nextStoreKey)) {
+            throw new IllegalStateException("Next record key is not the peeked 
key value; this should not happen");
+        }
+
+        return deserializeStorePair(next);
+    }
+
+    private KeyValue<K, V> nextCacheValue(Bytes nextCacheKey) {
+        final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next();
+
+        if (!next.key.equals(nextCacheKey)) {
+            throw new IllegalStateException("Next record key is not the peeked 
key value; this should not happen");
+        }
+
+        return KeyValue.pair(deserializeCacheKey(next.key), 
serdes.valueFrom(next.value.value));
+    }
+
+    @Override
+    public K peekNextKey() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        final Bytes nextCacheKey = cacheIterator.hasNext() ? 
cacheIterator.peekNextKey() : null;
+        final KS nextStoreKey = storeIterator.hasNext() ? 
storeIterator.peekNextKey() : null;
+
+        if (nextCacheKey == null) {
+            return deserializeStoreKey(nextStoreKey);
+        }
+
+        if (nextStoreKey == null) {
+            return serdes.keyFrom(nextCacheKey.get());
+        }
+
+        final int comparison = compare(nextCacheKey, nextStoreKey);
+        if (comparison > 0) {
+            return deserializeStoreKey(nextStoreKey);
+        } else if (comparison < 0) {
+            return deserializeCacheKey(nextCacheKey);
+        } else {
+            // skip the same keyed element
+            storeIterator.next();
+            return deserializeCacheKey(nextCacheKey);
+        }
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("remove() is not supported");
+    }
+
+    @Override
+    public void close() {
+        cacheIterator.close();
+        storeIterator.close();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index fdb03fd..9a0a976 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
 
-class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, 
CachedStateStore<K, V> {
+class CachingKeyValueStore<K, V> implements WrappedStateStore, 
KeyValueStore<K, V>, CachedStateStore<K, V> {
 
     private final KeyValueStore<Bytes, byte[]> underlying;
     private final Serde<K> keySerde;
@@ -234,4 +234,12 @@ class CachingKeyValueStore<K, V> implements 
KeyValueStore<K, V>, CachedStateStor
     KeyValueStore<Bytes, byte[]> underlying() {
         return underlying;
     }
+
+    @Override
+    public StateStore inner() {
+        if (underlying instanceof WrappedStateStore) {
+            return ((WrappedStateStore) underlying).inner();
+        }
+        return underlying;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 17c4ee0..fec6609 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -35,21 +34,22 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 
-class CachingSessionStore<K, AGG>  implements SessionStore<K, AGG>, 
CachedStateStore<Windowed<K>, AGG> {
+class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractWrappedStateStore implements SessionStore<K, AGG>, 
CachedStateStore<Windowed<K>, AGG> {
 
-    private final SegmentedBytesStore bytesStore;
+    private final SessionStore<Bytes, byte[]> bytesStore;
     private final SessionKeySchema keySchema;
     private Serde<K> keySerde;
     private final Serde<AGG> aggSerde;
     private InternalProcessorContext context;
     private String name;
-    private StateSerdes<Windowed<K>, AGG> serdes;
+    private StateSerdes<K, AGG> serdes;
     private ThreadCache cache;
     private CacheFlushListener<Windowed<K>, AGG> flushListener;
 
-    CachingSessionStore(final SegmentedBytesStore bytesStore,
+    CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
                         final Serde<K> keySerde,
                         final Serde<AGG> aggSerde) {
+        super(bytesStore);
         this.bytesStore = bytesStore;
         this.keySerde = keySerde;
         this.aggSerde = aggSerde;
@@ -65,12 +65,12 @@ class CachingSessionStore<K, AGG>  implements 
SessionStore<K, AGG>, CachedStateS
                                                                                
   keySchema.lowerRange(binarySessionId,
                                                                                
                        earliestSessionEndTime).get(),
                                                                                
   keySchema.upperRange(binarySessionId, latestSessionStartTime).get());
-        final KeyValueIterator<Bytes, byte[]> storeIterator = 
bytesStore.fetch(binarySessionId, earliestSessionEndTime, 
latestSessionStartTime);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = 
bytesStore.findSessions(binarySessionId, earliestSessionEndTime, 
latestSessionStartTime);
         final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(binarySessionId,
-                                                                               
     earliestSessionEndTime,
-                                                                               
     latestSessionStartTime);
+                                                                             
earliestSessionEndTime,
+                                                                             
latestSessionStartTime);
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator = new FilteredCacheIterator(cacheIterator, 
hasNextCondition);
-        return new 
MergedSortedCacheKeyValueStoreIterator<>(filteredCacheIterator, storeIterator, 
serdes);
+        return new 
MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, 
serdes);
     }
 
 
@@ -92,11 +92,6 @@ class CachingSessionStore<K, AGG>  implements 
SessionStore<K, AGG>, CachedStateS
         return findSessions(key, 0, Long.MAX_VALUE);
     }
 
-
-    public String name() {
-        return bytesStore.name();
-    }
-
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
         bytesStore.init(context, root);
@@ -107,14 +102,9 @@ class CachingSessionStore<K, AGG>  implements 
SessionStore<K, AGG>, CachedStateS
     private void initInternal(final InternalProcessorContext context) {
         this.context = context;
 
-        if (keySerde == null) {
-            keySerde = (Serde<K>) context.keySerde();
-        }
-
-
-        this.serdes = (StateSerdes<Windowed<K>, AGG>) new 
StateSerdes<>(bytesStore.name(),
-                                                                              
new SessionKeySerde<>(keySerde),
-                                                                              
aggSerde == null ? context.valueSerde() : aggSerde);
+        this.serdes = new StateSerdes<>(bytesStore.name(),
+                                        keySerde == null ? (Serde<K>) 
context.keySerde() : keySerde,
+                                        aggSerde == null ? (Serde<AGG>) 
context.valueSerde() : aggSerde);
 
 
         this.name = context.taskId() + "-" + bytesStore.name();
@@ -135,27 +125,27 @@ class CachingSessionStore<K, AGG>  implements 
SessionStore<K, AGG>, CachedStateS
         final RecordContext current = context.recordContext();
         context.setRecordContext(entry.recordContext());
         try {
+            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), 
keySerde.deserializer());
             if (flushListener != null) {
-                final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), 
keySerde.deserializer());
                 final AGG newValue = serdes.valueFrom(entry.newValue());
                 final AGG oldValue = fetchPrevious(binaryKey);
                 if (!(newValue == null && oldValue == null)) {
                     flushListener.apply(key, newValue == null ? null : 
newValue, oldValue);
                 }
-
             }
-            bytesStore.put(binaryKey, entry.newValue());
+            bytesStore.put(new 
Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), 
entry.newValue());
         } finally {
             context.setRecordContext(current);
         }
     }
 
     private AGG fetchPrevious(final Bytes key) {
-        final byte[] bytes = bytesStore.get(key);
-        if (bytes == null) {
-            return null;
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
bytesStore.fetch(key)) {
+            if (!iterator.hasNext()) {
+                return null;
+            }
+            return serdes.valueFrom(iterator.next().value);
         }
-        return serdes.valueFrom(bytes);
     }
 
 
@@ -170,25 +160,10 @@ class CachingSessionStore<K, AGG>  implements 
SessionStore<K, AGG>, CachedStateS
         cache.close(name);
     }
 
-    public boolean persistent() {
-        return bytesStore.persistent();
-    }
-
-    public boolean isOpen() {
-        return bytesStore.isOpen();
-    }
-
     public void setFlushListener(CacheFlushListener<Windowed<K>, AGG> 
flushListener) {
         this.flushListener = flushListener;
     }
 
-    private void validateStoreOpen() {
-        if (!isOpen()) {
-            throw new InvalidStateStoreException("Store " + this.name + " is 
currently closed");
-        }
-    }
-
-
     private static class FilteredCacheIterator implements 
PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
         private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
         private final HasNextCondition hasNextCondition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index bd252f1..d471761 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
@@ -26,16 +26,15 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.RecordContext;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.List;
 
-class CachingWindowStore<K, V> implements WindowStore<K, V>, 
CachedStateStore<Windowed<K>, V> {
+class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractWrappedStateStore implements WindowStore<K, V>, 
CachedStateStore<Windowed<K>, V> {
 
-    private final SegmentedBytesStore underlying;
+    private final WindowStore<Bytes, byte[]> underlying;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private CacheFlushListener<Windowed<K>, V> flushListener;
@@ -45,21 +44,17 @@ class CachingWindowStore<K, V> implements WindowStore<K, 
V>, CachedStateStore<Wi
     private InternalProcessorContext context;
     private StateSerdes<K, V> serdes;
 
-    CachingWindowStore(final SegmentedBytesStore underlying,
+    CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
                        final Serde<K> keySerde,
                        final Serde<V> valueSerde,
                        final long windowSize) {
+        super(underlying);
         this.underlying = underlying;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.windowSize = windowSize;
     }
 
-    @Override
-    public String name() {
-        return underlying.name();
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
@@ -80,13 +75,14 @@ class CachingWindowStore<K, V> implements WindowStore<K, 
V>, CachedStateStore<Wi
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> entries) {
                 for (ThreadCache.DirtyEntry entry : entries) {
-                    final byte[] binaryKey = entry.key().get();
-                    final Bytes key = 
WindowStoreUtils.bytesKeyFromBinaryKey(binaryKey);
-                    final long timestamp = 
WindowStoreUtils.timestampFromBinaryKey(binaryKey);
-                    final Windowed<K> windowedKey = new 
Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes),
+                    final byte[] binaryWindowKey = entry.key().get();
+                    final long timestamp = 
WindowStoreUtils.timestampFromBinaryKey(binaryWindowKey);
+
+                    final Windowed<K> windowedKey = new 
Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryWindowKey, serdes),
                                                                    new 
TimeWindow(timestamp, timestamp + windowSize));
-                    maybeForward(entry, Bytes.wrap(binaryKey), windowedKey, 
(InternalProcessorContext) context);
-                    
underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, timestamp, 0, 
WindowStoreUtils.INNER_SERDES)), entry.newValue());
+                    final Bytes key = 
WindowStoreUtils.bytesKeyFromBinaryKey(binaryWindowKey);
+                    maybeForward(entry, key, windowedKey, 
(InternalProcessorContext) context);
+                    underlying.put(key, entry.newValue(), timestamp);
                 }
             }
         });
@@ -102,7 +98,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, 
CachedStateStore<Wi
             context.setRecordContext(entry.recordContext());
             try {
                 flushListener.apply(windowedKey,
-                                    serdes.valueFrom(entry.newValue()), 
fetchPrevious(key));
+                                    serdes.valueFrom(entry.newValue()), 
fetchPrevious(key, windowedKey.window().start()));
             } finally {
                 context.setRecordContext(current);
             }
@@ -128,16 +124,6 @@ class CachingWindowStore<K, V> implements WindowStore<K, 
V>, CachedStateStore<Wi
     }
 
     @Override
-    public boolean persistent() {
-        return underlying.persistent();
-    }
-
-    @Override
-    public boolean isOpen() {
-        return underlying.isOpen();
-    }
-
-    @Override
     public synchronized void put(final K key, final V value) {
         put(key, value, context.timestamp());
     }
@@ -158,23 +144,21 @@ class CachingWindowStore<K, V> implements WindowStore<K, 
V>, CachedStateStore<Wi
         byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, 
serdes);
         byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);
 
-        final KeyValueIterator<Bytes, byte[]> underlyingIterator = 
underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
+        final WindowStoreIterator<byte[]> underlyingIterator = 
underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(name, binaryFrom, binaryTo);
-        return new MergedSortedCachedWindowStoreIterator<>(cacheIterator, new 
DelegatingPeekingKeyValueIterator<>(name, underlyingIterator), serdes);
+        return new MergedSortedCacheWindowStoreIterator<>(cacheIterator,
+                                                          underlyingIterator,
+                                                          new 
StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde()));
     }
 
 
-    private V fetchPrevious(final Bytes key) {
-        final byte[] result = underlying.get(key);
-        if (result == null) {
-            return null;
-        }
-        return serdes.valueFrom(result);
-    }
-
-    private void validateStoreOpen() {
-        if (!isOpen()) {
-            throw new InvalidStateStoreException("Store " + this.name + " is 
currently closed");
+    private V fetchPrevious(final Bytes key, final long timestamp) {
+        try (final WindowStoreIterator<byte[]> iter = underlying.fetch(key, 
timestamp, timestamp)) {
+            if (!iter.hasNext()) {
+                return null;
+            } else {
+                return serdes.valueFrom(iter.next().value);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
new file mode 100644
index 0000000..e31d04b
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.List;
+
+public class ChangeLoggingKeyValueBytesStore extends 
WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore<Bytes, 
byte[]> {
+    private final KeyValueStore<Bytes, byte[]> inner;
+    private StoreChangeLogger<Bytes, byte[]> changeLogger;
+
+    public ChangeLoggingKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> 
inner) {
+        super(inner);
+        this.inner = inner;
+    }
+
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        inner.init(context, root);
+        this.changeLogger = new StoreChangeLogger<>(inner.name(), context, 
WindowStoreUtils.INNER_SERDES);
+    }
+
+
+    @Override
+    public void put(final Bytes key, final byte[] value) {
+        inner.put(key, value);
+        changeLogger.logChange(key, value);
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        final byte[] previous = get(key);
+        if (previous == null) {
+            put(key, value);
+        }
+        return previous;
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        inner.putAll(entries);
+        for (KeyValue<Bytes, byte[]> entry : entries) {
+            changeLogger.logChange(entry.key, entry.value);
+        }
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        final byte[] oldValue = inner.get(key);
+        put(key, null);
+        return oldValue;
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        return inner.get(key);
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes 
to) {
+        return inner.range(from, to);
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        return inner.all();
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return inner.approximateNumEntries();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
new file mode 100644
index 0000000..11cf802
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class ChangeLoggingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore<K, V> {
+    private final ChangeLoggingKeyValueBytesStore innerBytes;
+    private final Serde keySerde;
+    private final Serde valueSerde;
+    private StateSerdes<K, V> serdes;
+
+    ChangeLoggingKeyValueStore(final KeyValueStore<Bytes, byte[]> bytesStore,
+                               final Serde keySerde,
+                               final Serde valueSerde) {
+        this(new ChangeLoggingKeyValueBytesStore(bytesStore), keySerde, 
valueSerde);
+    }
+
+    private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore 
bytesStore,
+                                       final Serde keySerde,
+                                       final Serde valueSerde) {
+        super(bytesStore);
+        this.innerBytes = bytesStore;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        innerBytes.init(context, root);
+
+        this.serdes = new StateSerdes<>(innerBytes.name(),
+                                        keySerde == null ? (Serde<K>) 
context.keySerde() : keySerde,
+                                        valueSerde == null ? (Serde<V>) 
context.valueSerde() : valueSerde);
+    }
+
+    @Override
+    public void put(final K key, final V value) {
+        final Bytes bytesKey = Bytes.wrap(serdes.rawKey(key));
+        final byte[] bytesValue = serdes.rawValue(value);
+        innerBytes.put(bytesKey, bytesValue);
+    }
+
+    @Override
+    public V putIfAbsent(final K key, final V value) {
+        final V v = get(key);
+        if (v == null) {
+            put(key, value);
+        }
+        return v;
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<K, V>> entries) {
+        final List<KeyValue<Bytes, byte[]>> keyValues = new ArrayList<>();
+        for (final KeyValue<K, V> entry : entries) {
+            keyValues.add(KeyValue.pair(Bytes.wrap(serdes.rawKey(entry.key)), 
serdes.rawValue(entry.value)));
+        }
+        innerBytes.putAll(keyValues);
+    }
+
+    @Override
+    public V delete(final K key) {
+        final byte[] oldValue = 
innerBytes.delete(Bytes.wrap(serdes.rawKey(key)));
+        if (oldValue == null) {
+            return null;
+        }
+        return serdes.valueFrom(oldValue);
+    }
+
+    @Override
+    public V get(final K key) {
+        final byte[] rawValue = innerBytes.get(Bytes.wrap(serdes.rawKey(key)));
+        if (rawValue == null) {
+            return null;
+        }
+        return serdes.valueFrom(rawValue);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(final K from, final K to) {
+        return new 
SerializedKeyValueIterator<>(innerBytes.range(Bytes.wrap(serdes.rawKey(from)),
+                                                                 
Bytes.wrap(serdes.rawKey(to))),
+                                                serdes);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return new SerializedKeyValueIterator<>(innerBytes.all(), serdes);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return innerBytes.approximateNumEntries();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
index 14b8f17..21c2866 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
@@ -25,13 +25,14 @@ import org.apache.kafka.streams.state.KeyValueIterator;
  * Simple wrapper around a {@link SegmentedBytesStore} to support writing
  * updates to a changelog
  */
-class ChangeLoggingSegmentedBytesStore implements SegmentedBytesStore {
+class ChangeLoggingSegmentedBytesStore extends 
WrappedStateStore.AbstractWrappedStateStore implements SegmentedBytesStore {
 
     private final SegmentedBytesStore bytesStore;
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
 
 
     ChangeLoggingSegmentedBytesStore(final SegmentedBytesStore bytesStore) {
+        super(bytesStore);
         this.bytesStore = bytesStore;
     }
 
@@ -60,10 +61,6 @@ class ChangeLoggingSegmentedBytesStore implements 
SegmentedBytesStore {
         return bytesStore.get(key);
     }
 
-    @Override
-    public String name() {
-        return bytesStore.name();
-    }
 
     @Override
     @SuppressWarnings("unchecked")
@@ -71,25 +68,4 @@ class ChangeLoggingSegmentedBytesStore implements 
SegmentedBytesStore {
         bytesStore.init(context, root);
         changeLogger = new StoreChangeLogger<>(name(), context, 
WindowStoreUtils.INNER_SERDES);
     }
-
-    @Override
-    public void flush() {
-        bytesStore.flush();
-    }
-
-    @Override
-    public void close() {
-        bytesStore.close();
-    }
-
-    @Override
-    public boolean persistent() {
-        return bytesStore.persistent();
-    }
-
-    @Override
-    public boolean isOpen() {
-        return bytesStore.isOpen();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index b33c0f0..e0f1ec8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -62,6 +62,11 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
             }
 
             @Override
+            public Long peekNextKey() {
+                throw new NoSuchElementException();
+            }
+
+            @Override
             public boolean hasNext() {
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
index eb57ace..f3101b1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.util.NoSuchElementException;
 
-public class DelegatingPeekingKeyValueIterator<K, V> implements 
KeyValueIterator<K, V> {
+public class DelegatingPeekingKeyValueIterator<K, V> implements 
KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> {
     private final String storeName;
     private final KeyValueIterator<K, V> underlying;
     private KeyValue<K, V> next;
@@ -78,4 +78,12 @@ public class DelegatingPeekingKeyValueIterator<K, V> 
implements KeyValueIterator
     public void remove() {
         throw new UnsupportedOperationException("remove not supported");
     }
+
+    @Override
+    public KeyValue<K, V> peekNext() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return next;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
index c9a6866..b860e16 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
@@ -21,141 +21,37 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 
-import java.util.Comparator;
-import java.util.NoSuchElementException;
-
 /**
  * Merges two iterators. Assumes each of them is sorted by key
  *
  * @param <K>
  * @param <V>
  */
-class MergedSortedCacheKeyValueStoreIterator<K, V> implements 
KeyValueIterator<K, V> {
-    private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
-    private final KeyValueIterator<Bytes, byte[]> storeIterator;
-    private final StateSerdes<K, V> serdes;
-    private final Comparator<byte[]> comparator = 
Bytes.BYTES_LEXICO_COMPARATOR;
-
-    public MergedSortedCacheKeyValueStoreIterator(final 
PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
-                                                  final 
KeyValueIterator<Bytes, byte[]> storeIterator,
-                                                  final StateSerdes<K, V> 
serdes) {
-        this.cacheIterator = cacheIterator;
-        this.storeIterator = storeIterator;
-        this.serdes = serdes;
-    }
-
-    @Override
-    public boolean hasNext() {
-        while (cacheIterator.hasNext() && 
isDeletedCacheEntry(cacheIterator.peekNext())) {
-            if (storeIterator.hasNext()) {
-                final Bytes storeKey = storeIterator.peekNextKey();
-                // advance the store iterator if the key is the same as the 
deleted cache key
-                if (storeKey.equals(cacheIterator.peekNextKey())) {
-                    storeIterator.next();
-                }
-            }
-            // skip over items deleted from cache
-            cacheIterator.next();
-        }
-        return cacheIterator.hasNext() || storeIterator.hasNext();
-    }
+class MergedSortedCacheKeyValueStoreIterator<K, V> extends 
AbstractMergedSortedCacheStoreIterator<K, Bytes, V> {
 
-
-    private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> 
nextFromCache) {
-        return  nextFromCache.value.value == null;
+    MergedSortedCacheKeyValueStoreIterator(final 
PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
+                                           final KeyValueIterator<Bytes, 
byte[]> storeIterator,
+                                           final StateSerdes<K, V> serdes) {
+        super(cacheIterator, storeIterator, serdes);
     }
 
-
     @Override
-    public KeyValue<K, V> next() {
-
-        return internalNext(new NextValueFunction<KeyValue<K, V>>() {
-            @Override
-            public KeyValue<K, V> apply(final byte[] cacheKey, final byte[] 
storeKey) {
-                if (cacheKey == null) {
-                    return nextStoreValue();
-                }
-
-                if (storeKey == null) {
-                    return nextCacheValue();
-                }
-
-                final int comparison = comparator.compare(cacheKey, storeKey);
-                if (comparison > 0) {
-                    return nextStoreValue();
-                } else if (comparison < 0) {
-                    return nextCacheValue();
-                } else {
-                    storeIterator.next();
-                    return nextCacheValue();
-                }
-            }
-        });
+    public KeyValue<K, V> deserializeStorePair(KeyValue<Bytes, byte[]> pair) {
+        return KeyValue.pair(serdes.keyFrom(pair.key.get()), 
serdes.valueFrom(pair.value));
     }
 
     @Override
-    public K peekNextKey() {
-        return internalNext(new NextValueFunction<K>() {
-            @Override
-            public K apply(final byte[] cacheKey, final byte[] storeKey) {
-                if (cacheKey == null) {
-                    return serdes.keyFrom(storeKey);
-                }
-
-                if (storeKey == null) {
-                    return serdes.keyFrom(cacheKey);
-                }
-
-                final int comparison = comparator.compare(cacheKey, storeKey);
-                if (comparison > 0) {
-                    return serdes.keyFrom(storeKey);
-                } else {
-                    return serdes.keyFrom(cacheKey);
-                }
-            }
-        });
-    }
-
-    interface NextValueFunction<T> {
-        T apply(final byte[] cacheKey, final byte [] storeKey);
-    }
-
-    private <T> T internalNext(final NextValueFunction<T> nextValueFunction) {
-        if (!hasNext()) {
-            throw new NoSuchElementException();
-        }
-
-        byte[] nextCacheKey = null;
-        if (cacheIterator.hasNext()) {
-            nextCacheKey = cacheIterator.peekNextKey().get();
-        }
-
-        byte[] nextStoreKey = null;
-        if (storeIterator.hasNext()) {
-            nextStoreKey = storeIterator.peekNextKey().get();
-        }
-
-        return nextValueFunction.apply(nextCacheKey, nextStoreKey);
-    }
-
-    private KeyValue<K, V> nextCacheValue() {
-        final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next();
-        return KeyValue.pair(serdes.keyFrom(next.key.get()), 
serdes.valueFrom(next.value.value));
-    }
-
-    private KeyValue<K, V> nextStoreValue() {
-        final KeyValue<Bytes, byte[]> next = storeIterator.next();
-        return KeyValue.pair(serdes.keyFrom(next.key.get()), 
serdes.valueFrom(next.value));
+    K deserializeCacheKey(final Bytes cacheKey) {
+        return serdes.keyFrom(cacheKey.get());
     }
 
     @Override
-    public void remove() {
-        throw new UnsupportedOperationException("remove not supported");
+    public K deserializeStoreKey(Bytes key) {
+        return serdes.keyFrom(key.get());
     }
 
     @Override
-    public void close() {
-        cacheIterator.close();
-        storeIterator.close();
+    public int compare(Bytes cacheKey, Bytes storeKey) {
+        return cacheKey.compareTo(storeKey);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
new file mode 100644
index 0000000..db64621
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+/**
+ * Merges two iterators. Assumes each of them is sorted by key
+ *
+ * @param <K>
+ * @param <AGG>
+ */
+class MergedSortedCacheSessionStoreIterator<K, AGG> extends 
AbstractMergedSortedCacheStoreIterator<Windowed<K>, Windowed<Bytes>, AGG> {
+    private final StateSerdes<K, AGG> rawSerdes;
+
+
+    MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, 
LRUCacheEntry> cacheIterator,
+                                          final 
KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator,
+                                          final StateSerdes<K, AGG> serdes) {
+        super(cacheIterator, storeIterator, new 
StateSerdes<>(serdes.stateName(),
+                                                              new 
SessionKeySerde<>(serdes.keySerde()),
+                                                              
serdes.valueSerde()));
+
+        rawSerdes = serdes;
+    }
+
+    @Override
+    public KeyValue<Windowed<K>, AGG> 
deserializeStorePair(KeyValue<Windowed<Bytes>, byte[]> pair) {
+        final K key = rawSerdes.keyFrom(pair.key.key().get());
+        return KeyValue.pair(new Windowed<>(key, pair.key.window()), 
serdes.valueFrom(pair.value));
+    }
+
+    @Override
+    Windowed<K> deserializeCacheKey(final Bytes cacheKey) {
+        return SessionKeySerde.from(cacheKey.get(), 
rawSerdes.keyDeserializer());
+    }
+
+    @Override
+    public Windowed<K> deserializeStoreKey(Windowed<Bytes> key) {
+        final K originalKey = rawSerdes.keyFrom(key.key().get());
+        return new Windowed<K>(originalKey, key.window());
+    }
+
+    @Override
+    public int compare(Bytes cacheKey, Windowed<Bytes> storeKey) {
+        Bytes storeKeyBytes = SessionKeySerde.bytesToBinary(storeKey);
+        return cacheKey.compareTo(storeKeyBytes);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
new file mode 100644
index 0000000..a9d0973
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * Merges two iterators. Assumes each of them is sorted by key
+ *
+ * @param <V>
+ */
+class MergedSortedCacheWindowStoreIterator<V> extends 
AbstractMergedSortedCacheStoreIterator<Long, Long, V> implements 
WindowStoreIterator<V> {
+
+    MergedSortedCacheWindowStoreIterator(final PeekingKeyValueIterator<Bytes, 
LRUCacheEntry> cacheIterator,
+                                         final KeyValueIterator<Long, byte[]> 
storeIterator,
+                                         final StateSerdes<Long, V> serdes) {
+        super(cacheIterator, storeIterator, serdes);
+    }
+
+    @Override
+    public KeyValue<Long, V> deserializeStorePair(final KeyValue<Long, byte[]> 
pair) {
+        return KeyValue.pair(pair.key, serdes.valueFrom(pair.value));
+    }
+
+    @Override
+    Long deserializeCacheKey(final Bytes cacheKey) {
+        return WindowStoreUtils.timestampFromBinaryKey(cacheKey.get());
+    }
+
+    @Override
+    public Long deserializeStoreKey(final Long key) {
+        return key;
+    }
+
+    @Override
+    public int compare(final Bytes cacheKey, final Long storeKey) {
+        final Long cacheTimestamp = 
WindowStoreUtils.timestampFromBinaryKey(cacheKey.get());
+        return cacheTimestamp.compareTo(storeKey);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
deleted file mode 100644
index e210e73..0000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import java.util.NoSuchElementException;
-
-/**
- * Merges two iterators. Assumes each of them is sorted by key
- *
- * @param <K>
- * @param <V>
- */
-class MergedSortedCachedWindowStoreIterator<K, V> implements 
WindowStoreIterator<V> {
-    private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
-    private final KeyValueIterator<Bytes, byte[]> storeIterator;
-    private final StateSerdes<K, V> serdes;
-
-    public MergedSortedCachedWindowStoreIterator(final 
ThreadCache.MemoryLRUCacheBytesIterator cacheIterator,
-                                                 final KeyValueIterator<Bytes, 
byte[]> storeIterator,
-                                                 final StateSerdes<K, V> 
serdes) {
-        this.cacheIterator = cacheIterator;
-        this.storeIterator = storeIterator;
-        this.serdes = serdes;
-    }
-
-    @Override
-    public boolean hasNext() {
-        return cacheIterator.hasNext() || storeIterator.hasNext();
-    }
-
-
-    @Override
-    public KeyValue<Long, V> next() {
-        if (!hasNext()) {
-            throw new NoSuchElementException();
-        }
-
-        Long nextCacheTimestamp = null;
-        if (cacheIterator.hasNext()) {
-            nextCacheTimestamp = 
WindowStoreUtils.timestampFromBinaryKey(cacheIterator.peekNextKey().get());
-        }
-
-        Long nextStoreTimestamp = null;
-        if (storeIterator.hasNext()) {
-            nextStoreTimestamp = 
WindowStoreUtils.timestampFromBinaryKey(storeIterator.peekNextKey().get());
-        }
-
-        if (nextCacheTimestamp == null) {
-            return nextStoreValue(nextStoreTimestamp);
-        }
-
-        if (nextStoreTimestamp == null) {
-            return nextCacheValue(nextCacheTimestamp);
-        }
-
-        final int comparison = 
nextCacheTimestamp.compareTo(nextStoreTimestamp);
-        if (comparison > 0) {
-            return nextStoreValue(nextStoreTimestamp);
-        } else if (comparison < 0) {
-            return nextCacheValue(nextCacheTimestamp);
-        } else {
-            storeIterator.next();
-            return nextCacheValue(nextCacheTimestamp);
-        }
-    }
-
-    private KeyValue<Long, V> nextCacheValue(final Long timestamp) {
-        final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next();
-        return KeyValue.pair(timestamp, serdes.valueFrom(next.value.value));
-    }
-
-    private KeyValue<Long, V> nextStoreValue(final Long timestamp) {
-        final KeyValue<Bytes, byte[]> next = storeIterator.next();
-        return KeyValue.pair(timestamp, serdes.valueFrom(next.value));
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException("remove not supported");
-    }
-
-    @Override
-    public void close() {
-        cacheIterator.close();
-        storeIterator.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 926e5d4..7dc2d33 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -35,7 +35,7 @@ import java.util.List;
  * @param <K>
  * @param <V>
  */
-public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
+public class MeteredKeyValueStore<K, V> extends 
WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore<K, V> {
 
     protected final KeyValueStore<K, V> inner;
     protected final String metricScope;
@@ -102,18 +102,16 @@ public class MeteredKeyValueStore<K, V> implements 
KeyValueStore<K, V> {
     };
 
     // always wrap the store with the metered store
-    public MeteredKeyValueStore(final KeyValueStore<K, V> inner, String 
metricScope, Time time) {
+    public MeteredKeyValueStore(final KeyValueStore<K, V> inner,
+                                final String metricScope,
+                                final Time time) {
+        super(inner);
         this.inner = inner;
         this.metricScope = metricScope;
         this.time = time != null ? time : Time.SYSTEM;
     }
 
     @Override
-    public String name() {
-        return inner.name();
-    }
-
-    @Override
     public void init(ProcessorContext context, StateStore root) {
         final String name = name();
         this.context = context;
@@ -134,16 +132,6 @@ public class MeteredKeyValueStore<K, V> implements 
KeyValueStore<K, V> {
     }
 
     @Override
-    public boolean persistent() {
-        return inner.persistent();
-    }
-
-    @Override
-    public boolean isOpen() {
-        return inner.isOpen();
-    }
-
-    @Override
     public V get(K key) {
         this.key = key;
         metrics.measureLatencyNs(time, getDelegate, this.getTime);
@@ -194,11 +182,6 @@ public class MeteredKeyValueStore<K, V> implements 
KeyValueStore<K, V> {
     }
 
     @Override
-    public void close() {
-        inner.close();
-    }
-
-    @Override
     public void flush() {
         metrics.measureLatencyNs(time, flushDelegate, this.flushTime);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
index e0ed03e..4eb3936 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
-class MeteredSegmentedBytesStore implements SegmentedBytesStore {
+class MeteredSegmentedBytesStore extends 
WrappedStateStore.AbstractWrappedStateStore implements SegmentedBytesStore {
 
     private final SegmentedBytesStore inner;
     private final String metricScope;
@@ -40,18 +40,16 @@ class MeteredSegmentedBytesStore implements 
SegmentedBytesStore {
     private Sensor getTime;
     private Sensor removeTime;
 
-    MeteredSegmentedBytesStore(final SegmentedBytesStore inner, String 
metricScope, Time time) {
+    MeteredSegmentedBytesStore(final SegmentedBytesStore inner,
+                               final String metricScope,
+                               final Time time) {
+        super(inner);
         this.inner = inner;
         this.metricScope = metricScope;
         this.time = time != null ? time : new SystemTime();
     }
 
     @Override
-    public String name() {
-        return inner.name();
-    }
-
-    @Override
     public void init(ProcessorContext context, StateStore root) {
         final String name = name();
         this.metrics = context.metrics();
@@ -72,16 +70,6 @@ class MeteredSegmentedBytesStore implements 
SegmentedBytesStore {
     }
 
     @Override
-    public boolean persistent() {
-        return inner.persistent();
-    }
-
-    @Override
-    public boolean isOpen() {
-        return inner.isOpen();
-    }
-
-    @Override
     public byte[] get(final Bytes key) {
         final long startNs = time.nanoseconds();
         try {
@@ -117,11 +105,6 @@ class MeteredSegmentedBytesStore implements 
SegmentedBytesStore {
     }
 
     @Override
-    public void close() {
-        inner.close();
-    }
-
-    @Override
     public void flush() {
         final long startNs = time.nanoseconds();
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
deleted file mode 100644
index c725c1a..0000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
-
-    protected final WindowStore<K, V> inner;
-    protected final String metricScope;
-    protected final Time time;
-
-    private Sensor putTime;
-    private Sensor fetchTime;
-    private Sensor flushTime;
-    private Sensor restoreTime;
-    private StreamsMetricsImpl metrics;
-
-    private ProcessorContext context;
-    private StateStore root;
-    private Runnable initDelegate = new Runnable() {
-        @Override
-        public void run() {
-            inner.init(context, root);
-        }
-    };
-
-    private K key;
-    private V value;
-    private long timestamp;
-    private Runnable putDelegate = new Runnable() {
-        @Override
-        public void run() {
-            inner.put(key, value);
-        }
-    };
-    private Runnable putTsDelegate = new Runnable() {
-        @Override
-        public void run() {
-            inner.put(key, value, timestamp);
-        }
-    };
-    private Runnable flushDelegate = new Runnable() {
-        @Override
-        public void run() {
-            inner.flush();
-        }
-    };
-
-    // always wrap the store with the metered store
-    public MeteredWindowStore(final WindowStore<K, V> inner, String 
metricScope, Time time) {
-        this.inner = inner;
-        this.metricScope = metricScope;
-        this.time = time != null ? time : Time.SYSTEM;
-    }
-
-    @Override
-    public String name() {
-        return inner.name();
-    }
-
-    @Override
-    public void init(ProcessorContext context, StateStore root) {
-        final String name = name();
-        this.context = context;
-        this.root = root;
-        this.metrics = (StreamsMetricsImpl) context.metrics();
-        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put", 
Sensor.RecordingLevel.DEBUG);
-        this.fetchTime = this.metrics.addLatencySensor(metricScope, name, 
"fetch", Sensor.RecordingLevel.DEBUG);
-        this.flushTime = this.metrics.addLatencySensor(metricScope, name, 
"flush", Sensor.RecordingLevel.DEBUG);
-        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, 
"restore", Sensor.RecordingLevel.DEBUG);
-
-        // register and possibly restore the state from the logs
-        metrics.measureLatencyNs(time, initDelegate, this.restoreTime);
-    }
-
-    @Override
-    public boolean persistent() {
-        return inner.persistent();
-    }
-
-    @Override
-    public boolean isOpen() {
-        return inner.isOpen();
-    }
-
-    @Override
-    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
-        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, 
timeFrom, timeTo), this.fetchTime);
-    }
-
-    @Override
-    public void put(K key, V value) {
-        this.key = key;
-        this.value = value;
-        metrics.measureLatencyNs(time, putDelegate, this.putTime);
-    }
-
-    @Override
-    public void put(K key, V value, long timestamp) {
-        this.key = key;
-        this.value = value;
-        this.timestamp = timestamp;
-        metrics.measureLatencyNs(time, putTsDelegate, this.putTime);
-    }
-
-    @Override
-    public void close() {
-        inner.close();
-    }
-
-    @Override
-    public void flush() {
-        metrics.measureLatencyNs(time, flushDelegate, this.flushTime);
-    }
-
-    private class MeteredWindowStoreIterator<E> implements 
WindowStoreIterator<E> {
-
-        private final WindowStoreIterator<E> iter;
-        private final Sensor sensor;
-        private final long startNs;
-
-        public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor 
sensor) {
-            this.iter = iter;
-            this.sensor = sensor;
-            this.startNs = time.nanoseconds();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iter.hasNext();
-        }
-
-        @Override
-        public KeyValue<Long, E> next() {
-            return iter.next();
-        }
-
-        @Override
-        public void remove() {
-            iter.remove();
-        }
-
-        @Override
-        public void close() {
-            try {
-                iter.close();
-            } finally {
-                metrics.recordLatency(this.sensor, this.startNs, 
time.nanoseconds());
-            }
-        }
-
-    }
-
-    WindowStore<K, V> inner() {
-        return inner;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 164b352..b72bbed 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -35,28 +35,54 @@ import java.util.Map;
 
 public class RocksDBKeyValueStoreSupplier<K, V> extends 
AbstractStoreSupplier<K, V, KeyValueStore> {
 
-    private final boolean enableCaching;
+    private static final String METRICS_SCOPE = "rocksdb-state";
+    private final boolean cached;
 
-    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, 
Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean 
enableCaching) {
-        this(name, keySerde, valueSerde, null, logged, logConfig, 
enableCaching);
+    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, 
Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean 
cached) {
+        this(name, keySerde, valueSerde, null, logged, logConfig, cached);
     }
 
-    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, 
Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig, 
boolean enableCaching) {
+    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, 
Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig, 
boolean cached) {
         super(name, keySerde, valueSerde, time, logged, logConfig);
-        this.enableCaching = enableCaching;
+        this.cached = cached;
     }
 
     public KeyValueStore get() {
-        if (!enableCaching) {
-            RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, 
valueSerde);
-            return new MeteredKeyValueStore<>(logged ? store.enableLogging() : 
store, "rocksdb-state", time);
+        if (!cached && !logged) {
+            return new MeteredKeyValueStore<>(
+                    new RocksDBStore<>(name, keySerde, valueSerde), 
METRICS_SCOPE, time);
+        }
+
+        // when cached, logged, or both we use a bytes store as the inner most 
store
+        final RocksDBStore<Bytes, byte[]> rocks = new RocksDBStore<>(name,
+                                                                     
Serdes.Bytes(),
+                                                                     
Serdes.ByteArray());
+
+        if (cached && logged) {
+            return new CachingKeyValueStore<>(
+                    new MeteredKeyValueStore<>(
+                            new ChangeLoggingKeyValueBytesStore(rocks),
+                            METRICS_SCOPE,
+                            time),
+                    keySerde,
+                    valueSerde);
+        }
+
+        if (cached) {
+            return new CachingKeyValueStore<>(
+                    new MeteredKeyValueStore<>(rocks, METRICS_SCOPE, time),
+                    keySerde,
+                    valueSerde);
+
+        } else {
+            // logged
+            return new MeteredKeyValueStore<>(
+                    new ChangeLoggingKeyValueStore<>(rocks, keySerde, 
valueSerde),
+                    METRICS_SCOPE,
+                    time);
         }
 
-        final RocksDBStore<Bytes, byte[]> store = new RocksDBStore<>(name, 
Serdes.Bytes(), Serdes.ByteArray());
-        return new CachingKeyValueStore<>(new MeteredKeyValueStore<>(logged ? 
store.enableLogging() : store,
-                "rocksdb-state",
-                time),
-                keySerde,
-                valueSerde);
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
index 7645472..10ebf65 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.SessionStore;
 
@@ -35,13 +37,14 @@ import java.util.Map;
 public class RocksDBSessionStoreSupplier<K, V> extends 
AbstractStoreSupplier<K, V, SessionStore> implements 
WindowStoreSupplier<SessionStore> {
 
     private static final int NUM_SEGMENTS = 3;
+    public static final String METRIC_SCOPE = "rocksdb-session-store";
     private final long retentionPeriod;
-    private final boolean enableCaching;
+    private final boolean cached;
 
-    public RocksDBSessionStoreSupplier(String name, long retentionPeriod, 
Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> 
logConfig, boolean enableCaching) {
+    public RocksDBSessionStoreSupplier(String name, long retentionPeriod, 
Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> 
logConfig, boolean cached) {
         super(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig);
         this.retentionPeriod = retentionPeriod;
-        this.enableCaching = enableCaching;
+        this.cached = cached;
     }
 
     public String name() {
@@ -49,16 +52,41 @@ public class RocksDBSessionStoreSupplier<K, V> extends 
AbstractStoreSupplier<K,
     }
 
     public SessionStore<K, V> get() {
-        final RocksDBSegmentedBytesStore bytesStore = new 
RocksDBSegmentedBytesStore(name,
+        final SessionKeySchema keySchema = new SessionKeySchema();
+        final RocksDBSegmentedBytesStore segmented = new 
RocksDBSegmentedBytesStore(name,
                                                                                
      retentionPeriod,
                                                                                
      NUM_SEGMENTS,
-                                                                               
      new SessionKeySchema());
-        final MeteredSegmentedBytesStore metered = new 
MeteredSegmentedBytesStore(logged ? new 
ChangeLoggingSegmentedBytesStore(bytesStore)
-                                                                               
           : bytesStore, "rocksdb-session-store", time);
-        if (enableCaching) {
-            return new CachingSessionStore<>(metered, keySerde, valueSerde);
+                                                                               
      keySchema
+        );
+
+        if (cached && logged) {
+            final ChangeLoggingSegmentedBytesStore logged = new 
ChangeLoggingSegmentedBytesStore(segmented);
+            final MeteredSegmentedBytesStore metered = new 
MeteredSegmentedBytesStore(logged,
+                                                                               
       METRIC_SCOPE, time);
+            final RocksDBSessionStore<Bytes, byte[]> sessionStore
+                    = new RocksDBSessionStore<>(metered, Serdes.Bytes(), 
Serdes.ByteArray());
+
+            return new CachingSessionStore<>(sessionStore, keySerde, 
valueSerde);
+        }
+
+        if (cached) {
+            final MeteredSegmentedBytesStore metered = new 
MeteredSegmentedBytesStore(segmented,
+                                                                               
       METRIC_SCOPE, time);
+            final RocksDBSessionStore<Bytes, byte[]> sessionStore
+                    = new RocksDBSessionStore<>(metered, Serdes.Bytes(), 
Serdes.ByteArray());
+
+            return new CachingSessionStore<>(sessionStore, keySerde, 
valueSerde);
         }
-        return new RocksDBSessionStore<>(metered, keySerde, valueSerde);
+
+        if (logged) {
+            final ChangeLoggingSegmentedBytesStore logged = new 
ChangeLoggingSegmentedBytesStore(segmented);
+            final MeteredSegmentedBytesStore metered = new 
MeteredSegmentedBytesStore(logged,
+                                                                               
       METRIC_SCOPE, time);
+            return new RocksDBSessionStore<>(metered, keySerde, valueSerde);
+        }
+
+        return new RocksDBSessionStore<>(
+                new MeteredSegmentedBytesStore(segmented, METRIC_SCOPE, time), 
keySerde, valueSerde);
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8b838d0..3f8d509 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -92,24 +92,20 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
     private WriteOptions wOptions;
     private FlushOptions fOptions;
 
-    private boolean loggingEnabled = false;
-
-    private StoreChangeLogger<Bytes, byte[]> changeLogger;
-
     protected volatile boolean open = false;
 
-    public KeyValueStore<K, V> enableLogging() {
-        loggingEnabled = true;
-
-        return this;
-    }
 
-    public RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
+    public RocksDBStore(final String name,
+                        final Serde<K> keySerde,
+                        final Serde<V> valueSerde) {
         this(name, DB_FILE_DIR, keySerde, valueSerde);
     }
 
 
-    public RocksDBStore(String name, String parentDir, Serde<K> keySerde, 
Serde<V> valueSerde) {
+    public RocksDBStore(final String name,
+                        final String parentDir,
+                        final Serde<K> keySerde,
+                        final Serde<V> valueSerde) {
         this.name = name;
         this.parentDir = parentDir;
         this.keySerde = keySerde;
@@ -159,10 +155,9 @@ public class RocksDBStore<K, V> implements 
KeyValueStore<K, V> {
         // open the DB dir
         openDB(context);
 
-        this.changeLogger = this.loggingEnabled ? new 
StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
-        context.register(root, loggingEnabled, new StateRestoreCallback() {
+        context.register(root, false, new StateRestoreCallback() {
 
             @Override
             public void restore(byte[] key, byte[] value) {
@@ -235,10 +230,6 @@ public class RocksDBStore<K, V> implements 
KeyValueStore<K, V> {
         byte[] rawKey = serdes.rawKey(key);
         byte[] rawValue = serdes.rawValue(value);
         putInternal(rawKey, rawValue);
-
-        if (loggingEnabled) {
-            changeLogger.logChange(Bytes.wrap(rawKey), rawValue);
-        }
     }
 
     @Override
@@ -278,9 +269,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
                 } else {
                     final byte[] value = serdes.rawValue(entry.value);
                     batch.put(rawKey, value);
-                    if (loggingEnabled) {
-                        changeLogger.logChange(Bytes.wrap(rawKey), value);
-                    }
                 }
             }
             db.write(wOptions, batch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index a2a420e..80c4796 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -20,6 +20,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -33,7 +34,6 @@ import java.util.NoSuchElementException;
 
 class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
-    private final String name;
     private final SegmentedBytesStore bytesStore;
     private final boolean retainDuplicates;
     private final Serde<K> keySerde;
@@ -43,8 +43,15 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private StateSerdes<K, V> serdes;
 
 
-    RocksDBWindowStore(String name, boolean retainDuplicates, Serde<K> 
keySerde, Serde<V> valueSerde, final SegmentedBytesStore bytesStore) {
-        this.name = name;
+    static RocksDBWindowStore<Bytes, byte[]> bytesStore(final 
SegmentedBytesStore inner, final boolean retainDuplicates) {
+        return new RocksDBWindowStore<>(inner, Serdes.Bytes(), 
Serdes.ByteArray(), retainDuplicates);
+    }
+
+
+    RocksDBWindowStore(final SegmentedBytesStore bytesStore,
+                       final Serde<K> keySerde,
+                       final Serde<V> valueSerde,
+                       final boolean retainDuplicates) {
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.retainDuplicates = retainDuplicates;
@@ -54,7 +61,7 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public String name() {
-        return name;
+        return bytesStore.name();
     }
 
     @Override
@@ -62,7 +69,7 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
         // construct the serde
-        this.serdes = new StateSerdes<>(name,
+        this.serdes = new StateSerdes<>(bytesStore.name(),
                                         keySerde == null ? (Serde<K>) 
context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) 
context.valueSerde() : valueSerde);
 
@@ -147,6 +154,14 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, 
V> {
         public void close() {
             actual.close();
         }
+
+        @Override
+        public Long peekNextKey() {
+            if (!actual.hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return 
WindowStoreUtils.timestampFromBinaryKey(actual.peekNextKey().get());
+        }
     }
 
 }

Reply via email to