Repository: kafka
Updated Branches:
  refs/heads/0.10.2 c9b9acf6a -> 6f72a5a53


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 84f1734..abaaffd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.WindowStore;
 
@@ -58,23 +59,35 @@ public class RocksDBWindowStoreSupplier<K, V> extends 
AbstractStoreSupplier<K, V
     }
 
     public WindowStore get() {
-        final RocksDBSegmentedBytesStore bytesStore = new 
RocksDBSegmentedBytesStore(name, retentionPeriod, numSegments, new 
WindowStoreKeySchema());
-        if (!enableCaching) {
-            final RocksDBWindowStore<K, V> segmentedStore = new 
RocksDBWindowStore<>(name, retainDuplicates, keySerde, valueSerde,
-                                                                               
      logged ? new ChangeLoggingSegmentedBytesStore(bytesStore)
-                                                                               
                    : bytesStore);
-            return new MeteredWindowStore<>(segmentedStore, "rocksdb-window", 
time);
-        }
+        return maybeWrapCaching(
+                maybeWrapLogged(
+                        new RocksDBSegmentedBytesStore(
+                                name,
+                                retentionPeriod,
+                                numSegments,
+                                new WindowStoreKeySchema()
+                        )));
 
-        return new CachingWindowStore<>(new MeteredSegmentedBytesStore(logged 
? new ChangeLoggingSegmentedBytesStore(bytesStore)
-                                                                               
: bytesStore,
-                                                                       
"rocksdb-window",
-                                                                       time),
-                                        keySerde, valueSerde, windowSize);
     }
 
     @Override
     public long retentionPeriod() {
         return retentionPeriod;
     }
