Repository: samza
Updated Branches:
  refs/heads/master 072457a2e -> e94abca72


SAMZA-1424 SAMZA-1425 SAMZA-1426; Support serdes and persistent state for 
windows

Notable changes:
* Made windows durable with support for persistent recoverable stores
* New storage format to support multiple messages in windows (the previous 
storage format, which stored the entire message list as a value in the store, 
incurred significant serde overhead)
* Wire `TimeSeriesStore` with the WindowOperator implementation.

Testing:
* Existing unit tests and integration tests pass with serdes wired up
* Will follow up with a PR for hello-samza soon.

Note: The majority of the changes are in 
`samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java`
 and GitHub collapsed the diff.

Author: Jagadish <jvenkatra...@linkedin.com>

Reviewers: Prateek Maheshwari<pmahe...@linkedin.com>

Closes #321 from vjagadish1989/window-operator-serde


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e94abca7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e94abca7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e94abca7

Branch: refs/heads/master
Commit: e94abca72f73a38780c93111f06c69f4aa4255a7
Parents: 072457a
Author: Jagadish <jvenkatra...@linkedin.com>
Authored: Fri Oct 13 13:22:56 2017 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Oct 13 13:22:56 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/operators/windows/Window.java  |   1 -
 .../samza/operators/windows/WindowKey.java      |   3 +-
 .../apache/samza/operators/windows/Windows.java |  71 ++---
 .../windows/internal/WindowInternal.java        |  82 +++--
 .../apache/samza/operators/impl/TriggerKey.java |  40 +--
 .../operators/impl/WindowOperatorImpl.java      | 306 +++++++++++--------
 .../samza/operators/impl/WindowState.java       |  49 ---
 .../operators/impl/store/TimeSeriesKey.java     |   3 +-
 .../impl/store/TimeSeriesKeySerde.java          |  10 +-
 .../operators/impl/store/TimeSeriesStore.java   |  36 +++
 .../impl/store/TimeSeriesStoreImpl.java         |  60 ++++
 .../operators/spec/WindowOperatorSpec.java      |  19 +-
 .../samza/example/PageViewCounterExample.java   |   2 +-
 .../samza/example/RepartitionExample.java       |   2 +-
 .../org/apache/samza/example/WindowExample.java |   2 +-
 .../samza/execution/TestExecutionPlanner.java   |   4 +-
 .../samza/operators/TestMessageStreamImpl.java  |   2 +-
 .../samza/operators/TestWindowOperator.java     |  27 +-
 .../operators/spec/TestWindowOperatorSpec.java  |   9 +-
 .../samza/zk/TestScheduleAfterDebounceTime.java |   7 +-
 .../test/operator/RepartitionJoinWindowApp.java |   2 +-
 .../samza/test/operator/SessionWindowApp.java   |   3 +-
 .../samza/test/operator/TumblingWindowApp.java  |   2 +-
 23 files changed, 470 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java 
b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
index 6f75993..1c0fa53 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
@@ -104,5 +104,4 @@ public interface Window<M, K, WV> {
    * @return the {@link Window} function with {@code mode} set as its {@link 
AccumulationMode}.
    */
   Window<M, K, WV> setAccumulationMode(AccumulationMode mode);
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java 
b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
index 6c66654..550ed1a 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
@@ -66,8 +66,7 @@ public class WindowKey<K> {
     WindowKey<?> windowKey = (WindowKey<?>) o;
 
     if (key != null ? !key.equals(windowKey.key) : windowKey.key != null) 
return false;
-    return !(paneId != null ? !paneId.equals(windowKey.paneId) : 
windowKey.paneId != null);
-
+    return paneId != null ? paneId.equals(windowKey.paneId) : windowKey.paneId 
== null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java 
b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index b478cfa..50391ff 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -26,9 +26,9 @@ import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.operators.windows.internal.WindowType;
+import org.apache.samza.serializers.Serde;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -118,18 +118,20 @@ public final class Windows {
    * @param initialValue the initial value supplier for the aggregator. 
Invoked when a new window is created.
    * @param aggregator the function to incrementally update the window value. 
Invoked when a new message
    *                   arrives for the window.
+   * @param keySerde the serde for the window key
+   * @param windowValueSerde the serde for the window value
    * @param <M> the type of the input message
    * @param <WV> the type of the {@link WindowPane} output value
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function.
    */
-  public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(
-      Function<? super M, ? extends K> keyFn, Duration interval,
-      Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> 
aggregator) {
+  public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<? 
super M, ? extends K> keyFn, Duration interval,
+      Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> 
aggregator, Serde<K> keySerde,
+      Serde<WV> windowValueSerde) {
 
     Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
     return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, 
(FoldLeftFunction<M, WV>) aggregator,
-        (Function<M, K>) keyFn, null, WindowType.TUMBLING);
+        (Function<M, K>) keyFn, null, WindowType.TUMBLING, keySerde, 
windowValueSerde, null);
   }
 
 
@@ -149,16 +151,18 @@ public final class Windows {
    *
    * @param keyFn function to extract key from the message
    * @param interval the duration in processing time
+   * @param keySerde the serde for the window key
+   * @param msgSerde the serde for the input message
    * @param <M> the type of the input message
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function
    */
-  public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(
-      Function<? super M, ? extends K> keyFn, Duration interval) {
-    FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
+  public static <M, K> Window<M, K, Collection<M>> 
keyedTumblingWindow(Function<M, K> keyFn, Duration interval,
+      Serde<K> keySerde, Serde<M> msgSerde) {
 
-    Supplier<Collection<M>> initialValue = ArrayList::new;
-    return keyedTumblingWindow(keyFn, interval, initialValue, aggregator);
+    Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
+    return new WindowInternal<>(defaultTrigger, null, null, keyFn, null,
+        WindowType.TUMBLING, keySerde, null, msgSerde);
   }
 
   /**
@@ -180,18 +184,20 @@ public final class Windows {
    * @param initialValue the initial value supplier for the aggregator. 
Invoked when a new window is created.
    * @param aggregator the function to incrementally update the window value. 
Invoked when a new message
    *                   arrives for the window.
+   * @param windowValueSerde the serde for the window value
    * @param <M> the type of the input message
    * @param <WV> the type of the {@link WindowPane} output value
    * @return the created {@link Window} function
    */
   public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, 
Supplier<? extends WV> initialValue,
-      FoldLeftFunction<? super M, WV> aggregator) {
+      FoldLeftFunction<? super M, WV> aggregator, Serde<WV> windowValueSerde) {
     Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
     return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, 
(FoldLeftFunction<M, WV>) aggregator,
-        null, null, WindowType.TUMBLING);
+        null, null, WindowType.TUMBLING, null, windowValueSerde, null);
   }
 
   /**
+   *
    * Creates a {@link Window} that groups incoming messages into fixed-size, 
non-overlapping
    * processing time based windows.
    *
@@ -209,14 +215,15 @@ public final class Windows {
    * </pre>
    *
    * @param duration the duration in processing time
+   * @param msgSerde the serde for the input message
    * @param <M> the type of the input message
    * @return the created {@link Window} function
    */
