This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 04e2061  KAFKA-3522: Add TimestampedWindowStore builder/runtime 
classes (#6173)
04e2061 is described below

commit 04e206154ac614b7d4d34a7a1b6ba2c882f607b9
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Mar 8 09:30:00 2019 -0800

    KAFKA-3522: Add TimestampedWindowStore builder/runtime classes (#6173)
    
    Add TimestampedWindowStore builder/runtime classes
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax 
<mj...@apache.org>,  John Roesler <j...@confluent.io>,  Bill Bejeck 
<bbej...@gmail.com>
---
 .../internals/InternalTopologyBuilder.java         |   7 +-
 .../processor/internals/ProcessorContextImpl.java  |  31 +++++-
 .../state/internals/CachingWindowStore.java        |  47 +++++----
 ...ChangeLoggingTimestampedKeyValueBytesStore.java |   1 +
 ... ChangeLoggingTimestampedWindowBytesStore.java} |  12 ++-
 .../internals/ChangeLoggingWindowBytesStore.java   |  48 ++++++---
 .../internals/MeteredTimestampedWindowStore.java   |  58 +++++++++++
 .../state/internals/MeteredWindowStore.java        |  20 ++--
 .../internals/TimestampedWindowStoreBuilder.java   |  74 ++++++++++++++
 .../internals/ProcessorContextImplTest.java        | 112 +++++++++++++++++----
 ...ngeLoggingTimestampedWindowBytesStoreTest.java} |  61 +++++++----
 .../ChangeLoggingWindowBytesStoreTest.java         |  31 +++---
 .../internals/MeteredTimestampWindowStoreTest.java |  92 +++++++++++++++++
 13 files changed, 486 insertions(+), 108 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 0648fec..334adce 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -140,6 +141,8 @@ public class InternalTopologyBuilder {
         long retentionPeriod() {
             if (builder instanceof WindowStoreBuilder) {
                 return ((WindowStoreBuilder) builder).retentionPeriod();
+            } else if (builder instanceof TimestampedWindowStoreBuilder) {
+                return ((TimestampedWindowStoreBuilder) 
builder).retentionPeriod();
             } else if (builder instanceof SessionStoreBuilder) {
                 return ((SessionStoreBuilder) builder).retentionPeriod();
             } else {
@@ -160,7 +163,9 @@ public class InternalTopologyBuilder {
         }
 
         private boolean isWindowStore() {
-            return builder instanceof WindowStoreBuilder || builder instanceof 
SessionStoreBuilder;
+            return builder instanceof WindowStoreBuilder
+                || builder instanceof TimestampedWindowStoreBuilder
+                || builder instanceof SessionStoreBuilder;
         }
 
         // Apparently Java strips the generics from this method because we're 
using the raw type for builder,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 36a3750..764d50c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -32,6 +32,8 @@ import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -84,6 +86,8 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext implements Re
         if (global != null) {
             if (global instanceof KeyValueStore) {
                 return new KeyValueStoreReadOnlyDecorator((KeyValueStore) 
global);
+            } else if (global instanceof TimestampedWindowStore) {
+                return new 
TimestampedWindowStoreReadOnlyDecorator((TimestampedWindowStore) global);
             } else if (global instanceof WindowStore) {
                 return new WindowStoreReadOnlyDecorator((WindowStore) global);
             } else if (global instanceof SessionStore) {
@@ -106,6 +110,8 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext implements Re
         final StateStore store = stateManager.getStore(name);
         if (store instanceof KeyValueStore) {
             return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+        } else if (store instanceof TimestampedWindowStore) {
+            return new 
TimestampedWindowStoreReadWriteDecorator((TimestampedWindowStore) store);
         } else if (store instanceof WindowStore) {
             return new WindowStoreReadWriteDecorator((WindowStore) store);
         } else if (store instanceof SessionStore) {
@@ -339,6 +345,15 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext implements Re
         }
     }
 
+    private static class TimestampedWindowStoreReadOnlyDecorator<K, V>
+        extends WindowStoreReadOnlyDecorator<K, ValueAndTimestamp<V>>
+        implements TimestampedWindowStore<K, V> {
+
+        private TimestampedWindowStoreReadOnlyDecorator(final 
TimestampedWindowStore<K, V> inner) {
+            super(inner);
+        }
+    }
+
     private static class SessionStoreReadOnlyDecorator<K, AGG>
         extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>, K, AGG>
         implements SessionStore<K, AGG> {
@@ -520,6 +535,15 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext implements Re
         }
     }
 
+    private static class TimestampedWindowStoreReadWriteDecorator<K, V>
+        extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
+        implements TimestampedWindowStore<K, V> {
+
+        TimestampedWindowStoreReadWriteDecorator(final 
TimestampedWindowStore<K, V> inner) {
+            super(inner);
+        }
+    }
+
     static class SessionStoreReadWriteDecorator<K, AGG>
         extends StateStoreReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
         implements SessionStore<K, AGG> {
@@ -549,12 +573,15 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext implements Re
         }
 
         @Override
-        public void put(final Windowed<K> sessionKey, final AGG aggregate) {
+        public void put(final Windowed<K> sessionKey,
+                        final AGG aggregate) {
             wrapped().put(sessionKey, aggregate);
         }
 
         @Override
-        public AGG fetchSession(final K key, final long startTime, final long 
endTime) {
+        public AGG fetchSession(final K key,
+                                final long startTime,
+                                final long endTime) {
             return wrapped().fetchSession(key, startTime, endTime);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 50a2c7c..0a869da 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -64,9 +64,10 @@ class CachingWindowStore
         this.context = context;
         final String topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
 
-        bytesSerdes = new StateSerdes<>(topic,
-                                        Serdes.Bytes(),
-                                        Serdes.ByteArray());
+        bytesSerdes = new StateSerdes<>(
+            topic,
+            Serdes.Bytes(),
+            Serdes.ByteArray());
         name = context.taskId() + "-" + name();
         cache = this.context.getCache();
 
@@ -121,12 +122,15 @@ class CachingWindowStore
     }
 
     @Override
-    public synchronized void put(final Bytes key, final byte[] value) {
+    public synchronized void put(final Bytes key,
+                                 final byte[] value) {
         put(key, value, context.timestamp());
     }
 
     @Override
-    public synchronized void put(final Bytes key, final byte[] value, final 
long windowStartTimestamp) {
+    public synchronized void put(final Bytes key,
+                                 final byte[] value,
+                                 final long windowStartTimestamp) {
         // since this function may not access the underlying inner store, we 
need to validate
         // if store is open outside as well.
         validateStoreOpen();
@@ -145,7 +149,8 @@ class CachingWindowStore
     }
 
     @Override
-    public byte[] fetch(final Bytes key, final long timestamp) {
+    public byte[] fetch(final Bytes key,
+                        final long timestamp) {
         validateStoreOpen();
         final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key, 
timestamp, 0);
         final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
@@ -162,7 +167,9 @@ class CachingWindowStore
 
     @SuppressWarnings("deprecation")
     @Override
-    public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key, 
final long timeFrom, final long timeTo) {
+    public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key,
+                                                          final long timeFrom,
+                                                          final long timeTo) {
         // since this function may not access the underlying inner store, we 
need to validate
         // if store is open outside as well.
         validateStoreOpen();
@@ -175,10 +182,7 @@ class CachingWindowStore
         final Bytes cacheKeyTo = 
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(name, cacheKeyFrom, cacheKeyTo);
 
-        final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(key,
-                                                                             
key,
-                                                                             
timeFrom,
-                                                                             
timeTo);
+        final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(key, key, timeFrom, timeTo);
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator = new FilteredCacheIterator(
             cacheIterator, hasNextCondition, cacheFunction
         );
@@ -188,12 +192,16 @@ class CachingWindowStore
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, 
final Bytes to, final long timeFrom, final long timeTo) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+                                                           final Bytes to,
+                                                           final long timeFrom,
+                                                           final long timeTo) {
         // since this function may not access the underlying inner store, we 
need to validate
         // if store is open outside as well.
         validateStoreOpen();
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
wrapped().fetch(from, to, timeFrom, timeTo);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator =
+            wrapped().fetch(from, to, timeFrom, timeTo);
         if (cache == null) {
             return underlyingIterator;
         }
@@ -201,10 +209,7 @@ class CachingWindowStore
         final Bytes cacheKeyTo = 
cacheFunction.cacheKey(keySchema.upperRange(to, timeTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(name, cacheKeyFrom, cacheKeyTo);
 
-        final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(from,
-                                                                             
to,
-                                                                             
timeFrom,
-                                                                             
timeTo);
+        final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(from, to, timeFrom, timeTo);
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator = new FilteredCacheIterator(cacheIterator, 
hasNextCondition, cacheFunction);
 
         return new MergedSortedCacheWindowStoreKeyValueIterator(
@@ -218,16 +223,16 @@ class CachingWindowStore
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom, final long timeTo) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom,
+                                                              final long 
timeTo) {
         validateStoreOpen();
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
wrapped().fetchAll(timeFrom, timeTo);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.all(name);
 
         final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(null, null, timeFrom, timeTo);
-        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator = new FilteredCacheIterator(cacheIterator,
-                                                                               
                               hasNextCondition,
-                                                                               
                               cacheFunction);
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator =
+            new FilteredCacheIterator(cacheIterator, hasNextCondition, 
cacheFunction);
         return new MergedSortedCacheWindowStoreKeyValueIterator(
                 filteredCacheIterator,
                 underlyingIterator,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
index 02568b6..02e4c6a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
@@ -23,6 +23,7 @@ import static 
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserial
 import static 
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.timestamp;
 
 public class ChangeLoggingTimestampedKeyValueBytesStore extends 
ChangeLoggingKeyValueBytesStore {
+
     ChangeLoggingTimestampedKeyValueBytesStore(final KeyValueStore<Bytes, 
byte[]> inner) {
         super(inner);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
similarity index 79%
copy from 
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
copy to 
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
index 02568b6..94362d4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
@@ -17,14 +17,16 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
 
 import static 
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
 import static 
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.timestamp;
 
-public class ChangeLoggingTimestampedKeyValueBytesStore extends 
ChangeLoggingKeyValueBytesStore {
-    ChangeLoggingTimestampedKeyValueBytesStore(final KeyValueStore<Bytes, 
byte[]> inner) {
-        super(inner);
+class ChangeLoggingTimestampedWindowBytesStore extends 
ChangeLoggingWindowBytesStore {
+
+    ChangeLoggingTimestampedWindowBytesStore(final WindowStore<Bytes, byte[]> 
bytesStore,
+                                             final boolean retainDuplicates) {
+        super(bytesStore, retainDuplicates);
     }
 
     @Override
@@ -36,4 +38,4 @@ public class ChangeLoggingTimestampedKeyValueBytesStore 
extends ChangeLoggingKey
             changeLogger.logChange(key, null);
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 7f7612e..ef5a4c7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -36,10 +36,11 @@ class ChangeLoggingWindowBytesStore
     implements WindowStore<Bytes, byte[]> {
 
     private final boolean retainDuplicates;
-    private StoreChangeLogger<Bytes, byte[]> changeLogger;
     private ProcessorContext context;
     private int seqnum = 0;
 
+    StoreChangeLogger<Bytes, byte[]> changeLogger;
+
     ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore,
                                   final boolean retainDuplicates) {
         super(bytesStore);
@@ -47,19 +48,37 @@ class ChangeLoggingWindowBytesStore
     }
 
     @Override
-    public byte[] fetch(final Bytes key, final long timestamp) {
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        this.context = context;
+        super.init(context, root);
+        final String topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
+        changeLogger = new StoreChangeLogger<>(
+            name(),
+            context,
+            new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+    }
+
+    @Override
+    public byte[] fetch(final Bytes key,
+                        final long timestamp) {
         return wrapped().fetch(key, timestamp);
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public WindowStoreIterator<byte[]> fetch(final Bytes key, final long from, 
final long to) {
+    public WindowStoreIterator<byte[]> fetch(final Bytes key,
+                                             final long from,
+                                             final long to) {
         return wrapped().fetch(key, from, to);
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo, final long from, final long to) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
+                                                           final Bytes keyTo,
+                                                           final long from,
+                                                           final long to) {
         return wrapped().fetch(keyFrom, keyTo, from, to);
     }
 
@@ -70,7 +89,8 @@ class ChangeLoggingWindowBytesStore
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom, final long timeTo) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom,
+                                                              final long 
timeTo) {
         return wrapped().fetchAll(timeFrom, timeTo);
     }
 
@@ -84,20 +104,16 @@ class ChangeLoggingWindowBytesStore
     }
 
     @Override
-    public void put(final Bytes key, final byte[] value, final long 
windowStartTimestamp) {
+    public void put(final Bytes key,
+                    final byte[] value,
+                    final long windowStartTimestamp) {
         wrapped().put(key, value, windowStartTimestamp);
-        changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, 
windowStartTimestamp, maybeUpdateSeqnumForDups()), value);
+        log(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 
maybeUpdateSeqnumForDups()), value);
     }
 
-    @Override
-    public void init(final ProcessorContext context, final StateStore root) {
-        this.context = context;
-        super.init(context, root);
-        final String topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
-        changeLogger = new StoreChangeLogger<>(
-            name(),
-            context,
-            new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+    void log(final Bytes key,
+             final byte[] value) {
+        changeLogger.logChange(key, value);
     }
 
     private int maybeUpdateSeqnumForDups() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
new file mode 100644
index 0000000..1c10491
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+
+/**
+ * A metered {@link TimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
+ * inner WindowStore implementation does not need to provide its own metrics collecting functionality.
+ * The inner {@link WindowStore} of this class is of type &lt;Bytes,byte[]&gt;, hence we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt;&gt; to &lt;Bytes,byte[]&gt;.
+ *
+ * @param <K> type of the record keys
+ * @param <V> type of the plain record values; the store exposes them wrapped in {@link ValueAndTimestamp}
+ */
+class MeteredTimestampedWindowStore<K, V>
+    extends MeteredWindowStore<K, ValueAndTimestamp<V>>
+    implements TimestampedWindowStore<K, V> {
+
+    MeteredTimestampedWindowStore(final WindowStore<Bytes, byte[]> inner,
+                                  final long windowSizeMs,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Builds the {@link StateSerdes} used by this store. Whenever the key or value serde was not
+     * supplied to the constructor, the corresponding default serde of the given context is used instead.
+     *
+     * @param context the processor context supplying the application id and the default serdes
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    void initStoreSerde(final ProcessorContext context) {
+        serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            // The fallback must wrap the context's default *value* serde (not the key serde),
+            // matching the fallback MeteredWindowStore#initStoreSerde applies for plain values.
+            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index c040e67..681b210 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -43,13 +43,13 @@ public class MeteredWindowStore<K, V>
     private final long windowSizeMs;
     private final String metricScope;
     private final Time time;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
+    final Serde<K> keySerde;
+    final Serde<V> valueSerde;
+    StateSerdes<K, V> serdes;
     private StreamsMetricsImpl metrics;
     private Sensor putTime;
     private Sensor fetchTime;
     private Sensor flushTime;
-    private StateSerdes<K, V> serdes;
     private ProcessorContext context;
     private String taskName;
 
@@ -67,15 +67,11 @@ public class MeteredWindowStore<K, V>
         this.valueSerde = valueSerde;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
         this.context = context;
-        serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        initStoreSerde(context);
         metrics = (StreamsMetricsImpl) context.metrics();
 
         taskName = context.taskId().toString();
@@ -102,6 +98,14 @@ public class MeteredWindowStore<K, V>
     }
 
     @SuppressWarnings("unchecked")
+    void initStoreSerde(final ProcessorContext context) {
+        serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+    }
+
+    @SuppressWarnings("unchecked")
     @Override
     public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> 
listener,
                                     final boolean sendOldValues) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
new file mode 100644
index 0000000..dcb0d44
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.Objects;
+
+/**
+ * Builder for {@link TimestampedWindowStore} instances whose underlying bytes store is obtained
+ * from a {@link WindowBytesStoreSupplier}. The plain value serde passed to the constructor is
+ * wrapped in a {@code ValueAndTimestampSerde} before it is handed to the parent builder.
+ *
+ * @param <K> type of the record keys
+ * @param <V> type of the plain record values
+ */
+public class TimestampedWindowStoreBuilder<K, V>
+    extends AbstractStoreBuilder<K, ValueAndTimestamp<V>, TimestampedWindowStore<K, V>> {
+
+    private final WindowBytesStoreSupplier storeSupplier;
+
+    /**
+     * @param storeSupplier supplier of the underlying bytes store; its name becomes the store name
+     * @param keySerde      serde for the record keys
+     * @param valueSerde    serde for the plain values
+     * @param time          time instance passed on to the parent builder and the built store
+     * @throws NullPointerException if {@code storeSupplier} is {@code null}
+     */
+    public TimestampedWindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,
+                                         final Serde<K> keySerde,
+                                         final Serde<V> valueSerde,
+                                         final Time time) {
+        // The null check has to happen inside the super(...) argument list: no statement may precede
+        // the super call, and checking afterwards would be too late because name() is dereferenced here.
+        super(
+            Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null").name(),
+            keySerde,
+            new ValueAndTimestampSerde<>(valueSerde),
+            time);
+        this.storeSupplier = storeSupplier;
+    }
+
+    /**
+     * Creates the store: the supplied bytes store is wrapped with change-logging first (innermost),
+     * then with caching, and finally with the metered timestamped store (outermost).
+     * Logging and caching wrappers are only added when the respective flag is enabled.
+     *
+     * @return a new metered {@link TimestampedWindowStore}
+     */
+    @Override
+    public TimestampedWindowStore<K, V> build() {
+        return new MeteredTimestampedWindowStore<>(
+            maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+            storeSupplier.windowSize(),
+            storeSupplier.metricsScope(),
+            time,
+            keySerde,
+            valueSerde);
+    }
+
+    /** Wraps {@code inner} in a {@link CachingWindowStore} unless caching is disabled. */
+    private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) {
+        if (!enableCaching) {
+            return inner;
+        }
+        return new CachingWindowStore(
+            inner,
+            storeSupplier.windowSize(),
+            storeSupplier.segmentIntervalMs());
+    }
+
+    /** Wraps {@code inner} in a {@link ChangeLoggingTimestampedWindowBytesStore} unless logging is disabled. */
+    private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {
+        if (!enableLogging) {
+            return inner;
+        }
+        return new ChangeLoggingTimestampedWindowBytesStore(inner, storeSupplier.retainDuplicates());
+    }
+
+    /**
+     * @return the retention period reported by the underlying store supplier
+     */
+    public long retentionPeriod() {
+        return storeSupplier.retentionPeriod();
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index b29b04c..9b36ec7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -27,6 +27,8 @@ import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -55,22 +57,24 @@ public class ProcessorContextImplTest {
     private ProcessorContextImpl context;
 
     private static final String KEY = "key";
-    private static final long VAL = 42L;
+    private static final long VALUE = 42L;
+    private static final ValueAndTimestamp<Long> VALUE_AND_TIMESTAMP = 
ValueAndTimestamp.make(42L, 21L);
     private static final String STORE_NAME = "underlying-store";
 
     private boolean flushExecuted;
     private boolean putExecuted;
+    private boolean putWithTimestampExecuted;
     private boolean putIfAbsentExecuted;
     private boolean putAllExecuted;
     private boolean deleteExecuted;
     private boolean removeExecuted;
-    private boolean put3argExecuted;
 
     private KeyValueIterator<String, Long> rangeIter;
     private KeyValueIterator<String, Long> allIter;
 
-    private List<KeyValueIterator<Windowed<String>, Long>> iters = new 
ArrayList<>(7);
-    private WindowStoreIterator<Long> windowStoreIter;
+    private final List<KeyValueIterator<Windowed<String>, Long>> iters = new 
ArrayList<>(7);
+    private final List<KeyValueIterator<Windowed<String>, 
ValueAndTimestamp<Long>>> timestampedIters = new ArrayList<>(7);
+    private WindowStoreIterator windowStoreIter;
 
     @Before
     public void setup() {
@@ -80,7 +84,6 @@ public class ProcessorContextImplTest {
         putAllExecuted = false;
         deleteExecuted = false;
         removeExecuted = false;
-        put3argExecuted = false;
 
         rangeIter = mock(KeyValueIterator.class);
         allIter = mock(KeyValueIterator.class);
@@ -88,6 +91,7 @@ public class ProcessorContextImplTest {
 
         for (int i = 0; i < 7; i++) {
             iters.add(i, mock(KeyValueIterator.class));
+            timestampedIters.add(i, mock(KeyValueIterator.class));
         }
 
         final StreamsConfig streamsConfig = mock(StreamsConfig.class);
@@ -100,11 +104,13 @@ public class ProcessorContextImplTest {
 
         
expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock());
         
expect(stateManager.getGlobalStore("GlobalWindowStore")).andReturn(windowStoreMock());
+        
expect(stateManager.getGlobalStore("GlobalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock());
         
expect(stateManager.getGlobalStore("GlobalSessionStore")).andReturn(sessionStoreMock());
         expect(stateManager.getGlobalStore(anyString())).andReturn(null);
 
         
expect(stateManager.getStore("LocalKeyValueStore")).andReturn(keyValueStoreMock());
         
expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock());
+        
expect(stateManager.getStore("LocalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock());
         
expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock());
 
         replay(stateManager);
@@ -120,7 +126,11 @@ public class ProcessorContextImplTest {
         );
 
         context.setCurrentNode(new ProcessorNode<String, Long>("fake", null,
-            new HashSet<>(asList("LocalKeyValueStore", "LocalWindowStore", 
"LocalSessionStore"))));
+            new HashSet<>(asList(
+                "LocalKeyValueStore",
+                "LocalWindowStore",
+                "LocalTimestampedWindowStore",
+                "LocalSessionStore"))));
     }
 
     @Test
@@ -134,10 +144,10 @@ public class ProcessorContextImplTest {
             checkThrowsUnsupportedOperation(() -> 
store.putAll(Collections.emptyList()), "putAll()");
             checkThrowsUnsupportedOperation(() -> store.delete("1"), 
"delete()");
 
-            assertEquals((Long) VAL, store.get(KEY));
+            assertEquals((Long) VALUE, store.get(KEY));
             assertEquals(rangeIter, store.range("one", "two"));
             assertEquals(allIter, store.all());
-            assertEquals(VAL, store.approximateNumEntries());
+            assertEquals(VALUE, store.approximateNumEntries());
         });
     }
 
@@ -153,12 +163,29 @@ public class ProcessorContextImplTest {
             assertEquals(iters.get(0), store.fetchAll(0L, 0L));
             assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
             assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L));
-            assertEquals((Long) VAL, store.fetch(KEY, 1L));
+            assertEquals((Long) VALUE, store.fetch(KEY, 1L));
             assertEquals(iters.get(2), store.all());
         });
     }
 
     @Test
+    public void globalTimestampedWindowStoreShouldBeReadOnly() {
+        // Runs the checks below against the store the context returns for "GlobalTimestampedWindowStore".
+        doTest("GlobalTimestampedWindowStore", 
(Consumer<TimestampedWindowStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            // Mutating calls (flush and both put overloads) are expected to be rejected.
+            checkThrowsUnsupportedOperation(store::flush, "flush()");
+            checkThrowsUnsupportedOperation(() -> store.put("1", 
ValueAndTimestamp.make(1L, 1L), 1L), "put() [with timestamp]");
+            checkThrowsUnsupportedOperation(() -> store.put("1", 
ValueAndTimestamp.make(1L, 1L)), "put() [no timestamp]");
+
+            // Reads must hand back exactly what timestampedWindowStoreMock() stubs for each call.
+            assertEquals(timestampedIters.get(0), store.fetchAll(0L, 0L));
+            assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
+            assertEquals(timestampedIters.get(1), store.fetch(KEY, KEY, 0L, 
1L));
+            assertEquals(VALUE_AND_TIMESTAMP, store.fetch(KEY, 1L));
+            assertEquals(timestampedIters.get(2), store.all());
+        });
+    }
+
+    @Test
     public void globalSessionStoreShouldBeReadOnly() {
         doTest("GlobalSessionStore", (Consumer<SessionStore<String, Long>>) 
store -> {
             verifyStoreCannotBeInitializedOrClosed(store);
@@ -194,10 +221,10 @@ public class ProcessorContextImplTest {
             store.delete("1");
             assertTrue(deleteExecuted);
 
-            assertEquals((Long) VAL, store.get(KEY));
+            assertEquals((Long) VALUE, store.get(KEY));
             assertEquals(rangeIter, store.range("one", "two"));
             assertEquals(allIter, store.all());
-            assertEquals(VAL, store.approximateNumEntries());
+            assertEquals(VALUE, store.approximateNumEntries());
         });
     }
 
@@ -212,18 +239,37 @@ public class ProcessorContextImplTest {
             store.put("1", 1L);
             assertTrue(putExecuted);
 
-            store.put("1", 1L, 1L);
-            assertTrue(put3argExecuted);
-
             assertEquals(iters.get(0), store.fetchAll(0L, 0L));
             assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
             assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L));
-            assertEquals((Long) VAL, store.fetch(KEY, 1L));
+            assertEquals((Long) VALUE, store.fetch(KEY, 1L));
             assertEquals(iters.get(2), store.all());
         });
     }
 
     @Test
+    public void localTimestampedWindowStoreShouldNotAllowInitOrClose() {
+        // Runs the checks below against the store the context returns for "LocalTimestampedWindowStore".
+        doTest("LocalTimestampedWindowStore", 
(Consumer<TimestampedWindowStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            // Unlike the global variant, writes are expected to reach the underlying mock;
+            // the *Executed flags are asserted right after each call.
+            store.flush();
+            assertTrue(flushExecuted);
+
+            store.put("1", ValueAndTimestamp.make(1L, 1L));
+            assertTrue(putExecuted);
+
+            store.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
+            assertTrue(putWithTimestampExecuted);
+
+            // Reads must hand back exactly what timestampedWindowStoreMock() stubs for each call.
+            assertEquals(timestampedIters.get(0), store.fetchAll(0L, 0L));
+            assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
+            assertEquals(timestampedIters.get(1), store.fetch(KEY, KEY, 0L, 
1L));
+            assertEquals(VALUE_AND_TIMESTAMP, store.fetch(KEY, 1L));
+            assertEquals(timestampedIters.get(2), store.all());
+        });
+    }
+
+    @Test
     public void localSessionStoreShouldNotAllowInitOrClose() {
         doTest("LocalSessionStore", (Consumer<SessionStore<String, Long>>) 
store -> {
             verifyStoreCannotBeInitializedOrClosed(store);
@@ -250,8 +296,8 @@ public class ProcessorContextImplTest {
 
         initStateStoreMock(keyValueStoreMock);
 
-        expect(keyValueStoreMock.get(KEY)).andReturn(VAL);
-        expect(keyValueStoreMock.approximateNumEntries()).andReturn(VAL);
+        expect(keyValueStoreMock.get(KEY)).andReturn(VALUE);
+        expect(keyValueStoreMock.approximateNumEntries()).andReturn(VALUE);
 
         expect(keyValueStoreMock.range("one", "two")).andReturn(rangeIter);
         expect(keyValueStoreMock.all()).andReturn(allIter);
@@ -286,6 +332,7 @@ public class ProcessorContextImplTest {
         return keyValueStoreMock;
     }
 
+    @SuppressWarnings("unchecked")
     private WindowStore<String, Long> windowStoreMock() {
         final WindowStore<String, Long> windowStore = mock(WindowStore.class);
 
@@ -294,7 +341,7 @@ public class ProcessorContextImplTest {
         expect(windowStore.fetchAll(anyLong(), 
anyLong())).andReturn(iters.get(0));
         expect(windowStore.fetch(anyString(), anyString(), anyLong(), 
anyLong())).andReturn(iters.get(1));
         expect(windowStore.fetch(anyString(), anyLong(), 
anyLong())).andReturn(windowStoreIter);
-        expect(windowStore.fetch(anyString(), anyLong())).andReturn(VAL);
+        expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE);
         expect(windowStore.all()).andReturn(iters.get(2));
 
         windowStore.put(anyString(), anyLong());
@@ -303,9 +350,32 @@ public class ProcessorContextImplTest {
             return null;
         });
 
-        windowStore.put(anyString(), anyLong(), anyLong());
+        replay(windowStore);
+
+        return windowStore;
+    }
+
+    @SuppressWarnings("unchecked")
+    private TimestampedWindowStore<String, Long> timestampedWindowStoreMock() {
+        final TimestampedWindowStore<String, Long> windowStore = 
mock(TimestampedWindowStore.class);
+
+        initStateStoreMock(windowStore);
+
+        expect(windowStore.fetchAll(anyLong(), 
anyLong())).andReturn(timestampedIters.get(0));
+        expect(windowStore.fetch(anyString(), anyString(), anyLong(), 
anyLong())).andReturn(timestampedIters.get(1));
+        expect(windowStore.fetch(anyString(), anyLong(), 
anyLong())).andReturn(windowStoreIter);
+        expect(windowStore.fetch(anyString(), 
anyLong())).andReturn(VALUE_AND_TIMESTAMP);
+        expect(windowStore.all()).andReturn(timestampedIters.get(2));
+
+        windowStore.put(anyString(), anyObject(ValueAndTimestamp.class));
+        expectLastCall().andAnswer(() -> {
+            putExecuted = true;
+            return null;
+        });
+
+        windowStore.put(anyString(), anyObject(ValueAndTimestamp.class), 
anyLong());
         expectLastCall().andAnswer(() -> {
-            put3argExecuted = true;
+            putWithTimestampExecuted = true;
             return null;
         });
 
@@ -360,9 +430,7 @@ public class ProcessorContextImplTest {
             @SuppressWarnings("unchecked")
             public void init(final ProcessorContext context) {
                 final T store = (T) context.getStateStore(name);
-
                 checker.accept(store);
-
             }
 
             @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
similarity index 63%
copy from 
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
copy to 
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index a36b101..edf210e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -20,9 +20,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.easymock.EasyMock;
@@ -38,12 +38,13 @@ import java.util.Map;
 
 import static java.time.Instant.ofEpochMilli;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 @RunWith(EasyMockRunner.class)
-public class ChangeLoggingWindowBytesStoreTest {
+public class ChangeLoggingTimestampedWindowBytesStoreTest {
 
     private final TaskId taskId = new TaskId(0, 0);
-    private final Map<Object, Object> sent = new HashMap<>();
+    private final Map<Object, ValueAndTimestamp<Object>> sent = new 
HashMap<>();
     private final NoOpRecordCollector collector = new NoOpRecordCollector() {
         @Override
         public <K, V> void send(final String topic,
@@ -54,23 +55,24 @@ public class ChangeLoggingWindowBytesStoreTest {
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
                                 final Serializer<V> valueSerializer) {
-            sent.put(key, value);
+            sent.put(key, ValueAndTimestamp.make(value, timestamp));
         }
     };
 
-    private final byte[] value1 = {0};
-    private final Bytes bytesKey = Bytes.wrap(value1);
+    private final byte[] value = {0};
+    private final byte[] valueAndTimestamp = {0, 0, 0, 0, 0, 0, 0, 42, 0};
+    private final Bytes bytesKey = Bytes.wrap(value);
 
     @Mock(type = MockType.NICE)
     private WindowStore<Bytes, byte[]> inner;
     @Mock(type = MockType.NICE)
     private ProcessorContextImpl context;
-    private ChangeLoggingWindowBytesStore store;
+    private ChangeLoggingTimestampedWindowBytesStore store;
 
 
     @Before
     public void setUp() {
-        store = new ChangeLoggingWindowBytesStore(inner, false);
+        store = new ChangeLoggingTimestampedWindowBytesStore(inner, false);
     }
 
     private void init() {
@@ -85,20 +87,27 @@ public class ChangeLoggingWindowBytesStoreTest {
 
     @Test
     public void shouldLogPuts() {
-        inner.put(bytesKey, value1, 0);
+        inner.put(bytesKey, valueAndTimestamp, 0);
         EasyMock.expectLastCall();
 
         init();
 
-        store.put(bytesKey, value1);
+        store.put(bytesKey, valueAndTimestamp);
 
-        assertArrayEquals(value1, (byte[]) 
sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0)));
+        assertArrayEquals(
+            value,
+            (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 
0)).value());
+        assertEquals(
+            42L,
+            sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 
0)).timestamp());
         EasyMock.verify(inner);
     }
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetching() {
-        EasyMock.expect(inner.fetch(bytesKey, 0, 
10)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
+        EasyMock
+            .expect(inner.fetch(bytesKey, 0, 10))
+            .andReturn(KeyValueIterators.emptyWindowStoreIterator());
 
         init();
 
@@ -108,7 +117,9 @@ public class ChangeLoggingWindowBytesStoreTest {
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
-        EasyMock.expect(inner.fetch(bytesKey, bytesKey, 0, 
1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
+        EasyMock
+            .expect(inner.fetch(bytesKey, bytesKey, 0, 1))
+            .andReturn(KeyValueIterators.emptyIterator());
 
         init();
 
@@ -118,16 +129,26 @@ public class ChangeLoggingWindowBytesStoreTest {
 
     @Test
     public void shouldRetainDuplicatesWhenSet() {
-        store = new ChangeLoggingWindowBytesStore(inner, true);
-        inner.put(bytesKey, value1, 0);
+        store = new ChangeLoggingTimestampedWindowBytesStore(inner, true);
+        inner.put(bytesKey, valueAndTimestamp, 0);
         EasyMock.expectLastCall().times(2);
 
         init();
-        store.put(bytesKey, value1);
-        store.put(bytesKey, value1);
-
-        assertArrayEquals(value1, (byte[]) 
sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)));
-        assertArrayEquals(value1, (byte[]) 
sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)));
+        store.put(bytesKey, valueAndTimestamp);
+        store.put(bytesKey, valueAndTimestamp);
+
+        assertArrayEquals(
+            value,
+            (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 
1)).value());
+        assertEquals(
+            42L,
+            sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 
1)).timestamp());
+        assertArrayEquals(
+            value,
+            (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 
2)).value());
+        assertEquals(
+            42L,
+            sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 
2)).timestamp());
 
         EasyMock.verify(inner);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index a36b101..d7ad6d2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.state.WindowStore;
@@ -58,8 +57,8 @@ public class ChangeLoggingWindowBytesStoreTest {
         }
     };
 
-    private final byte[] value1 = {0};
-    private final Bytes bytesKey = Bytes.wrap(value1);
+    private final byte[] value = {0};
+    private final Bytes bytesKey = Bytes.wrap(value);
 
     @Mock(type = MockType.NICE)
     private WindowStore<Bytes, byte[]> inner;
@@ -85,20 +84,24 @@ public class ChangeLoggingWindowBytesStoreTest {
 
     @Test
     public void shouldLogPuts() {
-        inner.put(bytesKey, value1, 0);
+        inner.put(bytesKey, value, 0);
         EasyMock.expectLastCall();
 
         init();
 
-        store.put(bytesKey, value1);
+        store.put(bytesKey, value);
 
-        assertArrayEquals(value1, (byte[]) 
sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0)));
+        assertArrayEquals(
+            value,
+            (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 
0)));
         EasyMock.verify(inner);
     }
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetching() {
-        EasyMock.expect(inner.fetch(bytesKey, 0, 
10)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
+        EasyMock
+            .expect(inner.fetch(bytesKey, 0, 10))
+            .andReturn(KeyValueIterators.emptyWindowStoreIterator());
 
         init();
 
@@ -108,7 +111,9 @@ public class ChangeLoggingWindowBytesStoreTest {
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
-        EasyMock.expect(inner.fetch(bytesKey, bytesKey, 0, 
1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
+        EasyMock
+            .expect(inner.fetch(bytesKey, bytesKey, 0, 1))
+            .andReturn(KeyValueIterators.emptyIterator());
 
         init();
 
@@ -119,15 +124,15 @@ public class ChangeLoggingWindowBytesStoreTest {
     @Test
     public void shouldRetainDuplicatesWhenSet() {
         store = new ChangeLoggingWindowBytesStore(inner, true);
-        inner.put(bytesKey, value1, 0);
+        inner.put(bytesKey, value, 0);
         EasyMock.expectLastCall().times(2);
 
         init();
-        store.put(bytesKey, value1);
-        store.put(bytesKey, value1);
+        store.put(bytesKey, value);
+        store.put(bytesKey, value);
 
-        assertArrayEquals(value1, (byte[]) 
sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)));
-        assertArrayEquals(value1, (byte[]) 
sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)));
+        assertArrayEquals(value, (byte[]) 
sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)));
+        assertArrayEquals(value, (byte[]) 
sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)));
 
         EasyMock.verify(inner);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
new file mode 100644
index 0000000..a3522f3
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for {@link MeteredTimestampedWindowStore} running on top of a nice-mocked inner
+ * {@code WindowStore<Bytes, byte[]>}.
+ */
+public class MeteredTimestampWindowStoreTest {
+    private InternalMockProcessorContext context;
+    @SuppressWarnings("unchecked")
+    private final WindowStore<Bytes, byte[]> innerStoreMock = 
EasyMock.createNiceMock(WindowStore.class);
+    // Store under test: String keys, String values wrapped by ValueAndTimestampSerde.
+    // NOTE(review): SerdeThatDoesntHandleNull is defined elsewhere; presumably it fails on null
+    // input so that shouldNotExceptionIfFetchReturnsNull catches any attempt to deserialize null.
+    private final MeteredTimestampedWindowStore<String, String> store = new 
MeteredTimestampedWindowStore<>(
+        innerStoreMock,
+        10L, // any size
+        "scope",
+        new MockTime(),
+        Serdes.String(),
+        new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull())
+    );
+    private final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
+
+    // Recorded at construction time, before any test calls EasyMock.replay(innerStoreMock).
+    {
+        
EasyMock.expect(innerStoreMock.name()).andReturn("mocked-store").anyTimes();
+    }
+
+    // Builds a new processor context on top of the metrics registry before every test.
+    @Before
+    public void setUp() {
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "test");
+
+        context = new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
+    }
+
+    // Closing the metered store must result in close() being called on the inner store.
+    @Test
+    public void shouldCloseUnderlyingStore() {
+        innerStoreMock.close();
+        EasyMock.expectLastCall();
+        EasyMock.replay(innerStoreMock);
+
+        store.init(context, store);
+        store.close();
+        EasyMock.verify(innerStoreMock);
+    }
+
+    // The inner fetch for key "a" with second argument 0 is stubbed to return null;
+    // the metered store is expected to return null as well rather than throw.
+    @Test
+    public void shouldNotExceptionIfFetchReturnsNull() {
+        EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 
0)).andReturn(null);
+        EasyMock.replay(innerStoreMock);
+
+        store.init(context, store);
+        assertNull(store.fetch("a", 0));
+    }
+
+}

Reply via email to