+
+    private SegmentedBytesStore maybeWrapLogged(final SegmentedBytesStore 
inner) {
+        if (!logged) {
+            return inner;
+        }
+        return new ChangeLoggingSegmentedBytesStore(inner);
+    }
+
+    private WindowStore<K, V> maybeWrapCaching(final SegmentedBytesStore 
inner) {
+        final MeteredSegmentedBytesStore metered = new 
MeteredSegmentedBytesStore(inner, "rocksdb-window", time);
+        if (!enableCaching) {
+            return new RocksDBWindowStore<>(metered, keySerde, valueSerde, 
retainDuplicates);
+        }
+        final RocksDBWindowStore<Bytes, byte[]> windowed = 
RocksDBWindowStore.bytesStore(metered, retainDuplicates);
+        return new CachingWindowStore<>(windowed, keySerde, valueSerde, 
windowSize);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
new file mode 100644
index 0000000..d76e8a4
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.NoSuchElementException;
+
+class SerializedKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
+
+    private final KeyValueIterator<Bytes, byte[]> bytesIterator;
+    private final StateSerdes<K, V> serdes;
+
+    SerializedKeyValueIterator(final KeyValueIterator<Bytes, byte[]> 
bytesIterator,
+                               final StateSerdes<K, V> serdes) {
+
+        this.bytesIterator = bytesIterator;
+        this.serdes = serdes;
+    }
+
+    @Override
+    public void close() {
+        bytesIterator.close();
+    }
+
+    @Override
+    public K peekNextKey() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        final Bytes bytes = bytesIterator.peekNextKey();
+        return serdes.keyFrom(bytes.get());
+    }
+
+    @Override
+    public boolean hasNext() {
+        return bytesIterator.hasNext();
+    }
+
+    @Override
+    public KeyValue<K, V> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        final KeyValue<Bytes, byte[]> next = bytesIterator.next();
+        return KeyValue.pair(serdes.keyFrom(next.key.get()), 
serdes.valueFrom(next.value));
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("remove not supported by 
SerializedKeyValueIterator");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
index 1ea6bef..074cf8a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -39,7 +39,10 @@ public class WindowStoreUtils {
 
     public static <K> byte[] toBinaryKey(K key, final long timestamp, final 
int seqnum, StateSerdes<K, ?> serdes) {
         byte[] serializedKey = serdes.rawKey(key);
+        return toBinaryKey(serializedKey, timestamp, seqnum);
+    }
 
+    static byte[] toBinaryKey(byte[] serializedKey, final long timestamp, 
final int seqnum) {
         ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + 
TIMESTAMP_SIZE + SEQNUM_SIZE);
         buf.put(serializedKey);
         buf.putLong(timestamp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
new file mode 100644
index 0000000..3d80b98
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A storage engine wrapper for utilities like logging, caching, and metering.
+ */
+interface WrappedStateStore extends StateStore {
+
+    /**
+     * Return the inner storage engine
+     *
+     * @return wrapped inner storage engine
+     */
+    StateStore inner();
+
+    abstract class AbstractWrappedStateStore implements WrappedStateStore {
+        final StateStore innerState;
+
+        AbstractWrappedStateStore(StateStore inner) {
+            this.innerState = inner;
+        }
+
+        @Override
+        public void init(ProcessorContext context, StateStore root) {
+            innerState.init(context, root);
+        }
+
+        @Override
+        public String name() {
+            return innerState.name();
+        }
+
+        @Override
+        public boolean persistent() {
+            return innerState.persistent();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return innerState.isOpen();
+        }
+
+        void validateStoreOpen() {
+            if (!innerState.isOpen()) {
+                throw new InvalidStateStoreException("Store " + 
innerState.name() + " is currently closed.");
+            }
+        }
+
+        @Override
+        public StateStore inner() {
+            if (innerState instanceof WrappedStateStore) {
+                return ((WrappedStateStore) innerState).inner();
+            }
+            return innerState;
+        }
+
+        @Override
+        public void flush() {
+            innerState.flush();
+        }
+
+        @Override
+        public void close() {
+            innerState.close();
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 729e190..b6d8a97 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -40,9 +41,14 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class KGroupedStreamImplTest {
@@ -337,4 +343,38 @@ public class KGroupedStreamImplTest {
     public void 
shouldNotAcceptNullStoreStoreSupplierNameWhenCountingSessionWindows() throws 
Exception {
         groupedStream.count(SessionWindows.with(90), 
(StateStoreSupplier<SessionStore>) null);
     }
-}
+
+    @Test
+    public void shouldCountWindowed() throws Exception {
+        final List<KeyValue<Windowed<String>, Long>> results = new 
ArrayList<>();
+        groupedStream.count(
+                TimeWindows.of(500L),
+                "aggregate-by-key-windowed")
+                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Long 
value) {
+                        results.add(KeyValue.pair(key, value));
+                    }
+                });
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, 
TestUtils.tempDirectory(), 0);
+        driver.setTime(0);
+        driver.process(TOPIC, "1", "A");
+        driver.process(TOPIC, "2", "B");
+        driver.process(TOPIC, "3", "C");
+        driver.setTime(500);
+        driver.process(TOPIC, "1", "A");
+        driver.process(TOPIC, "1", "A");
+        driver.process(TOPIC, "2", "B");
+        driver.process(TOPIC, "2", "B");
+        assertThat(results, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
+                KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 
1L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 
2L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 
1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 
2L)
+        )));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index c603aa0..5035f70 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -59,7 +59,8 @@ public class CachingSessionStoreTest {
     @Before
     public void setUp() throws Exception {
         underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, new 
SessionKeySchema());
-        cachingStore = new CachingSessionStore<>(underlying,
+        final RocksDBSessionStore<Bytes, byte[]> sessionStore = new 
RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
+        cachingStore = new CachingSessionStore<>(sessionStore,
                                                  Serdes.String(),
                                                  Serdes.Long());
         cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new 
MockStreamsMetrics(new Metrics()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 37fc9a0..1de1002 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -54,13 +54,15 @@ public class CachingWindowStoreTest {
     private String topic;
     private static final long DEFAULT_TIMESTAMP = 10L;
     private WindowStoreKeySchema keySchema;
+    private RocksDBWindowStore<Bytes, byte[]> windowStore;
 
     @Before
     public void setUp() throws Exception {
         keySchema = new WindowStoreKeySchema();
         underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, 
keySchema);
+        windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), 
Serdes.ByteArray(), false);
         cacheListener = new 
CachingKeyValueStoreTest.CacheFlushListenerStub<>();
-        cachingStore = new CachingWindowStore<>(underlying,
+        cachingStore = new CachingWindowStore<>(windowStore,
                                                 Serdes.String(),
                                                 Serdes.String(),
                                                 WINDOW_SIZE);
@@ -72,6 +74,7 @@ public class CachingWindowStoreTest {
         cachingStore.init(context, cachingStore);
     }
 
+
     @Test
     public void shouldPutFetchFromCache() throws Exception {
         cachingStore.put("a", "a");

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
new file mode 100644
index 0000000..82fb831
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.test.InMemoryKeyValueStore;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class ChangeLoggingKeyValueBytesStoreTest {
+
+    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new 
InMemoryKeyValueStore<>("kv");
+    private final ChangeLoggingKeyValueBytesStore store = new 
ChangeLoggingKeyValueBytesStore(inner);
+    private final Map sent = new HashMap<>();
+    private final Bytes hi = Bytes.wrap("hi".getBytes());
+    private final Bytes hello = Bytes.wrap("hello".getBytes());
+    private final byte[] there = "there".getBytes();
+    private final byte[] world = "world".getBytes();
+
+    @Before
+    public void before() {
+        final NoOpRecordCollector collector = new NoOpRecordCollector() {
+            @Override
+            public <K, V> void send(final ProducerRecord<K, V> record, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+                sent.put(record.key(), record.value());
+            }
+        };
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      
TestUtils.tempDirectory(),
+                                                                      
Serdes.String(),
+                                                                      
Serdes.Long(),
+                                                                      
collector,
+                                                                      new 
ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+        context.setTime(0);
+        store.init(context, store);
+    }
+
+    @Test
+    public void shouldWriteKeyValueBytesToInnerStoreOnPut() throws Exception {
+        store.put(hi, there);
+        assertThat(inner.get(hi), equalTo(there));
+    }
+
+    @Test
+    public void shouldLogChangeOnPut() throws Exception {
+        store.put(hi, there);
+        assertThat((byte[]) sent.get(hi), equalTo(there));
+    }
+
+    @Test
+    public void shouldWriteAllKeyValueToInnerStoreOnPutAll() throws Exception {
+        store.putAll(Arrays.asList(KeyValue.pair(hi, there),
+                                   KeyValue.pair(hello, world)));
+        assertThat(inner.get(hi), equalTo(there));
+        assertThat(inner.get(hello), equalTo(world));
+    }
+
+    @Test
+    public void shouldLogChangesOnPutAll() throws Exception {
+        store.putAll(Arrays.asList(KeyValue.pair(hi, there),
+                                   KeyValue.pair(hello, world)));
+        assertThat((byte[]) sent.get(hi), equalTo(there));
+        assertThat((byte[]) sent.get(hello), equalTo(world));
+    }
+
+    @Test
+    public void shouldPutNullOnDelete() throws Exception {
+        store.put(hi, there);
+        store.delete(hi);
+        assertThat(inner.get(hi), nullValue());
+    }
+
+    @Test
+    public void shouldReturnOldValueOnDelete() throws Exception {
+        store.put(hi, there);
+        assertThat(store.delete(hi), equalTo(there));
+    }
+
+    @Test
+    public void shouldLogKeyNullOnDelete() throws Exception {
+        store.put(hi, there);
+        store.delete(hi);
+        assertThat(sent.get(hi), nullValue());
+    }
+
+    @Test
+    public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() throws 
Exception {
+        store.putIfAbsent(hi, there);
+        assertThat(inner.get(hi), equalTo(there));
+    }
+
+    @Test
+    public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() 
throws Exception {
+        store.put(hi, there);
+        store.putIfAbsent(hi, world);
+        assertThat(inner.get(hi), equalTo(there));
+    }
+
+    @Test
+    public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() 
throws Exception {
+        store.putIfAbsent(hi, there);
+        assertThat((byte[]) sent.get(hi), equalTo(there));
+    }
+
+    @Test
+    public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() 
throws Exception {
+        store.put(hi, there);
+        store.putIfAbsent(hi, world);
+        assertThat((byte[]) sent.get(hi), equalTo(there));
+    }
+
+    @Test
+    public void shouldReturnCurrentValueOnPutIfAbsent() throws Exception {
+        store.put(hi, there);
+        assertThat(store.putIfAbsent(hi, world), equalTo(there));
+    }
+
+    @Test
+    public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() throws 
Exception {
+        assertThat(store.putIfAbsent(hi, there), is(nullValue()));
+    }
+
+    @Test
+    public void shouldReturnValueOnGetWhenExists() throws Exception {
+        store.put(hello, world);
+        assertThat(store.get(hello), equalTo(world));
+    }
+
+    @Test
+    public void shouldReturnNullOnGetWhenDoesntExist() throws Exception {
+        assertThat(store.get(hello), is(nullValue()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
new file mode 100644
index 0000000..8815c5a
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.test.InMemoryKeyValueStore;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+
+public class ChangeLoggingKeyValueStoreTest {
+
+    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new 
InMemoryKeyValueStore<>("kv");
+    private final Serde<String> keySerde = Serdes.String();
+    private final Serde<String> valueSerde = Serdes.String();
+    private final ChangeLoggingKeyValueStore<String, String> store
+            = new ChangeLoggingKeyValueStore<>(inner, keySerde, valueSerde);
+    private final Map sent = new HashMap<>();
+    private final String hi = "hi";
+    private final Bytes hiBytes = Bytes.wrap(hi.getBytes());
+    private final String there = "there";
+    private final byte[] thereBytes = "there".getBytes();
+    private final String hello = "hello";
+    private final String world = "world";
+
+    @Before
+    public void before() {
+        final NoOpRecordCollector collector = new NoOpRecordCollector() {
+            @Override
+            public <K, V> void send(final ProducerRecord<K, V> record, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+                sent.put(record.key(), record.value());
+            }
+        };
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      
TestUtils.tempDirectory(),
+                                                                      
Serdes.String(),
+                                                                      
Serdes.Long(),
+                                                                      
collector,
+                                                                      new 
ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+        context.setTime(0);
+        store.init(context, store);
+    }
+
+    @Test
+    public void shouldWriteKeyValueBytesToInnerStoreOnPut() throws Exception {
+        store.put(hi, there);
+        assertThat(deserializedValueFromInner(hi), equalTo(there));
+    }
+
+    @Test
+    public void shouldLogChangeOnPut() throws Exception {
+        store.put(hi, there);
+        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
+    }
+
+    @Test
+    public void shouldWriteAllKeyValueToInnerStoreOnPutAll() throws Exception {
+        store.putAll(Arrays.asList(KeyValue.pair(hello, world),
+                                   KeyValue.pair(hi, there)));
+        assertThat(deserializedValueFromInner(hello), equalTo(world));
+        assertThat(deserializedValueFromInner(hi), equalTo(there));
+    }
+
+    @Test
+    public void shouldLogChangesOnPutAll() throws Exception {
+        store.putAll(Arrays.asList(KeyValue.pair(hi, there),
+                                   KeyValue.pair(hello, world)));
+        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
+        assertThat((byte[]) sent.get(Bytes.wrap(hello.getBytes())), 
equalTo(world.getBytes()));
+    }
+
+    @Test
+    public void shouldPutNullOnDelete() throws Exception {
+        store.put(hi, there);
+        store.delete(hi);
+        assertThat(inner.get(hiBytes), nullValue());
+    }
+
+    @Test
+    public void shouldReturnOldValueOnDelete() throws Exception {
+        store.put(hi, there);
+        assertThat(store.delete(hi), equalTo(there));
+    }
+
+    @Test
+    public void shouldReturnNullOnDeleteIfNoOldValue() throws Exception {
+        assertThat(store.delete(hi), is(nullValue()));
+    }
+
+    @Test
+    public void shouldLogKeyNullOnDelete() throws Exception {
+        store.put(hi, there);
+        store.delete(hi);
+        assertThat(sent.get(hi), nullValue());
+    }
+
+    @Test
+    public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() throws 
Exception {
+        store.putIfAbsent(hi, there);
+        assertThat(inner.get(hiBytes), equalTo(thereBytes));
+    }
+
+    @Test
+    public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() 
throws Exception {
+        store.put(hi, there);
+        store.putIfAbsent(hi, world);
+        assertThat(inner.get(hiBytes), equalTo(thereBytes));
+    }
+
+    @Test
+    public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() 
throws Exception {
+        store.putIfAbsent(hi, there);
+        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
+    }
+
+    @Test
+    public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() 
throws Exception {
+        store.put(hi, there);
+        store.putIfAbsent(hi, world);
+        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
+    }
+
+    @Test
+    public void shouldReturnCurrentValueOnPutIfAbsent() throws Exception {
+        store.put(hi, there);
+        assertThat(store.putIfAbsent(hi, world), equalTo(there));
+    }
+
+    @Test
+    public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() throws 
Exception {
+        assertThat(store.putIfAbsent(hi, there), is(nullValue()));
+    }
+
+    @Test
+    public void shouldQueryRange() throws Exception {
+        store.put(hello, world);
+        store.put(hi, there);
+        store.put("zooom", "home");
+        final KeyValueIterator<String, String> range = store.range(hello, hi);
+        assertThat(range.next(), equalTo(KeyValue.pair(hello, world)));
+        assertThat(range.next(), equalTo(KeyValue.pair(hi, there)));
+        assertFalse(range.hasNext());
+    }
+
+    @Test
+    public void shouldReturnAllKeyValues() throws Exception {
+        store.put(hello, world);
+        store.put(hi, there);
+        final String zooom = "zooom";
+        final String home = "home";
+        store.put(zooom, home);
+        final KeyValueIterator<String, String> all = store.all();
+        assertThat(all.next(), equalTo(KeyValue.pair(hello, world)));
+        assertThat(all.next(), equalTo(KeyValue.pair(hi, there)));
+        assertThat(all.next(), equalTo(KeyValue.pair(zooom, home)));
+        assertFalse(all.hasNext());
+    }
+
+    @Test
+    public void shouldReturnValueOnGetWhenExists() throws Exception {
+        store.put(hello, world);
+        assertThat(store.get(hello), equalTo(world));
+    }
+
+    @Test
+    public void shouldReturnNullOnGetWhenDoesntExist() throws Exception {
+        assertThat(store.get(hello), is(nullValue()));
+    }
+
+    private String deserializedValueFromInner(final String key) {
+        return valueSerde.deserializer().deserialize("blah", 
inner.get(Bytes.wrap(key.getBytes())));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
index 50845e8..0ebdd5c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.test.InMemoryKeyValueStore;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,7 +38,7 @@ public class DelegatingPeekingKeyValueIteratorTest {
     }
 
     @Test
-    public void shouldPeekNext() throws Exception {
+    public void shouldPeekNextKey() throws Exception {
         store.put("A", "A");
         final DelegatingPeekingKeyValueIterator<String, String> 
peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
         assertEquals("A", peekingIterator.peekNextKey());
@@ -46,6 +47,15 @@ public class DelegatingPeekingKeyValueIteratorTest {
     }
 
     @Test
+    public void shouldPeekNext() throws Exception {
+        store.put("A", "A");
+        final DelegatingPeekingKeyValueIterator<String, String> 
peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
+        assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext());
+        assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext());
+        assertTrue(peekingIterator.hasNext());
+    }
+
+    @Test
     public void shouldPeekAndIterate() throws Exception {
         final String[] kvs = {"a", "b", "c", "d", "e", "f"};
         for (String kv : kvs) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
new file mode 100644
index 0000000..e7c2eb3
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MergedSortedCacheSessionStoreIteratorTest {
+
+    private final String storeKey = "a";
+    private final String cacheKey = "b";
+
+    private final SessionWindow storeWindow = new SessionWindow(0, 1);
+    private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = 
Collections.singleton(
+            KeyValue.pair(new Windowed<>(Bytes.wrap(storeKey.getBytes()), 
storeWindow), storeKey.getBytes())).iterator();
+    private final SessionWindow cacheWindow = new SessionWindow(10, 20);
+    private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = 
Collections.singleton(KeyValue.pair(
+            SessionKeySerde.toBinary(
+                    new Windowed<>(cacheKey, cacheWindow), 
Serdes.String().serializer()), new LRUCacheEntry(cacheKey.getBytes())))
+            .iterator();
+
+    @Test
+    public void shouldHaveNextFromStore() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> 
mergeIterator
+                = createIterator(storeKvs, Collections.<KeyValue<Bytes, 
LRUCacheEntry>>emptyIterator());
+        assertTrue(mergeIterator.hasNext());
+    }
+
+    @Test
+    public void shouldGetNextFromStore() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> 
mergeIterator
+                = createIterator(storeKvs, Collections.<KeyValue<Bytes, 
LRUCacheEntry>>emptyIterator());
+        assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new 
Windowed<>(storeKey, storeWindow), storeKey)));
+    }
+
+    @Test
+    public void shouldPeekNextKeyFromStore() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> 
mergeIterator
+                = createIterator(storeKvs, Collections.<KeyValue<Bytes, 
LRUCacheEntry>>emptyIterator());
+        assertThat(mergeIterator.peekNextKey(), equalTo(new 
Windowed<>(storeKey, storeWindow)));
+    }
+
+    @Test
+    public void shouldHaveNextFromCache() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> 
mergeIterator
+                = createIterator(Collections.<KeyValue<Windowed<Bytes>, 
byte[]>>emptyIterator(),
+                                 cacheKvs);
+        assertTrue(mergeIterator.hasNext());
+    }
+
+    @Test
+    public void shouldGetNextFromCache() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> 
mergeIterator
+                = createIterator(Collections.<KeyValue<Windowed<Bytes>, 
byte[]>>emptyIterator(), cacheKvs);
+        assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new 
Windowed<>(cacheKey, cacheWindow), cacheKey)));
+    }
+
+    @Test
+    public void shouldPeekNextKeyFromCache() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> 
mergeIterator
+                = createIterator(Collections.<KeyValue<Windowed<Bytes>, 
byte[]>>emptyIterator(), cacheKvs);
+        assertThat(mergeIterator.peekNextKey(), equalTo(new 
Windowed<>(cacheKey, cacheWindow)));
+    }
+
+    @Test
+    public void shouldIterateBothStoreAndCache() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> iterator = 
createIterator(storeKvs, cacheKvs);
+        assertThat(iterator.next(), equalTo(KeyValue.pair(new 
Windowed<>(storeKey, storeWindow), storeKey)));
+        assertThat(iterator.next(), equalTo(KeyValue.pair(new 
Windowed<>(cacheKey, cacheWindow), cacheKey)));
+        assertFalse(iterator.hasNext());
+    }
+
+    private MergedSortedCacheSessionStoreIterator<String, String> 
createIterator(final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs,
+                                                                               
  final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs) {
+        final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> 
storeIterator
+                = new DelegatingPeekingKeyValueIterator<>("store", new 
KeyValueIteratorStub<>(storeKvs));
+
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator
+                = new DelegatingPeekingKeyValueIterator<>("cache", new 
KeyValueIteratorStub<>(cacheKvs));
+        return new MergedSortedCacheSessionStoreIterator<>(cacheIterator, 
storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String()));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
index b04f248..376fca8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -30,23 +29,25 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 public class MergedSortedCacheWindowStoreIteratorTest {
 
+    private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new 
ArrayList<>();
+    private final ThreadCache cache = new ThreadCache("testCache", 1000000L,  
new MockStreamsMetrics(new Metrics()));
+    private final String namespace = "one";
+    private final StateSerdes<String, String> stateSerdes = new 
StateSerdes<>("foo", Serdes.String(), Serdes.String());
+
     @Test
     public void shouldIterateOverValueFromBothIterators() throws Exception {
-        final List<KeyValue<Bytes, byte[]>> storeValues = new ArrayList<>();
-        final ThreadCache cache = new ThreadCache("testCache", 1000000L, new 
MockStreamsMetrics(new Metrics()));
-        final String namespace = "one";
-        final StateSerdes<String, String> stateSerdes = new 
StateSerdes<>("foo", Serdes.String(), Serdes.String());
         final List<KeyValue<Long, byte[]>> expectedKvPairs = new ArrayList<>();
-
         for (long t = 0; t < 100; t += 20) {
             final byte[] v1Bytes = String.valueOf(t).getBytes();
-            final KeyValue<Bytes, byte[]> v1 = 
KeyValue.pair(Bytes.wrap(WindowStoreUtils.toBinaryKey("a", t, 0, stateSerdes)), 
v1Bytes);
-            storeValues.add(v1);
+            final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, v1Bytes);
+            windowStoreKvPairs.add(v1);
             expectedKvPairs.add(KeyValue.pair(t, v1Bytes));
             final byte[] keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 
0, stateSerdes);
             final byte[] valBytes = String.valueOf(t + 10).getBytes();
@@ -56,11 +57,11 @@ public class MergedSortedCacheWindowStoreIteratorTest {
 
         byte[] binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, 
stateSerdes);
         byte[] binaryTo = WindowStoreUtils.toBinaryKey("a", 100, 0, 
stateSerdes);
-        final KeyValueIterator<Bytes, byte[]> storeIterator = new 
DelegatingPeekingKeyValueIterator<>("name", new 
KeyValueIteratorStub<>(storeValues.iterator()));
+        final KeyValueIterator<Long, byte[]> storeIterator = new 
DelegatingPeekingKeyValueIterator<>("name", new 
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
 
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(namespace, binaryFrom, binaryTo);
 
-        final MergedSortedCachedWindowStoreIterator<Bytes, byte[]> iterator = 
new MergedSortedCachedWindowStoreIterator<>(cacheIterator, storeIterator, new 
StateSerdes<>("name", Serdes.Bytes(), Serdes.ByteArray()));
+        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new 
MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new 
StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
         int index = 0;
         while (iterator.hasNext()) {
             final KeyValue<Long, byte[]> next = iterator.next();
@@ -70,4 +71,18 @@ public class MergedSortedCacheWindowStoreIteratorTest {
         }
     }
 
+    @Test
+    public void shouldPeekNextKey() throws Exception {
+        windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
+        cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, 
stateSerdes), new LRUCacheEntry("b".getBytes()));
+        byte[] binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, 
stateSerdes);
+        byte[] binaryTo = WindowStoreUtils.toBinaryKey("a", 100, 0, 
stateSerdes);
+        final KeyValueIterator<Long, byte[]> storeIterator = new 
DelegatingPeekingKeyValueIterator<>("name", new 
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(namespace, binaryFrom, binaryTo);
+        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new 
MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new 
StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
+        assertThat(iterator.peekNextKey(), equalTo(0L));
+        iterator.next();
+        assertThat(iterator.peekNextKey(), equalTo(10L));
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 2082e00..a2ce96c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -105,6 +105,11 @@ public class ReadOnlyWindowStoreStub<K, V> implements 
ReadOnlyWindowStore<K, V>,
         }
 
         @Override