-  public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration 
duration) {
-    FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
+  public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration 
duration, Serde<M> msgSerde) {
+    Trigger<M> defaultTrigger = new TimeTrigger<>(duration);
 
-    Supplier<Collection<M>> initialValue = ArrayList::new;
-    return tumblingWindow(duration, initialValue, aggregator);
+    return new WindowInternal<>(defaultTrigger, null, null, null,
+       null, WindowType.TUMBLING, null, null, msgSerde);
   }
 
   /**
@@ -244,17 +251,19 @@ public final class Windows {
    * @param initialValue the initial value supplier for the aggregator. 
Invoked when a new window is created.
    * @param aggregator the function to incrementally update the window value. 
Invoked when a new message
    *                   arrives for the window.
+   * @param keySerde the serde for the window key
+   * @param windowValueSerde the serde for the window value
    * @param <M> the type of the input message
    * @param <K> the type of the key in the {@link Window}
    * @param <WV> the type of the output value in the {@link WindowPane}
    * @return the created {@link Window} function
    */
-  public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(
-      Function<? super M, ? extends K> keyFn, Duration sessionGap,
-      Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> 
aggregator) {
+  public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<? 
super M, ? extends K> keyFn,
+      Duration sessionGap, Supplier<? extends WV> initialValue, 
FoldLeftFunction<? super M, WV> aggregator,
+      Serde<K> keySerde, Serde<WV> windowValueSerde) {
     Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
     return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, 
(FoldLeftFunction<M, WV>) aggregator,
-        (Function<M, K>) keyFn, null, WindowType.SESSION);
+        (Function<M, K>) keyFn, null, WindowType.SESSION, keySerde, 
windowValueSerde, null);
   }
 
   /**
@@ -279,25 +288,17 @@ public final class Windows {
    *
    * @param keyFn the function to extract the window key from a message}
    * @param sessionGap the timeout gap for defining the session
+   * @param keySerde the serde for the window key
+   * @param msgSerde the serde for the input message
    * @param <M> the type of the input message
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function
    */