+        public Long peekNextKey() {
+            throw new UnsupportedOperationException("peekNextKey not supported 
in stub");
+        }
+
+        @Override
         public boolean hasNext() {
             return underlying.hasNext();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
new file mode 100644
index 0000000..3d9a56c
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class RocksDBKeyValueStoreSupplierTest {
+
+    private static final String STORE_NAME = "name";
+    private final ThreadCache cache = new ThreadCache("test", 1024, new 
MockStreamsMetrics(new Metrics()));
+    private final MockProcessorContext context = new MockProcessorContext(null,
+                                                                          
TestUtils.tempDirectory(),
+                                                                          
Serdes.String(),
+                                                                          
Serdes.String(),
+                                                                          new 
NoOpRecordCollector(),
+                                                                          
cache);
+    private KeyValueStore<String, String> store;
+
+    @After
+    public void close() {
+        store.close();
+    }
+
+    @Test
+    public void shouldCreateLoggingEnabledStoreWhenStoreLogged() throws 
Exception {
+        store = createStore(true, false);
+        final List<ProducerRecord> logged = new ArrayList<>();
+        final NoOpRecordCollector collector = new NoOpRecordCollector() {
+            @Override
+            public <K, V> void send(final ProducerRecord<K, V> record, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+                logged.add(record);
+            }
+        };
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      
TestUtils.tempDirectory(),
+                                                                      
Serdes.String(),
+                                                                      
Serdes.String(),
+                                                                      
collector,
+                                                                      cache);
+        context.setTime(1);
+        store.init(context, store);
+        store.put("a", "b");
+        assertFalse(logged.isEmpty());
+    }
+
+    @Test
+    public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() throws 
Exception {
+        store = createStore(false, false);
+        final List<ProducerRecord> logged = new ArrayList<>();
+        final NoOpRecordCollector collector = new NoOpRecordCollector() {
+            @Override
+            public <K, V> void send(final ProducerRecord<K, V> record, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+                logged.add(record);
+            }
+        };
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      
TestUtils.tempDirectory(),
+                                                                      
Serdes.String(),
+                                                                      
Serdes.String(),
+                                                                      
collector,
+                                                                      cache);
+        context.setTime(1);
+        store.init(context, store);
+        store.put("a", "b");
+        assertTrue(logged.isEmpty());
+    }
+
+    @Test
+    public void shouldReturnCachedKeyValueStoreWhenCachingEnabled() throws 
Exception {
+        store = createStore(false, true);
+        store.init(context, store);
+        context.setTime(1);
+        store.put("a", "b");
+        store.put("b", "c");
+        assertThat(store, is(instanceOf(CachingKeyValueStore.class)));
+        assertThat(cache.size(), is(2L));
+    }
+
+    @Test
+    public void shouldReturnMeteredStoreWhenCachingAndLoggingDisabled() throws 
Exception {
+        store = createStore(false, false);
+        assertThat(store, is(instanceOf(MeteredKeyValueStore.class)));
+    }
+
+    @Test
+    public void shouldReturnMeteredStoreWhenCachingDisabled() throws Exception 
{
+        store = createStore(true, false);
+        assertThat(store, is(instanceOf(MeteredKeyValueStore.class)));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldHaveMeteredStoreWhenCached() throws Exception {
+        store = createStore(false, true);
+        store.init(context, store);
+        final StreamsMetrics metrics = context.metrics();
+        assertFalse(metrics.metrics().isEmpty());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldHaveMeteredStoreWhenLogged() throws Exception {
+        store = createStore(true, false);
+        store.init(context, store);
+        final StreamsMetrics metrics = context.metrics();
+        assertFalse(metrics.metrics().isEmpty());
+    }
+
+    @SuppressWarnings("unchecked")
+    private KeyValueStore<String, String> createStore(final boolean logged, 
final boolean cached) {
+        return new RocksDBKeyValueStoreSupplier<>(STORE_NAME,
+                                                  Serdes.String(),
+                                                  Serdes.String(),
+                                                  logged,
+                                                  Collections.EMPTY_MAP,
+                                                  cached).get();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
new file mode 100644
index 0000000..28196a2
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class RocksDBSessionStoreSupplierTest {
+
+    private static final String STORE_NAME = "name";
+    private final ThreadCache cache = new ThreadCache("test", 1024, new 
MockStreamsMetrics(new Metrics()));
+    private final MockProcessorContext context = new MockProcessorContext(null,
+                                                                          
TestUtils.tempDirectory(),
+                                                                          
Serdes.String(),
+                                                                          
Serdes.String(),
+                                                                          new 
NoOpRecordCollector(),
+                                                                          
cache);
+
+    private SessionStore<String, String> store;
+
+    @After
+    public void close() {
+        store.close();
+    }
+
+    @Test
+    public void shouldCreateLoggingEnabledStoreWhenStoreLogged() throws 
Exception {
+        store = createStore(true, false);
+        final List<ProducerRecord> logged = new ArrayList<>();
+        final NoOpRecordCollector collector = new NoOpRecordCollector() {
+            @Override
+            public <K, V> void send(final ProducerRecord<K, V> record, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+                logged.add(record);
+            }
+        };
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      
TestUtils.tempDirectory(),
+                                                                      
Serdes.String(),
+                                                                      
Serdes.String(),
+                                                                      
collector,
+                                                                      cache);
+        context.setTime(1);
+        store.init(context, store);
+        store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
+        assertFalse(logged.isEmpty());
+    }
+
+    @Test
+    public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() throws 
Exception {
+        store = createStore(false, false);
+        final List<ProducerRecord> logged = new ArrayList<>();
+        final NoOpRecordCollector collector = new NoOpRecordCollector() {
+            @Override
+            public <K, V> void send(final ProducerRecord<K, V> record, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+                logged.add(record);
+            }
+        };
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      
TestUtils.tempDirectory(),
+                                                                      
Serdes.String(),
+                                                                      
Serdes.String(),
+                                                                      
collector,
+                                                                      cache);
+        context.setTime(1);
+        store.init(context, store);
+        store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
+        assertTrue(logged.isEmpty());
+    }
+
+    @Test
+    public void shouldReturnCachedSessionStoreWhenCachingEnabled() throws 
Exception {
+        store = createStore(false, true);
+        store.init(context, store);
+        context.setTime(1);
+        store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
+        store.put(new Windowed<>("b", new SessionWindow(0, 10)), "c");
+        assertThat(store, is(instanceOf(CachingSessionStore.class)));
+        assertThat(cache.size(), is(2L));
+    }
+
+    @Test
+    public void shouldReturnRocksDbStoreWhenCachingAndLoggingDisabled() throws 
Exception {
+        store = createStore(false, false);
+        assertThat(store, is(instanceOf(RocksDBSessionStore.class)));
+    }
+
+    @Test
+    public void shouldReturnRocksDbStoreWhenCachingDisabled() throws Exception 
{
+        store = createStore(true, false);
+        assertThat(store, is(instanceOf(RocksDBSessionStore.class)));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldHaveMeteredStoreWhenCached() throws Exception {
+        store = createStore(false, true);
+        store.init(context, store);
+        final StreamsMetrics metrics = context.metrics();
+        assertFalse(metrics.metrics().isEmpty());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldHaveMeteredStoreWhenLogged() throws Exception {
+        store = createStore(true, false);
+        store.init(context, store);
+        final StreamsMetrics metrics = context.metrics();
+        assertFalse(metrics.metrics().isEmpty());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception 
{
+        store = createStore(false, false);
+        store.init(context, store);
+        final StreamsMetrics metrics = context.metrics();
+        assertFalse(metrics.metrics().isEmpty());
+    }
+
+
+
+    private SessionStore<String, String> createStore(final boolean logged, 
final boolean cached) {
+        return new RocksDBSessionStoreSupplier<>(STORE_NAME,
+                                                 10,
+                                                 Serdes.String(),
+                                                 Serdes.String(),
+                                                 logged,
+                                                 Collections.<String, 
String>emptyMap(),
+                                                 cached).get();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
new file mode 100644
index 0000000..897ec62
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class RocksDBWindowStoreSupplierTest {
+
+    private static final String STORE_NAME = "name";
+    private WindowStore<String, String> store;
+    private final ThreadCache cache = new ThreadCache("test", 1024, new 
MockStreamsMetrics(new Metrics()));
+    private final MockProcessorContext context = new MockProcessorContext(null,
+                                                                          
TestUtils.tempDirectory(),
+                                                                          
Serdes.String(),
+                                                                          
Serdes.String(),
+                                                                          new 
NoOpRecordCollector(),
+                                                                          
cache);
+
+    @After
+    public void close() {
+        store.close();
+    }
+
+    @Test
+    public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws 
Exception {
+        store = createStore(true, false);
+        final List<ProducerRecord> logged = new ArrayList<>();
+        final NoOpRecordCollector collector = new NoOpRecordCollector() {
+            @Override
+            public <K, V> void send(final ProducerRecord<K, V> record, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+                logged.add(record);
+            }
+        };
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      
TestUtils.tempDirectory(),
+                                                                      
Serdes.String(),
+                                                                      
Serdes.String(),
+                                                                      
collector,
+                                                                      cache);
+        context.setTime(1);
+        store.init(context, store);
+        store.put("a", "b");
+        assertFalse(logged.isEmpty());
+    }
+
+    @Test
+    public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() throws 
Exception {
+        store = createStore(false, false);
+        final List<ProducerRecord> logged = new ArrayList<>();
+        final NoOpRecordCollector collector = new NoOpRecordCollector() {
+            @Override
+            public <K, V> void send(final ProducerRecord<K, V> record, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+                logged.add(record);
+            }
+        };
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      
TestUtils.tempDirectory(),
+                                                                      
Serdes.String(),
+                                                                      
Serdes.String(),
+                                                                      
collector,
+                                                                      cache);
+        context.setTime(1);
+        store.init(context, store);
+        store.put("a", "b");
+        assertTrue(logged.isEmpty());
+    }
+
+    @Test
+    public void shouldBeCachedWindowStoreWhenCachingEnabled() throws Exception 
{
+        store = createStore(false, true);
+        store.init(context, store);
+        context.setTime(1);
+        store.put("a", "b");
+        store.put("b", "c");
+        assertThat(store, is(instanceOf(CachingWindowStore.class)));
+        assertThat(context.getCache().size(), is(2L));
+    }
+
+    @Test
+    public void shouldReturnRocksDbStoreWhenCachingAndLoggingDisabled() throws 
Exception {
+        store = createStore(false, false);
+        assertThat(store, is(instanceOf(RocksDBWindowStore.class)));
+    }
+
+    @Test
+    public void shouldReturnRocksDbStoreWhenCachingDisabled() throws Exception 
{
+        store = createStore(true, false);
+        assertThat(store, is(instanceOf(RocksDBWindowStore.class)));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldHaveMeteredStoreWhenCached() throws Exception {
+        store = createStore(false, true);
+        store.init(context, store);
+        final StreamsMetrics metrics = context.metrics();
+        assertFalse(metrics.metrics().isEmpty());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldHaveMeteredStoreWhenLogged() throws Exception {
+        store = createStore(true, false);
+        store.init(context, store);
+        final StreamsMetrics metrics = context.metrics();
+        assertFalse(metrics.metrics().isEmpty());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception 
{
+        store = createStore(false, false);
+        store.init(context, store);
+        final StreamsMetrics metrics = context.metrics();
+        assertFalse(metrics.metrics().isEmpty());
+    }
+
+    @SuppressWarnings("unchecked")
+    private WindowStore<String, String> createStore(final boolean logged, 
final boolean cached) {
+        return new RocksDBWindowStoreSupplier<>(STORE_NAME,
+                                                10,
+                                                3,
+                                                false,
+                                                Serdes.String(),
+                                                Serdes.String(),
+                                                10,
+                                                logged,
+                                                Collections.<String, 
String>emptyMap(),
+                                                cached).get();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index a522592..79223de 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -490,8 +490,6 @@ public class RocksDBWindowStoreTest {
                     recordCollector, cache);
 
             WindowStore<Integer, String> store = createWindowStore(context, 
false, true);
-            RocksDBWindowStore<Integer, String> inner =
-                    (RocksDBWindowStore<Integer, String>) 
((MeteredWindowStore<Integer, String>) store).inner();
             try {
                 // to validate segments
                 final Segments segments = new Segments(windowName, 
retentionPeriod, numSegments);

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java
new file mode 100644
index 0000000..8c0d2fe
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SerializedKeyValueIteratorTest {
+
+    private final StateSerdes<String, String> serdes = new 
StateSerdes<>("blah", Serdes.String(), Serdes.String());
+    private final Iterator<KeyValue<Bytes, byte[]>> iterator
+            = Arrays.asList(KeyValue.pair(Bytes.wrap("hi".getBytes()), 
"there".getBytes()),
+                            KeyValue.pair(Bytes.wrap("hello".getBytes()), 
"world".getBytes()))
+            .iterator();
+    private final DelegatingPeekingKeyValueIterator<Bytes, byte[]> peeking
+            = new DelegatingPeekingKeyValueIterator<>("store", new 
KeyValueIteratorStub<>(iterator));
+    private final SerializedKeyValueIterator<String, String> 
serializedKeyValueIterator
+            = new SerializedKeyValueIterator<>(peeking, serdes);
+
+    @Test
+    public void shouldReturnTrueOnHasNextWhenMoreResults() {
+        assertTrue(serializedKeyValueIterator.hasNext());
+    }
+
+    @Test
+    public void shouldReturnNextValueWhenItExists() throws Exception {
+        assertThat(serializedKeyValueIterator.next(), 
equalTo(KeyValue.pair("hi", "there")));
+        assertThat(serializedKeyValueIterator.next(), 
equalTo(KeyValue.pair("hello", "world")));
+    }
+
+    @Test
+    public void shouldReturnFalseOnHasNextWhenNoMoreResults() throws Exception 
{
+        advanceIteratorToEnd();
+        assertFalse(serializedKeyValueIterator.hasNext());
+    }
+
+    @Test
+    public void shouldThrowNoSuchElementOnNextWhenIteratorExhausted() throws 
Exception {
+        advanceIteratorToEnd();
+        try {
+            serializedKeyValueIterator.next();
+            fail("Expected NoSuchElementException on exhausted iterator");
+        } catch (final NoSuchElementException nse) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldPeekNextKey() throws Exception {
+        assertThat(serializedKeyValueIterator.peekNextKey(), equalTo("hi"));
+        serializedKeyValueIterator.next();
+        assertThat(serializedKeyValueIterator.peekNextKey(), equalTo("hello"));
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowUnsupportedOperationOnRemove() throws Exception {
+        serializedKeyValueIterator.remove();
+    }
+
+    private void advanceIteratorToEnd() {
+        serializedKeyValueIterator.next();
+        serializedKeyValueIterator.next();
+    }
+
+
+}
\ No newline at end of file

Reply via email to