-  public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(
-      Function<? super M, ? extends K> keyFn, Duration sessionGap) {
-
-    FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
-
-    Supplier<Collection<M>> initialValue = ArrayList::new;
-    return keyedSessionWindow(keyFn, sessionGap, initialValue, aggregator);
-  }
-
+  public static <M, K> Window<M, K, Collection<M>> 
keyedSessionWindow(Function<? super M, ? extends K> keyFn,
+      Duration sessionGap, Serde<K> keySerde, Serde<M> msgSerde) {
 
-  private static <M> FoldLeftFunction<M, Collection<M>> createAggregator() {
-    return (m, c) -> {
-      c.add(m);
-      return c;
-    };
+    Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
+    return new WindowInternal<>(defaultTrigger, null, null, (Function<M, K>) 
keyFn,
+        null, WindowType.SESSION, keySerde, null, msgSerde);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
 
b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
index f6ac301..bc71872 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
@@ -22,6 +22,7 @@ import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.Window;
+import org.apache.samza.serializers.Serde;
 
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -66,37 +67,46 @@ public final class WindowInternal<M, WK, WV> implements 
Window<M, WK, WV> {
    */
   private final WindowType windowType;
 
-  private Trigger<M> earlyTrigger;
+  private final Serde<WK> keySerde;
+  private final Serde<WV> windowValSerde;
+  private final Serde<M> msgSerde;
 
+  private Trigger<M> earlyTrigger;
   private Trigger<M> lateTrigger;
-
   private AccumulationMode mode;
 
-  public WindowInternal(Trigger<M> defaultTrigger, Supplier<WV> initialValue, 
FoldLeftFunction<M, WV> foldLeftFunction, Function<M, WK> keyExtractor, 
Function<M, Long> eventTimeExtractor, WindowType windowType) {
+  public WindowInternal(Trigger<M> defaultTrigger, Supplier<WV> initializer, 
FoldLeftFunction<M, WV> foldLeftFunction,
+      Function<M, WK> keyExtractor, Function<M, Long> eventTimeExtractor, 
WindowType windowType, Serde<WK> keySerde,
+      Serde<WV> windowValueSerde, Serde<M> msgSerde) {
     this.defaultTrigger = defaultTrigger;
-    this.initializer = initialValue;
+    this.initializer = initializer;
     this.foldLeftFunction = foldLeftFunction;
     this.eventTimeExtractor = eventTimeExtractor;
     this.keyExtractor = keyExtractor;
     this.windowType = windowType;
-  }
+    this.keySerde = keySerde;
+    this.windowValSerde = windowValueSerde;
+    this.msgSerde = msgSerde;
 
-  @Override
-  public Window<M, WK, WV> setEarlyTrigger(Trigger<M> trigger) {
-    this.earlyTrigger = trigger;
-    return this;
-  }
+    if (defaultTrigger == null) {
+      throw new IllegalArgumentException("A window must not have a null 
default trigger");
+    }
 
-  @Override
-  public Window<M, WK, WV> setLateTrigger(Trigger<M> trigger) {
-    this.lateTrigger = trigger;
-    return this;
-  }
+    if (msgSerde == null && windowValueSerde == null) {
+      throw new IllegalArgumentException("A window must not have a null msg 
serde and a null windowValue serde");
+    }
 
-  @Override
-  public Window<M, WK, WV> setAccumulationMode(AccumulationMode mode) {
-    this.mode = mode;
-    return this;
+    if (foldLeftFunction != null && windowValSerde == null) {
+      throw new IllegalArgumentException("A window with a FoldLeftFunction 
must have a windowValue serde");
+    }
+
+    if (foldLeftFunction != null && initializer == null) {
+      throw new IllegalArgumentException("A window with a FoldLeftFunction 
must have an initializer");
+    }
+
+    if (foldLeftFunction == null && initializer != null) {
+      throw new IllegalArgumentException("A window without a provided 
FoldLeftFunction must not have an initializer");
+    }
   }
 
   public Trigger<M> getDefaultTrigger() {
@@ -134,4 +144,38 @@ public final class WindowInternal<M, WK, WV> implements 
Window<M, WK, WV> {
   public AccumulationMode getAccumulationMode() {
     return mode;
   }
+
+  public Serde<WK> getKeySerde() {
+    return keySerde;
+  }
+
+  public Serde<WV> getWindowValSerde() {
+    return windowValSerde;
+  }
+
+  public Serde<M> getMsgSerde() {
+    return msgSerde;
+  }
+
+  public AccumulationMode getMode() {
+    return mode;
+  }
+
+  @Override
+  public Window<M, WK, WV> setEarlyTrigger(Trigger<M> trigger) {
+    this.earlyTrigger = trigger;
+    return this;
+  }
+
+  @Override
+  public Window<M, WK, WV> setLateTrigger(Trigger<M> trigger) {
+    this.lateTrigger = trigger;
+    return this;
+  }
+
+  @Override
+  public Window<M, WK, WV> setAccumulationMode(AccumulationMode mode) {
+    this.mode = mode;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java 
b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
index c089737..736f167 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
@@ -20,53 +20,57 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.operators.triggers.FiringType;
-import org.apache.samza.operators.windows.WindowKey;
 
 /**
  * Uniquely identifies a trigger firing
  */
-public class TriggerKey<WK> {
+public class TriggerKey<K> {
   private final FiringType type;
-  private final WindowKey<WK> key;
+  private final K key;
+  private final long timestamp;
 
-  public TriggerKey(FiringType type, WindowKey<WK> key) {
+  public TriggerKey(FiringType type, K key, long timestamp) {
     if (type == null) {
       throw new IllegalArgumentException("Firing type cannot be null");
     }
 
-    if (key == null) {
-      throw new IllegalArgumentException("WindowKey cannot be null");
-    }
-
     this.type = type;
     this.key = key;
+    this.timestamp = timestamp;
   }
 
-  /**
-   * Equality is determined by both the type, and the window key.
-   */
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
-    TriggerKey<WK> that = (TriggerKey<WK>) o;
-    return type == that.type && key.equals(that.key);
+
+    TriggerKey<?> that = (TriggerKey<?>) o;
+
+    if (timestamp != that.timestamp) {
+      return false;
+    }
+    if (type != that.type) {
+      return false;
+    }
+    return key != null ? key.equals(that.key) : that.key == null;
   }
 
-  /**
-   * Hashcode is computed by from the type, and the window key.
-   */
   @Override
   public int hashCode() {
     int result = type.hashCode();
-    result = 31 * result + key.hashCode();
+    result = 31 * result + (key != null ? key.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
     return result;
   }
 
-  public WindowKey<WK> getKey() {
+  public K getKey() {
     return key;
   }
 
+  public long getTimestamp() {
+    return timestamp;
+  }
+
   public FiringType getType() {
     return type;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index 736d71e..42fe46a 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -20,7 +20,13 @@
  */
 package org.apache.samza.operators.impl;
 
+import com.google.common.base.Preconditions;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.impl.store.TimeSeriesKey;
+import org.apache.samza.operators.impl.store.TimeSeriesStore;
+import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl;
+import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.triggers.FiringType;
@@ -29,12 +35,12 @@ import org.apache.samza.operators.triggers.TimeTrigger;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.TriggerImpl;
 import org.apache.samza.operators.triggers.TriggerImpls;
-import org.apache.samza.operators.util.InternalInMemoryStore;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowKey;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.operators.windows.internal.WindowType;
+import org.apache.samza.storage.kv.ClosableIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
@@ -45,89 +51,119 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * Implementation of a window operator that groups messages into finite 
windows for processing.
  *
  * This class implements the processing logic for various types of windows and 
triggers. It tracks and manages state for
  * all open windows, the active triggers that correspond to each of the 
windows and the pending callbacks. It provides
- * an implementation of {@link TriggerScheduler} that {@link TriggerImpl}s can 
use to schedule and cancel callbacks. It
- * also orchestrates the flow of messages through the various {@link 
TriggerImpl}s.
+ * an implementation of {@link TriggerScheduler} that {@link TriggerImpl}s can 
use
+ * to schedule and cancel callbacks. It also orchestrates the flow of messages 
through the various
+ * {@link TriggerImpl}s.
+ *
+ * <p> An instance of a {@link TriggerImplHandler} is created corresponding to 
each {@link Trigger}
+ * configured for a particular window. For every message added to the window, 
this class looks up the corresponding
+ * {@link TriggerImplHandler} for the trigger and invokes {@link 
TriggerImplHandler#onMessage(TriggerKey, Object,
+ * MessageCollector, TaskCoordinator)}. The {@link TriggerImplHandler} 
maintains the {@link TriggerImpl} instance along
+ * with whether it has been canceled yet or not. Then, the {@link 
TriggerImplHandler} invokes onMessage on its underlying
+ * {@link TriggerImpl} instance. A {@link TriggerImpl} instance is scoped 
to a window and its firing determines when
+ * results for its window are emitted.
  *
- * <p> An instance of a {@link TriggerImplHandler} is created corresponding to 
each {@link Trigger} configured for a
- * particular window. For every message added to the window, this class looks 
up the corresponding {@link TriggerImplHandler}
- * for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, 
Object, MessageCollector, TaskCoordinator)}.
- * The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance 
along with whether it has been canceled yet
- * or not. Then, the {@link TriggerImplHandler} invokes onMessage on 
underlying its {@link TriggerImpl} instance. A
- * {@link TriggerImpl} instance is scoped to a window and its firing 
determines when results for its window are emitted.
  * The {@link WindowOperatorImpl} checks if the trigger fired and returns the 
result of the firing.
  *
  * @param <M> the type of the incoming message
- * @param <WK> the type of the key in this {@link 
org.apache.samza.operators.MessageStream}
- * @param <WV> the type of the value in the emitted window pane
+ * @param <K> the type of the key in the incoming message
  *
  */
-public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, 
WindowPane<WK, WV>> {
-
+public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, 
Object>> {
+  // Object == Collection<M> || WV
   private static final Logger LOG = 
LoggerFactory.getLogger(WindowOperatorImpl.class);
 
-  private final WindowOperatorSpec<M, WK, WV> windowOpSpec;
+  private final WindowOperatorSpec<M, K, Object> windowOpSpec;
   private final Clock clock;
-  private final WindowInternal<M, WK, WV> window;
-  private final KeyValueStore<WindowKey<WK>, WindowState<WV>> store = new 
InternalInMemoryStore<>();
-  private TriggerScheduler<WK> triggerScheduler;
+  private final WindowInternal<M, K, Object> window;
+  private final FoldLeftFunction<M, Object> foldLeftFn;
+  private final Supplier<Object> initializer;
+  private final Function<M, K> keyFn;
 
-  // The trigger state corresponding to each {@link TriggerKey}.
-  private final Map<TriggerKey<WK>, TriggerImplHandler> triggers = new 
HashMap<>();
+  private final TriggerScheduler<K> triggerScheduler;
+  private final Map<TriggerKey<K>, TriggerImplHandler> triggers = new 
HashMap<>();
+  private TimeSeriesStore<K, Object> timeSeriesStore;
 
-  public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> windowOpSpec, Clock 
clock) {
+  public WindowOperatorImpl(WindowOperatorSpec<M, K, Object> windowOpSpec, 
Clock clock) {
     this.windowOpSpec = windowOpSpec;
     this.clock = clock;
     this.window = windowOpSpec.getWindow();
+    this.foldLeftFn = window.getFoldLeftFunction();
+    this.initializer = window.getInitializer();
+    this.keyFn = window.getKeyExtractor();
     this.triggerScheduler= new TriggerScheduler(clock);
   }
 
   @Override
   protected void handleInit(Config config, TaskContext context) {
-    WindowInternal<M, WK, WV> window = windowOpSpec.getWindow();
-    if (window.getFoldLeftFunction() != null) {
-      window.getFoldLeftFunction().init(config, context);
+    WindowInternal<M, K, Object> window = windowOpSpec.getWindow();
+
+    KeyValueStore<TimeSeriesKey<K>, Object> store = 
(KeyValueStore<TimeSeriesKey<K>, Object>) 
context.getStore(windowOpSpec.getOpName());
+
+    // For aggregating windows, we use the store in over-write mode since we 
only retain the aggregated
+    // value. Else, we use the store in append-mode.
+    if (foldLeftFn != null) {
+      foldLeftFn.init(config, context);
+      timeSeriesStore = new TimeSeriesStoreImpl(store, false);
+    } else {
+      timeSeriesStore = new TimeSeriesStoreImpl(store, true);
     }
   }
 
   @Override
-  public Collection<WindowPane<WK, WV>> handleMessage(
+  public Collection<WindowPane<K, Object>> handleMessage(
       M message, MessageCollector collector, TaskCoordinator coordinator) {
     LOG.trace("Processing message envelope: {}", message);
-    List<WindowPane<WK, WV>> results = new ArrayList<>();
+    List<WindowPane<K, Object>> results = new ArrayList<>();
 
-    WindowKey<WK> storeKey =  getStoreKey(message);
-    WindowState<WV> existingState = store.get(storeKey);
-    LOG.trace("Store key ({}) has existing state ({})", storeKey, 
existingState);
-    WindowState<WV> newState = applyFoldFunction(existingState, message);
+    K key = (keyFn != null) ? keyFn.apply(message) : null;
+    long timestamp = getWindowTimestamp(message);
 
-    LOG.trace("New window value: {}, earliest timestamp: {}",
-        newState.getWindowValue(), newState.getEarliestTimestamp());
-    store.put(storeKey, newState);
+    // For aggregating windows, we only store the aggregated window value.
+    // For non-aggregating windows, we store all messages in the window.
+    if (foldLeftFn == null) {
+      timeSeriesStore.put(key, message, timestamp); // store is in append mode
+    } else {
+      List<Object> existingState = getValues(key, timestamp);
+      Preconditions.checkState(existingState.size() == 1, "WindowState for 
aggregating windows " +
+          "must not contain more than one entry per window");
+
+      Object oldVal = existingState.get(0);
+      if (oldVal == null) {
+        LOG.trace("No existing state found for key. Invoking initializer.");
+        oldVal = initializer.get();
+      }
 
-    if (window.getEarlyTrigger() != null) {
-      TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey);
+      Object aggregatedValue = foldLeftFn.apply(message, oldVal);
+      timeSeriesStore.put(key, aggregatedValue, timestamp);
+    }
 
+    if (window.getEarlyTrigger() != null) {
+      TriggerKey<K> triggerKey = new TriggerKey<>(FiringType.EARLY, key, 
timestamp);
       TriggerImplHandler triggerImplHandler = 
getOrCreateTriggerImplHandler(triggerKey, window.getEarlyTrigger());
-      Optional<WindowPane<WK, WV>> maybeTriggeredPane =
+      Optional<WindowPane<K, Object>> maybeTriggeredPane =
           triggerImplHandler.onMessage(triggerKey, message, collector, 
coordinator);
       maybeTriggeredPane.ifPresent(results::add);
     }
 
     if (window.getDefaultTrigger() != null) {
-      TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.DEFAULT, 
storeKey);
+      TriggerKey<K> triggerKey = new TriggerKey<>(FiringType.DEFAULT, key, 
timestamp);
       TriggerImplHandler triggerImplHandler = 
getOrCreateTriggerImplHandler(triggerKey, window.getDefaultTrigger());
-      Optional<WindowPane<WK, WV>> maybeTriggeredPane =
+      Optional<WindowPane<K, Object>> maybeTriggeredPane =
           triggerImplHandler.onMessage(triggerKey, message, collector, 
coordinator);
       maybeTriggeredPane.ifPresent(results::add);
     }
@@ -136,16 +172,15 @@ public class WindowOperatorImpl<M, WK, WV> extends 
OperatorImpl<M, WindowPane<WK
   }
 
   @Override
-  public Collection<WindowPane<WK, WV>> handleTimer(MessageCollector 
collector, TaskCoordinator coordinator) {
+  public Collection<WindowPane<K, Object>> handleTimer(MessageCollector 
collector, TaskCoordinator coordinator) {
     LOG.trace("Processing timer.");
-    List<WindowPane<WK, WV>> results = new ArrayList<>();
+    List<WindowPane<K, Object>> results = new ArrayList<>();
+    List<TriggerKey<K>> keys = triggerScheduler.runPendingCallbacks();
 
-    List<TriggerKey<WK>> keys = triggerScheduler.runPendingCallbacks();
-
-    for (TriggerKey<WK> key : keys) {
+    for (TriggerKey<K> key : keys) {
       TriggerImplHandler triggerImplHandler = triggers.get(key);
       if (triggerImplHandler != null) {
-        Optional<WindowPane<WK, WV>> maybeTriggeredPane = 
triggerImplHandler.onTimer(key, collector, coordinator);
+        Optional<WindowPane<K, Object>> maybeTriggeredPane = 
triggerImplHandler.onTimer(key, collector, coordinator);
         maybeTriggeredPane.ifPresent(results::add);
       }
     }
@@ -154,62 +189,21 @@ public class WindowOperatorImpl<M, WK, WV> extends 
OperatorImpl<M, WindowPane<WK
   }
 
   @Override
-  protected OperatorSpec<M, WindowPane<WK, WV>> getOperatorSpec() {
+  protected OperatorSpec<M, WindowPane<K, Object>> getOperatorSpec() {
     return windowOpSpec;
   }
 
   @Override
   protected void handleClose() {
-    WindowInternal<M, WK, WV> window = windowOpSpec.getWindow();
-
-    if (window.getFoldLeftFunction() != null) {
-      window.getFoldLeftFunction().close();
+    if (foldLeftFn != null) {
+      foldLeftFn.close();
     }
-  }
-
-  /**
-   * Get the key to be used for lookups in the store for this message.
-   */
-  private WindowKey<WK> getStoreKey(M message) {
-    Function<M, WK> keyExtractor = window.getKeyExtractor();
-    WK key = null;
-
-    if (keyExtractor != null) {
-      key = keyExtractor.apply(message);
-    }
-
-    String paneId = null;
-
-    if (window.getWindowType() == WindowType.TUMBLING) {
-      long triggerDurationMs = ((TimeTrigger<M>) 
window.getDefaultTrigger()).getDuration().toMillis();
-      final long now = clock.currentTimeMillis();
-      Long windowBoundary = now - now % triggerDurationMs;
-      paneId = windowBoundary.toString();
-    }
-
-    return new WindowKey<>(key, paneId);
-  }
-
-  private WindowState<WV> applyFoldFunction(WindowState<WV> existingState, M 
message) {
-    WV wv;
-    long earliestTimestamp;
-
-    if (existingState == null) {
-      LOG.trace("No existing state found for key. Invoking initializer.");
-      wv = window.getInitializer().get();
-      earliestTimestamp = clock.currentTimeMillis();
-    } else {
-      wv = existingState.getWindowValue();
-      earliestTimestamp = existingState.getEarliestTimestamp();
+    if (timeSeriesStore != null) {
+      timeSeriesStore.close();
     }
-
-    WV newVal = window.getFoldLeftFunction().apply(message, wv);
-    WindowState<WV> newState = new WindowState(newVal, earliestTimestamp);
-
-    return newState;
   }
 
-  private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<WK> 
triggerKey, Trigger<M> trigger) {
+  private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<K> 
triggerKey, Trigger<M> trigger) {
     TriggerImplHandler wrapper = triggers.get(triggerKey);
     if (wrapper != null) {
       LOG.trace("Returning existing trigger wrapper for {}", triggerKey);
@@ -218,7 +212,7 @@ public class WindowOperatorImpl<M, WK, WV> extends 
OperatorImpl<M, WindowPane<WK
 
     LOG.trace("Creating a new trigger wrapper for {}", triggerKey);
 
-    TriggerImpl<M, WK> triggerImpl = TriggerImpls.createTriggerImpl(trigger, 
clock, triggerKey);
+    TriggerImpl<M, K> triggerImpl = TriggerImpls.createTriggerImpl(trigger, 
clock, triggerKey);
     wrapper = new TriggerImplHandler(triggerKey, triggerImpl);
     triggers.put(triggerKey, wrapper);
 
@@ -228,25 +222,28 @@ public class WindowOperatorImpl<M, WK, WV> extends 
OperatorImpl<M, WindowPane<WK
   /**
    * Handles trigger firings and returns the optional result.
    */
-  private Optional<WindowPane<WK, WV>> onTriggerFired(
-      TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator 
coordinator) {
+  private Optional<WindowPane<K, Object>> onTriggerFired(TriggerKey<K> 
triggerKey, MessageCollector collector,
+      TaskCoordinator coordinator) {
     LOG.trace("Trigger key {} fired." , triggerKey);
 
     TriggerImplHandler wrapper = triggers.get(triggerKey);
-    WindowKey<WK> windowKey = triggerKey.getKey();
-    WindowState<WV> state = store.get(windowKey);
+    long timestamp = triggerKey.getTimestamp();
+    K key = triggerKey.getKey();
+    List<Object> existingState = getValues(key, timestamp);
 
-    if (state == null) {
+    if (existingState == null || existingState.size() == 0) {
       LOG.trace("No state found for triggerKey: {}", triggerKey);
       return Optional.empty();
     }
 
-    WindowPane<WK, WV> paneOutput = computePaneOutput(triggerKey, state);
+    Object windowVal = window.getFoldLeftFunction() == null ? existingState : 
existingState.get(0);
 
-    // Handle accumulation modes.
+    WindowPane<K, Object> paneOutput = computePaneOutput(triggerKey, 
windowVal);
+
+    // Handle different accumulation modes.
     if (window.getAccumulationMode() == AccumulationMode.DISCARDING) {
       LOG.trace("Clearing state for trigger key: {}", triggerKey);
-      store.put(windowKey, null);
+      timeSeriesStore.remove(key, timestamp);
     }
 
     // Cancel all early triggers too when the default trigger fires. Also, 
clean all state for the key.
@@ -256,9 +253,8 @@ public class WindowOperatorImpl<M, WK, WV> extends 
OperatorImpl<M, WindowPane<WK
       LOG.trace("Default trigger fired. Canceling triggers for {}", 
triggerKey);
 
       cancelTrigger(triggerKey, true);
-      cancelTrigger(new TriggerKey(FiringType.EARLY, triggerKey.getKey()), 
true);
-      WindowKey<WK> key = triggerKey.getKey();
-      store.delete(key);
+      cancelTrigger(new TriggerKey(FiringType.EARLY, triggerKey.getKey(), 
triggerKey.getTimestamp()), true);
+      timeSeriesStore.remove(key, timestamp);
     }
 
     // Cancel non-repeating early triggers. All early triggers should be 
removed from the "triggers" map only after the
@@ -276,22 +272,9 @@ public class WindowOperatorImpl<M, WK, WV> extends 
OperatorImpl<M, WindowPane<WK
   /**
    * Computes the pane output corresponding to a {@link TriggerKey} that fired.
    */
-  private WindowPane<WK, WV> computePaneOutput(TriggerKey<WK> triggerKey, 
WindowState<WV> state) {
-    WindowKey<WK> windowKey = triggerKey.getKey();
-    WV windowVal = state.getWindowValue();
-
-    // For session windows, we will create a new window key by using the time 
of the first message in the window as
-    //the paneId.
-    if (window.getWindowType() == WindowType.SESSION) {
-      windowKey = new WindowKey<>(windowKey.getKey(), 
Long.toString(state.getEarliestTimestamp()));
-    }
-
-    // Make a defensive copy so that we are immune to further mutations on the 
collection
-    if (windowVal instanceof Collection) {
-      windowVal = (WV) new ArrayList<>((Collection<WV>) windowVal);
-    }
-
-    WindowPane<WK, WV> paneOutput =
+  private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, 
Object windowVal) {
+    WindowKey<K> windowKey = new WindowKey(triggerKey.getKey(), 
Long.toString(triggerKey.getTimestamp()));
+    WindowPane<K, Object> paneOutput =
         new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), 
triggerKey.getType());
     LOG.trace("Emitting pane output for trigger key {}", triggerKey);
     return paneOutput;
@@ -300,7 +283,7 @@ public class WindowOperatorImpl<M, WK, WV> extends 
OperatorImpl<M, WindowPane<WK
   /**
    * Cancels the firing of the {@link TriggerImpl} identified by this {@link 
TriggerKey} and optionally removes it.
    */
-  private void cancelTrigger(TriggerKey<WK> triggerKey, boolean shouldRemove) {
+  private void cancelTrigger(TriggerKey<K> triggerKey, boolean shouldRemove) {
     TriggerImplHandler triggerImplHandler = triggers.get(triggerKey);
     if (triggerImplHandler != null) {
       triggerImplHandler.cancel();
@@ -311,19 +294,90 @@ public class WindowOperatorImpl<M, WK, WV> extends 
OperatorImpl<M, WindowPane<WK
   }
 
   /**
+   * Computes the timestamp of the window this message should belong to.
+   *
+   * In the case of tumbling windows, timestamp of a window is defined as the 
start timestamp of its corresponding window
+   * interval. For instance, if the tumbling interval is 10 seconds, all 
messages that arrive in the range [1000, 1010)
+   * are assigned to the window with timestamp "1000".
+   *
+   * In the case of session windows, timestamp is defined as the timestamp of 
the earliest message in the window.
+   * For instance, if the session gap is 10 seconds, and the first message in 
the window arrives at "1002" seconds,
+   * all messages (that arrive within 10 seconds of their previous message) 
are assigned a timestamp "1002".
+   *
+   * @param message the input message
+   * @return the timestamp of the window this message should belong to
+   */
+  private long getWindowTimestamp(M message) {
+    if (window.getWindowType() == WindowType.TUMBLING) {
+      long triggerDurationMs = ((TimeTrigger<M>) 
window.getDefaultTrigger()).getDuration().toMillis();
+      final long now = clock.currentTimeMillis();
+      // assign timestamp to be the start timestamp of the window boundary
+      long timestamp = now - now % triggerDurationMs;
+      return timestamp;
+    } else {
+      K key = keyFn.apply(message);
+      // get the value with the earliest timestamp for the provided key.
+      ClosableIterator<TimestampedValue<Object>> iterator = 
timeSeriesStore.get(key, 0, Long.MAX_VALUE, 1);
+      List<TimestampedValue<Object>> timestampedValues = toList(iterator);
+
+      // If there are no existing sessions for the key, we return the current 
timestamp. Otherwise, return the
+      // timestamp of the earliest message.
+      long timestamp = (timestampedValues.isEmpty())? 
clock.currentTimeMillis() : timestampedValues.get(0).getTimestamp();
+
+      return timestamp;
+    }
+  }
+
+  /**
+   * Return a list of values in the store for the provided key and timestamp
+   *
+   * @param key the key to look up in the store
+   * @param timestamp the timestamp to look up in the store
+   * @return the list of values for the provided key
+   */
+  private List<Object> getValues(K key, long timestamp) {
+    ClosableIterator<TimestampedValue<Object>> iterator = 
timeSeriesStore.get(key, timestamp);
+    List<TimestampedValue<Object>> timestampedValues = toList(iterator);
+    List<Object> values = timestampedValues.stream().map(element -> 
element.getValue()).collect(Collectors.toList());
+    return values;
+  }
+
+  /**
+   * Returns an unmodifiable list of all elements in the provided iterator.
+   * The iterator is guaranteed to be closed after its execution.
+   *
+   * @param iterator the provided iterator.
+   * @param <V> the type of elements in the iterator
+   * @return a list of all elements returned by the iterator
+   */
+  static <V>  List<V> toList(ClosableIterator<V> iterator) {
+    List<V> values = new ArrayList<>();
+    try {
+      while (iterator.hasNext()) {
+        values.add(iterator.next());
+      }
+    } finally {
+      if (iterator != null) {
+        iterator.close();
+      }
+    }
+    return Collections.unmodifiableList(values);
+  }
+
+  /**
    * State corresponding to a created {@link TriggerImpl} instance.
    */
   private class TriggerImplHandler {
     // The context, and the {@link TriggerImpl} instance corresponding to this 
triggerKey
-    private final TriggerImpl<M, WK> impl;
+    private final TriggerImpl<M, K> impl;
     // Guard to ensure that we don't invoke onMessage or onTimer on already 
cancelled triggers
     private boolean isCancelled = false;
 
-    public TriggerImplHandler(TriggerKey<WK> key, TriggerImpl<M, WK> impl) {
+    public TriggerImplHandler(TriggerKey<K> key, TriggerImpl<M, K> impl) {
       this.impl = impl;
     }
 
-    public Optional<WindowPane<WK, WV>> onMessage(TriggerKey<WK> triggerKey, M 
message,
+    public Optional<WindowPane<K, Object>> onMessage(TriggerKey<K> triggerKey, 
M message,
         MessageCollector collector, TaskCoordinator coordinator) {
       if (!isCancelled) {
         LOG.trace("Forwarding callbacks for {}", message);
@@ -332,7 +386,7 @@ public class WindowOperatorImpl<M, WK, WV> extends 
OperatorImpl<M, WindowPane<WK
         if (impl.shouldFire()) {
           // repeating trigger can trigger multiple times, So, clear the state 
to allow future triggerings.
           if (impl instanceof RepeatingTriggerImpl) {
-            ((RepeatingTriggerImpl<M, WK>) impl).clear();
+            ((RepeatingTriggerImpl<M, K>) impl).clear();
           }
           return onTriggerFired(triggerKey, collector, coordinator);
         }
@@ -340,14 +394,14 @@ public class WindowOperatorImpl<M, WK, WV> extends 
OperatorImpl<M, WindowPane<WK
       return Optional.empty();
     }
 
-    public Optional<WindowPane<WK, WV>> onTimer(
-        TriggerKey<WK> key, MessageCollector collector, TaskCoordinator 
coordinator) {
+    public Optional<WindowPane<K, Object>> onTimer(TriggerKey<K> key, 
MessageCollector collector,
+        TaskCoordinator coordinator) {
       if (impl.shouldFire() && !isCancelled) {
         LOG.trace("Triggering timer triggers");
 
         // repeating trigger can trigger multiple times, So, clear the trigger 
to allow future triggerings.
         if (impl instanceof RepeatingTriggerImpl) {
-          ((RepeatingTriggerImpl<M, WK>) impl).clear();
+          ((RepeatingTriggerImpl<M, K>) impl).clear();
         }
         return onTriggerFired(key, collector, coordinator);
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java 
b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java
deleted file mode 100644
index 4577a5c..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-/**
- * Wraps the value stored for a particular {@link 
org.apache.samza.operators.windows.WindowKey} with additional metadata.
- */
-class WindowState<WV> {
-
-  private final WV wv;
-  /**
-   * Time of the first message in the window
-   */
-  private final long earliestRecvTime;
-
-  WindowState(WV wv, long earliestRecvTime) {
-    this.wv = wv;
-    this.earliestRecvTime = earliestRecvTime;
-  }
-
-  WV getWindowValue() {
-    return wv;
-  }
-
-  long getEarliestTimestamp() {
-    return earliestRecvTime;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("WindowState: {time=%d, value=%s}", earliestRecvTime, 
wv);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
index fad8ca4..3aa2e62 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
@@ -24,9 +24,10 @@ package org.apache.samza.operators.impl.store;
  */
 public class TimeSeriesKey<K> {
 
+  public static final int VERSION = 0;
+
   private final K key;
   private final long timestamp;
-
   private final long seqNum;
 
   public TimeSeriesKey(K k, long time, long seq) {

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
index 273c40a..2e173ab 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
@@ -57,7 +57,10 @@ public class TimeSeriesKeySerde<K> implements 
Serde<TimeSeriesKey<K>> {
     long timestamp = timeSeriesKey.getTimestamp();
     long seqNum = timeSeriesKey.getSeqNum();
 
-    byte[] serializedKey = keySerde.toBytes(key);
+    byte[] serializedKey = null;
+    if (keySerde != null) {
+      serializedKey = keySerde.toBytes(key);
+    }
     int keySize = serializedKey == null ? 0 : serializedKey.length;
 
     // append the timestamp and sequence number to the serialized key bytes
@@ -88,8 +91,9 @@ public class TimeSeriesKeySerde<K> implements 
Serde<TimeSeriesKey<K>> {
     long seqNum = buf.getLong();
     long version = seqNum & ~SEQUENCE_NUM_MASK;
 
-    if (version != 0) {
-      throw new SamzaException("Version is not zero. Sequence number: " + 
seqNum);
+    if (version != TimeSeriesKey.VERSION) {
+      throw new SamzaException(String.format("Invalid version detected in 
TimeSeriesKey. " +
+          "Expected Version: %s Actual Version: %s Sequence number: %s", 
TimeSeriesKey.VERSION, version, seqNum));
     }
     return new TimeSeriesKey(key, timeStamp, seqNum);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
index 56d839e..f3d6948 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
@@ -58,6 +58,34 @@ public interface TimeSeriesStore<K, V> {
   ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long 
endTimestamp);
 
   /**
+   * Returns an iterator over up to {@code maxMessages} values for the given key in the provided 
time-range - [{@code startTimestamp}, {@code endTimestamp})
+   *
+   * The values returned by the iterator are ordered by their timestamp. Values 
with the same timestamp are returned in the order of insertion.
+   * If there are no values in the store for the key in the provided 
time-range, an empty iterator is returned.
+   *
+   * @param key the key to look up in the store
+   * @param startTimestamp the start timestamp of the range, inclusive
+   * @param endTimestamp the end timestamp of the range, exclusive
+   * @param maxMessages the maximum number of messages to return
+   * @return an iterator over at most {@code maxMessages} values
+   */
+  ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long 
endTimestamp, int maxMessages);
+
+  /**
+   * Returns an iterator over values for the given key and timestamp
+   *
+   * Values returned by the iterator are in their insertion order.
+   *
+   * <p> The iterator <b>must</b> be closed after use by calling {@link 
ClosableIterator#close()}. Not doing so will result in memory leaks.
+   *
+   * @param key the key to look up in the store
+   * @param timestamp the timestamp to look up in the store
+   * @return an iterator over the values for the given key and timestamp that 
must be closed after use
+   * @throws IllegalArgumentException when the provided timestamp is negative
+   */
+  ClosableIterator<TimestampedValue<V>> get(K key, long timestamp);
+
+  /**
    * Removes all values for this key in the given time-range.
    *
    * @param key the key to look up in the store
@@ -68,6 +96,14 @@ public interface TimeSeriesStore<K, V> {
   void remove(K key, long startTimestamp, long endTimeStamp);
 
   /**
+   * Removes all values for the given key and timestamp
+   *
+   * @param key the key to look up in the store
+   * @param timestamp the timestamp to look up in the store
+   */
+  void remove(K key, long timestamp);
+
+  /**
    * Flushes this time series store, if applicable.
    */
   void flush();

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
index 5e35219..ff7eee9 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
@@ -26,6 +26,8 @@ import org.apache.samza.storage.kv.KeyValueStore;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -124,6 +126,17 @@ public class TimeSeriesStoreImpl<K, V> implements 
TimeSeriesStore<K, V> {
   }
 
   @Override
+  public ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, 
long endTimestamp, int maxValues) {
+    ClosableIterator<TimestampedValue<V>> iterator = get(key, startTimestamp, 
endTimestamp);
+    return new BoundedClosableIterator<>(iterator, maxValues);
+  }
+
+  @Override
+  public ClosableIterator<TimestampedValue<V>> get(K key, long timestamp) {
+    return get(key, timestamp, timestamp + 1);
+  }
+
+  @Override
   public void remove(K key, long startTimestamp, long endTimeStamp) {
     validateRange(startTimestamp, endTimeStamp);
     TimeSeriesKey<K> fromKey = new TimeSeriesKey(key, startTimestamp, 0);
@@ -140,6 +153,11 @@ public class TimeSeriesStoreImpl<K, V> implements 
TimeSeriesStore<K, V> {
   }
 
   @Override
+  public void remove(K key, long timestamp) {
+    remove(key, timestamp, timestamp + 1);
+  }
+
+  @Override
   public void flush() {
     kvStore.flush();
   }
@@ -192,4 +210,46 @@ public class TimeSeriesStoreImpl<K, V> implements 
TimeSeriesStore<K, V> {
       wrappedIterator.remove();
     }
   }
+
+  /**
+   * Wraps a {@link ClosableIterator} to only return the specified number of 
values
+   *
+   * @param <T> the type of values in the iterator
+   */
+  private static class BoundedClosableIterator<T> implements 
ClosableIterator<T> {
+
+    private final AtomicInteger currentCount = new AtomicInteger(0);
+    private final ClosableIterator<T> wrappedIterator;
+    private final int maxCount;
+
+    public BoundedClosableIterator(ClosableIterator<T> wrappedIterator, int 
maxCount) {
+      this.wrappedIterator = wrappedIterator;
+      this.maxCount = maxCount;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return wrappedIterator.hasNext() && currentCount.get() < maxCount;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      currentCount.incrementAndGet();
+      return wrappedIterator.next();
+    }
+
+    @Override
+    public void remove() {
+      wrappedIterator.remove();
+    }
+
+    @Override
+    public void close() {
+      wrappedIterator.close();
+    }
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 75f1427..3c8879f 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
 import org.apache.samza.operators.triggers.AnyTrigger;
 import org.apache.samza.operators.triggers.RepeatingTrigger;
 import org.apache.samza.operators.triggers.TimeBasedTrigger;
@@ -28,10 +29,13 @@ import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.util.MathUtils;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.serializers.Serde;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -43,7 +47,7 @@ import java.util.stream.Collectors;
  * @param <WK>  the type of key of the window
  * @param <WV>  the type of aggregated value in the window output {@link 
WindowPane}
  */
-public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, 
WindowPane<WK, WV>> {
+public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, 
WindowPane<WK, WV>> implements StatefulOperatorSpec {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(WindowOperatorSpec.class);
   private final WindowInternal<M, WK, WV> window;
@@ -117,4 +121,17 @@ public class WindowOperatorSpec<M, WK, WV> extends 
OperatorSpec<M, WindowPane<WK
     FoldLeftFunction fn = window.getFoldLeftFunction();
     return fn instanceof WatermarkFunction ? (WatermarkFunction) fn : null;
   }
+
+  @Override
+  public Collection<StoreDescriptor> getStoreDescriptors() {
+    String storeName = getOpName();
+    String storeFactory = 
"org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";
+
+    Serde storeKeySerde = new TimeSeriesKeySerde<>(window.getKeySerde());
+    Serde storeValSerde = window.getFoldLeftFunction() == null ? 
window.getMsgSerde() : window.getWindowValSerde();
+
+    StoreDescriptor descriptor = new StoreDescriptor(storeName, storeFactory, 
storeKeySerde, storeValSerde, storeName,
+        Collections.emptyMap());
+    return Collections.singletonList(descriptor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java 
b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index 91657ed..c40de7b 100644
--- 
a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ 
b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -54,7 +54,7 @@ public class PageViewCounterExample implements 
StreamApplication {
     Supplier<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
     pageViewEvents
-        .window(Windows.keyedTumblingWindow(m -> m.memberId, 
Duration.ofSeconds(10), initialValue, foldLeftFn)
+        .window(Windows.keyedTumblingWindow(m -> m.memberId, 
Duration.ofSeconds(10), initialValue, foldLeftFn, null, null)
             .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
             .setAccumulationMode(AccumulationMode.DISCARDING))
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), new 
PageViewCount(windowPane)))

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java 
b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index e9bb284..c403406 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -50,7 +50,7 @@ public class RepartitionExample implements StreamApplication {
     pageViewEvents
         .partitionBy(pve -> pve.memberId, pve -> pve,
             KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>(PageViewEvent.class)))
-        .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), 
() -> 0, (m, c) -> c + 1))
+        .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), 
() -> 0, (m, c) -> c + 1, null, null))
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), new 
MyStreamOutput(windowPane)))
         .sendTo(pageViewEventPerMember);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java 
b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
index 08c896c..9381e49 100644
--- a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
@@ -54,7 +54,7 @@ public class WindowExample implements StreamApplication {
     // also emit early results if either the number of messages collected 
reaches 30000, or if no new messages arrive
     // for 1 minute.
     inputStream
-        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, 
counter)
+        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, 
counter, null)
             .setLateTrigger(Triggers.any(Triggers.count(30000), 
Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))))
         .map(WindowPane::getMessage)
         .sendTo(outputStream);

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index f6441dc..d98cc72 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -178,11 +178,11 @@ public class TestExecutionPlanner {
 
     messageStream1.map(m -> m)
         .filter(m->true)
-        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
+        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), 
mock(Serde.class), mock(Serde.class)));
 
     messageStream2.map(m -> m)
         .filter(m->true)
-        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
+        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), 
mock(Serde.class), mock(Serde.class)));
 
     messageStream1
         .join(messageStream2, mock(JoinFunction.class),

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index f23bb14..1426444 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -265,7 +265,7 @@ public class TestMessageStreamImpl {
 
     // should compile since TestMessageEnvelope (input for functions) is base class of TestInputMessageEnvelope (M)
     Window<TestInputMessageEnvelope, String, Integer> window = Windows
-        .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator);
+        .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator, null, mock(Serde.class));
     MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window);
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index e011121..aee457e 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -28,6 +28,8 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.impl.store.TestInMemoryStore;
+import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
 import org.apache.samza.operators.triggers.FiringType;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.Triggers;
@@ -37,6 +39,7 @@ import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
@@ -70,9 +73,13 @@ public class TestWindowOperator {
     config = mock(Config.class);
     taskContext = mock(TaskContextImpl.class);
     runner = mock(ApplicationRunner.class);
+    Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
+    Serde storeValSerde = new IntegerEnvelopeSerde();
+
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
     when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    when(taskContext.getStore("window-3")).thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
     when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
   }
 
@@ -397,7 +404,7 @@ public class TestWindowOperator {
       Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
         .map(m -> m)
-        .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger)
+        .window(Windows.keyedTumblingWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde()).setEarlyTrigger(earlyTrigger)
           .setAccumulationMode(mode))
           .sink((message, messageCollector, taskCoordinator) -> {
               messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
@@ -427,7 +434,7 @@ public class TestWindowOperator {
       Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
           .map(m -> m)
-          .window(Windows.<IntegerEnvelope>tumblingWindow(duration).setEarlyTrigger(earlyTrigger)
+          .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde()).setEarlyTrigger(earlyTrigger)
               .setAccumulationMode(mode))
           .sink((message, messageCollector, taskCoordinator) -> {
               messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
@@ -455,7 +462,7 @@ public class TestWindowOperator {
 
       inStream
           .map(m -> m)
-          .window(Windows.keyedSessionWindow(keyFn, duration)
+          .window(Windows.keyedSessionWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
               .setAccumulationMode(mode))
           .sink((message, messageCollector, taskCoordinator) -> {
               messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
@@ -469,4 +476,18 @@ public class TestWindowOperator {
       super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key);
     }
   }
+
+  private class IntegerEnvelopeSerde implements Serde<IntegerEnvelope> {
+    private final IntegerSerde intSerde = new IntegerSerde();
+
+    @Override
+    public byte[] toBytes(IntegerEnvelope object) {
+      return intSerde.toBytes((Integer) object.getKey());
+    }
+
+    @Override
+    public IntegerEnvelope fromBytes(byte[] bytes) {
+      return new IntegerEnvelope(intSerde.fromBytes(bytes));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
index 12a32b1..f1fb8e2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.operators.spec;
 
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.internal.WindowInternal;
@@ -28,6 +29,8 @@ import org.junit.Test;
 
 import java.time.Duration;
 
+import static org.mockito.Mockito.mock;
+
 public class TestWindowOperatorSpec {
   @Test
   public void testTriggerIntervalWithNestedTimeTriggers() {
@@ -41,7 +44,8 @@ public class TestWindowOperatorSpec {
                 Triggers.timeSinceFirstMessage(Duration.ofMillis(25)),
                 Triggers.timeSinceLastMessage(Duration.ofMillis(15))))));
 
-    WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, null, WindowType.SESSION);
+    WindowInternal window = new WindowInternal(defaultTrigger, null, null, null,
+            null, WindowType.SESSION, null, null, mock(Serde.class));
     window.setEarlyTrigger(earlyTrigger);
     window.setLateTrigger(lateTrigger);
 
@@ -54,7 +58,8 @@ public class TestWindowOperatorSpec {
     Trigger defaultTrigger = Triggers.timeSinceFirstMessage(Duration.ofMillis(150));
     Trigger earlyTrigger = Triggers.repeat(Triggers.count(5));
 
-    WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, null, WindowType.SESSION);
+    WindowInternal window = new WindowInternal(defaultTrigger, null, null, null,
+            null, WindowType.SESSION, null, null, mock(Serde.class));
     window.setEarlyTrigger(earlyTrigger);
 
     WindowOperatorSpec spec = new WindowOperatorSpec(window, 0);

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index a681767..697833b 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -19,15 +19,16 @@
 
 package org.apache.samza.zk;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import org.junit.Rule;
-import org.junit.rules.Timeout;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 public class TestScheduleAfterDebounceTime {
   private static final Logger LOG = LoggerFactory.getLogger(TestScheduleAfterDebounceTime.class);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index 517d81f..e35dfb7 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -68,7 +68,7 @@ public class RepartitionJoinWindowApp implements StreamApplication {
         .partitionBy(UserPageAdClick::getUserId, upac -> upac,
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)))
         .map(KV::getValue)
-        .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3)))
+        .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3), new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)))
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
         .sendTo(outputStream);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 974cafc..6410e7d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -50,7 +50,8 @@ public class SessionWindowApp implements StreamApplication {
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))
-        .window(Windows.keyedSessionWindow(PageView::getUserId, Duration.ofSeconds(3)))
+        .window(Windows.keyedSessionWindow(PageView::getUserId, Duration.ofSeconds(3), new StringSerde(),
+            new JsonSerdeV2<>(PageView.class)))
         .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
         .sendTo(outputStream);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index 151c9d1..5d04f21 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -51,7 +51,7 @@ public class TumblingWindowApp implements StreamApplication {
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))
-        .window(Windows.keyedTumblingWindow(PageView::getUserId, Duration.ofSeconds(3)))
+        .window(Windows.keyedTumblingWindow(PageView::getUserId, Duration.ofSeconds(3), null, null))
         .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
         .sendTo(outputStream);
   }

Reply via email to