[FLINK-2683] [FLINK-2682] [runtime] Add dedicated operator for aligned 
processing time windows.

Also add utilities for heap-backed keyed state in panes (dedicated tailored 
hash table)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f58fab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f58fab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f58fab

Branch: refs/heads/master
Commit: b8f58fab56c727b8d44d2a90bbb389ab7544989e
Parents: 7ad2103
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 15 15:00:17 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 22 13:25:20 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/functions/Function.java    |   2 +-
 .../flink/runtime/util/UnionIterator.java       | 100 +++
 .../flink/runtime/util/UnionIteratorTest.java   | 142 ++++
 .../functions/windows/KeyedWindowFunction.java  |  44 ++
 .../api/operators/TimestampedCollector.java     |   6 +-
 .../operators/StreamingOperatorMetrics.java     |  27 +
 ...ractAlignedProcessingTimeWindowOperator.java | 270 ++++++++
 .../windows/AbstractKeyedTimePanes.java         |  76 +++
 .../windows/AccumulatingKeyedTimePanes.java     | 126 ++++
 ...ccumulatingProcessingTimeWindowOperator.java |  48 ++
 .../windows/AggregatingKeyedTimePanes.java      | 103 +++
 ...AggregatingProcessingTimeWindowOperator.java |  47 ++
 .../runtime/operators/windows/KeyMap.java       | 651 +++++++++++++++++++
 .../runtime/operators/windows/package-info.java |  22 +
 .../runtime/tasks/StreamTaskException.java      |   2 +-
 .../streaming/runtime/tasks/package-info.java   |  27 +
 ...AlignedProcessingTimeWindowOperatorTest.java | 515 +++++++++++++++
 ...AlignedProcessingTimeWindowOperatorTest.java | 528 +++++++++++++++
 .../operators/windows/CollectingOutput.java     |  80 +++
 .../windows/KeyMapPutIfAbsentTest.java          | 121 ++++
 .../operators/windows/KeyMapPutTest.java        | 136 ++++
 .../runtime/operators/windows/KeyMapTest.java   | 344 ++++++++++
 .../GroupedProcessingTimeWindowExample.java     | 167 +++++
 23 files changed, 3581 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
index 632a0d2..8a1819c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
@@ -24,5 +24,5 @@ package org.apache.flink.api.common.functions;
  * <p>This interface is empty in order to allow extending interfaces to
  * be SAM (single abstract method) interfaces that can be implemented via Java 
8 lambdas.</p>
  */
-public interface Function {
+public interface Function extends java.io.Serializable {
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java
new file mode 100644
index 0000000..c279adf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TraversableOnceException;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class UnionIterator<T> implements Iterator<T>, Iterable<T> {
+       
+       private Iterator<T> currentIterator;
+       
+       private ArrayList<List<T>> furtherLists = new ArrayList<>();
+       
+       private int nextList;
+       
+       private boolean iteratorAvailable = true;
+
+       // 
------------------------------------------------------------------------
+       
+       public void clear() {
+               currentIterator = null;
+               furtherLists.clear();
+               nextList = 0;
+               iteratorAvailable = true;
+       }
+       
+       public void addList(List<T> list) {
+               if (currentIterator == null) {
+                       currentIterator = list.iterator();
+               }
+               else {
+                       furtherLists.add(list);
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public Iterator<T> iterator() {
+               if (iteratorAvailable) {
+                       iteratorAvailable = false;
+                       return this;
+               } else {
+                       throw new TraversableOnceException();
+               }
+       }
+
+       @Override
+       public boolean hasNext() {
+               while (currentIterator != null) {
+                       if (currentIterator.hasNext()) {
+                               return true;
+                       }
+                       else if (nextList < furtherLists.size()) {
+                               currentIterator = 
furtherLists.get(nextList).iterator();
+                               nextList++;
+                       }
+                       else {
+                               currentIterator = null;
+                       }
+               }
+               
+               return false;
+       }
+
+       @Override
+       public T next() {
+               if (hasNext()) {
+                       return currentIterator.next();
+               }
+               else {
+                       throw new NoSuchElementException();
+               }
+       }
+
+       @Override
+       public void remove() {
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-runtime/src/test/java/org/apache/flink/runtime/util/UnionIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/UnionIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/UnionIteratorTest.java
new file mode 100644
index 0000000..2a79b1f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/UnionIteratorTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TraversableOnceException;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.*;
+
+public class UnionIteratorTest {
+
+       @Test
+       public void testUnion() {
+               try {
+                       UnionIterator<Integer> iter = new UnionIterator<>();
+
+                       // should succeed and be empty
+                       assertFalse(iter.iterator().hasNext());
+
+                       iter.clear();
+                       
+                       try {
+                               iter.iterator().next();
+                               fail("should fail with an exception");
+                       } catch (NoSuchElementException e) {
+                               // expected
+                       }
+
+                       iter.clear();
+                       iter.addList(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+                       iter.addList(Collections.<Integer>emptyList());
+                       iter.addList(Arrays.asList(8, 9, 10, 11));
+                       
+                       int val = 1;
+                       for (int i : iter) {
+                               assertEquals(val++, i);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testTraversableOnce() {
+               try {
+                       UnionIterator<Integer> iter = new UnionIterator<>();
+                       
+                       // should succeed
+                       iter.iterator();
+                       
+                       // should fail
+                       try {
+                               iter.iterator();
+                               fail("should fail with an exception");
+                       } catch (TraversableOnceException e) {
+                               // expected
+                       }
+
+                       // should fail again
+                       try {
+                               iter.iterator();
+                               fail("should fail with an exception");
+                       } catch (TraversableOnceException e) {
+                               // expected
+                       }
+
+                       // reset the thing, keep it empty
+                       iter.clear();
+
+                       // should succeed
+                       iter.iterator();
+
+                       // should fail
+                       try {
+                               iter.iterator();
+                               fail("should fail with an exception");
+                       } catch (TraversableOnceException e) {
+                               // expected
+                       }
+
+                       // should fail again
+                       try {
+                               iter.iterator();
+                               fail("should fail with an exception");
+                       } catch (TraversableOnceException e) {
+                               // expected
+                       }
+
+                       // reset the thing, add some data
+                       iter.clear();
+                       iter.addList(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+                       
+                       // should succeed
+                       Iterator<Integer> ints = iter.iterator();
+                       assertNotNull(ints.next());
+                       assertNotNull(ints.next());
+                       assertNotNull(ints.next());
+                       
+                       // should fail if called in the middle of operations
+                       try {
+                               iter.iterator();
+                               fail("should fail with an exception");
+                       } catch (TraversableOnceException e) {
+                               // expected
+                       }
+
+                       // reset the thing, keep it empty
+                       iter.clear();
+
+                       // should succeed again
+                       assertFalse(iter.iterator().hasNext());
+                       
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
new file mode 100644
index 0000000..d7ca0a1
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windows;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for functions that are evaluated over keyed (grouped) 
windows.
+ * 
+ * @param <KEY> The type of the key.
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ */
+public interface KeyedWindowFunction<KEY, IN, OUT> extends Function, 
Serializable {
+
+       /**
+        * 
+        * @param key
+        * @param values
+        * @param out
+        * 
+        * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery. 
+        */
+       void evaluate(KEY key, Iterable<IN> values, Collector<OUT> out) throws 
Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
index 0ff223c..62514fc 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
@@ -30,9 +30,13 @@ import org.joda.time.Instant;
  * @param <T> The type of the elments that can be emitted.
  */
 public class TimestampedCollector<T> implements Collector<T> {
+       
        private final Output<StreamRecord<T>> output;
+
+       private final StreamRecord<T> reuse;
+       
        private long timestamp;
-       private StreamRecord<T> reuse;
+       
 
        /**
         * Creates a new {@link TimestampedCollector} that wraps the given 
{@link Output}.

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java
new file mode 100644
index 0000000..8429889
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+public class StreamingOperatorMetrics {
+       
+       
+       public void incrementLateElementDiscarded() {
+               
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..f5f576d
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.TriggerTimer;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT> extends AbstractStreamOperator<OUT> 
+               implements OneInputStreamOperator<IN, OUT>, Triggerable {
+       
+       private static final long serialVersionUID = 3245500864882459867L;
+       
+       private static final long MIN_SLIDE_TIME = 50;
+       
+       
+       private final SerializableObject lock = new SerializableObject();
+
+       // ----- fields for operator parametrization -----
+       
+       private final Function function;
+       private final KeySelector<IN, KEY> keySelector;
+       
+       private final long windowSize;
+       private final long windowSlide;
+       private final long paneSize;
+       private final int numPanesPerWindow;
+       
+       // ----- fields for operator functionality -----
+       
+       private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes;
+       
+       private transient TimestampedCollector<OUT> out;
+       
+       private transient TriggerTimer triggerTimer;
+       
+       private transient long nextEvaluationTime;
+       private transient long nextSlideTime;
+       
+       private transient volatile Throwable asyncError;
+       
+       
+       protected AbstractAlignedProcessingTimeWindowOperator(
+                       Function function,
+                       KeySelector<IN, KEY> keySelector,
+                       long windowLength,
+                       long windowSlide)
+       {
+               if (function == null || keySelector == null) {
+                       throw new NullPointerException();
+               }
+               if (windowLength < MIN_SLIDE_TIME) {
+                       throw new IllegalArgumentException("Window length must 
be at least " + MIN_SLIDE_TIME + " msecs");
+               }
+               if (windowSlide < MIN_SLIDE_TIME) {
+                       throw new IllegalArgumentException("Window slide must 
be at least " + MIN_SLIDE_TIME + " msecs");
+               }
+               if (windowLength < windowSlide) {
+                       throw new IllegalArgumentException("The window size 
must be larger than the window slide");
+               }
+               
+               final long paneSlide = ArithmeticUtils.gcd(windowLength, 
windowSlide);
+               if (paneSlide < MIN_SLIDE_TIME) {
+                       throw new IllegalArgumentException(String.format(
+                                       "Cannot compute window of size %d msecs 
sliding by %d msecs. " +
+                                                       "The unit of grouping 
is too small: %d msecs", windowLength, windowSlide, paneSlide));
+               }
+               
+               this.function = function;
+               this.keySelector = keySelector;
+               this.windowSize = windowLength;
+               this.windowSlide = windowSlide;
+               this.paneSize = paneSlide;
+               this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength 
/ paneSlide);
+       }
+       
+       
+       protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes(
+                       KeySelector<IN, KEY> keySelector, Function function);
+
+       // 
------------------------------------------------------------------------
+       //  startup and shutdown
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               out = new TimestampedCollector<>(output);
+               
+               // create the panes that gather the elements per slide
+               panes = createPanes(keySelector, function);
+               
+               // decide when to first compute the window and when to slide it
+               // the values should align with the start of time (that is, the 
UNIX epoch, not the big bang)
+               final long now = System.currentTimeMillis();
+               nextEvaluationTime = now + windowSlide - (now % windowSlide);
+               nextSlideTime = now + paneSize - (now % paneSize);
+               
+               // start the trigger timer
+               triggerTimer = new TriggerTimer("Trigger for " + 
getRuntimeContext().getTaskName());
+               
+               // schedule the first trigger
+               triggerTimer.scheduleTriggerAt(this, 
Math.min(nextEvaluationTime, nextSlideTime));
+       }
+
+       @Override
+       public void close() throws Exception {
+               // acquire the lock during shutdown, to prevent trigger calls 
at the same time
+               synchronized (lock) {
+                       final long finalWindowTimestamp = nextEvaluationTime;
+                       
+                       // early stop the triggering thread, so it does not 
attempt to return any more data
+                       stopTriggers();
+
+                       // make sure we had no asynchronous error so far
+                       checkErroneous();
+                       
+                       // emit the remaining data
+                       computeWindow(finalWindowTimestamp);
+               }
+       }
+
+       @Override
+       public void dispose() {
+               // acquire the lock during shutdown, to prevent trigger calls 
at the same time
+               synchronized (lock) {
+                       // fail-safe stop of the triggering thread (in case of 
an error)
+                       stopTriggers();
+                       
+                       // release the panes
+                       panes.dispose();
+               }
+       }
+       
+       private void stopTriggers() {
+               if (triggerTimer != null) {
+                       triggerTimer.shutdown();
+               }
+
+               // reset the action timestamps. this makes sure any pending 
triggers will not evaluate
+               nextEvaluationTime = -1L;
+               nextSlideTime = -1L;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Receiving elements and triggers
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               synchronized (lock) {
+                       checkErroneous();
+                       panes.addElementToLatestPane(element.getValue());
+               }
+       }
+
+       @Override
+       public void processWatermark(Watermark mark) {
+               // this operator does not react to watermarks
+       }
+
+       @Override
+       public void trigger(long timestamp) {
+               synchronized (lock) {
+                       // first we check if we actually trigger the window 
function
+                       if (timestamp == nextEvaluationTime) {
+                               // compute and output the results
+                               try {
+                                       computeWindow(timestamp);
+                               }
+                               catch (Throwable t) {
+                                       this.asyncError = t;
+                               }
+
+                               nextEvaluationTime += windowSlide;
+                       }
+                       
+                       // check if we slide the panes by one. this may happen 
in addition to the
+                       // window computation, or just by itself
+                       if (timestamp == nextSlideTime) {
+                               try {
+                                       panes.slidePanes(numPanesPerWindow);
+                               }
+                               catch (Throwable t) {
+                                       this.asyncError = t;
+                               }
+                               nextSlideTime += paneSize;
+                       }
+                       
+                       long nextTriggerTime = Math.min(nextEvaluationTime, 
nextSlideTime);
+                       triggerTimer.scheduleTriggerAt(this, nextTriggerTime);
+               }
+       }
+       
+       private void checkErroneous() throws Exception {
+               if (asyncError != null) {
+                       throw new Exception("Error while computing and 
producing window result", asyncError);
+               }
+       }
+       
+       private void computeWindow(long timestamp) throws Exception {
+               out.setTimestamp(timestamp);
+               panes.truncatePanes(numPanesPerWindow);
+               panes.evaluateWindow(out);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Property access (for testing)
+       // 
------------------------------------------------------------------------
+
+       public long getWindowSize() {
+               return windowSize;
+       }
+
+       public long getWindowSlide() {
+               return windowSlide;
+       }
+
+       public long getPaneSize() {
+               return paneSize;
+       }
+       
+       public int getNumPanesPerWindow() {
+               return numPanesPerWindow;
+       }
+
+       public long getNextEvaluationTime() {
+               return nextEvaluationTime;
+       }
+
+       public long getNextSlideTime() {
+               return nextSlideTime;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return "Window (processing time) (length=" + windowSize + ", 
slide=" + windowSlide + ')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
new file mode 100644
index 0000000..a49b2e6
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayDeque;
+
+
+public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
+       
+       protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
+
+       protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new 
ArrayDeque<>();
+
+       // 
------------------------------------------------------------------------
+
+       public abstract void addElementToLatestPane(Type element) throws 
Exception;
+
+       public abstract void evaluateWindow(Collector<Result> out) throws 
Exception;
+       
+       
+       public void dispose() {
+               // since all is heap data, there is no need to clean up anything
+               latestPane = null;
+               previousPanes.clear();
+       }
+       
+       
+       public void slidePanes(int panesToKeep) {
+               if (panesToKeep > 1) {
+                       // the current pane becomes the latest previous pane
+                       previousPanes.addLast(latestPane);
+
+                       // truncate the history
+                       while (previousPanes.size() >= panesToKeep) {
+                               previousPanes.removeFirst();
+                       }
+               }
+
+               // we need a new latest pane
+               latestPane = new KeyMap<>();
+       }
+       
+       public void truncatePanes(int numToRetain) {
+               while (previousPanes.size() >= numToRetain) {
+                       previousPanes.removeFirst();
+               }
+       }
+       
+       protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, 
Aggregate> traversal, long traversalPass) throws Exception{
+               // gather all panes in an array (faster iterations)
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new 
KeyMap[previousPanes.size() + 1]);
+               panes[panes.length - 1] = latestPane;
+
+               // let the maps make a coordinated traversal and evaluate the 
window function per contained key
+               KeyMap.traverseMaps(panes, traversal, traversalPass);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
new file mode 100644
index 0000000..e776106
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.util.UnionIterator;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+
+
+public class AccumulatingKeyedTimePanes<Type, Key, Result> extends 
AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
+       
+       private final KeySelector<Type, Key> keySelector;
+
+       private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = 
getListFactory();
+
+       private final KeyedWindowFunction<Key, Type, Result> function;
+       
+       private long evaluationPass;
+
+       // 
------------------------------------------------------------------------
+       
+       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
KeyedWindowFunction<Key, Type, Result> function) {
+               this.keySelector = keySelector;
+               this.function = function;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void addElementToLatestPane(Type element) throws Exception {
+               Key k = keySelector.getKey(element);
+               ArrayList<Type> elements = latestPane.putIfAbsent(k, 
listFactory);
+               elements.add(element);
+       }
+
+       @Override
+       public void evaluateWindow(Collector<Result> out) throws Exception {
+               if (previousPanes.isEmpty()) {
+                       // optimized path for single pane case (tumbling window)
+                       for (KeyMap.Entry<Key, ArrayList<Type>> entry : 
latestPane) {
+                               function.evaluate(entry.getKey(), 
entry.getValue(), out);
+                       }
+               }
+               else {
+                       // general code path for multi-pane case
+                       WindowFunctionTraversal<Key, Type, Result> evaluator = 
new WindowFunctionTraversal<>(function, out);
+                       traverseAllPanes(evaluator, evaluationPass);
+               }
+               
+               evaluationPass++;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Running a window function in a map traversal
+       // 
------------------------------------------------------------------------
+       
+       static final class WindowFunctionTraversal<Key, Type, Result> 
implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
+
+               private final KeyedWindowFunction<Key, Type, Result> function;
+               
+               private final UnionIterator<Type> unionIterator;
+               
+               private final Collector<Result> out;
+               
+               private Key currentKey;
+
+               WindowFunctionTraversal(KeyedWindowFunction<Key, Type, Result> 
function, Collector<Result> out) {
+                       this.function = function;
+                       this.out = out;
+                       this.unionIterator = new UnionIterator<>();
+               }
+
+
+               @Override
+               public void startNewKey(Key key) {
+                       unionIterator.clear();
+                       currentKey = key;
+               }
+
+               @Override
+               public void nextValue(ArrayList<Type> value) {
+                       unionIterator.addList(value);
+               }
+
+               @Override
+               public void keyDone() throws Exception {
+                       function.evaluate(currentKey, unionIterator, out);
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Lazy factory for lists (put if absent)
+       // 
------------------------------------------------------------------------
+       
+       @SuppressWarnings("unchecked")
+       private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
+               return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
+       }
+
+       private static final KeyMap.LazyFactory<?> LIST_FACTORY = new 
KeyMap.LazyFactory<ArrayList<?>>() {
+
+               @Override
+               public ArrayList<?> create() {
+                       return new ArrayList<>(4);
+               }
+       };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..16444c2
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+
+
+public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
+               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT>  {
+
+       private static final long serialVersionUID = 7305948082830843475L;
+
+       
+       public AccumulatingProcessingTimeWindowOperator(
+                       KeyedWindowFunction<KEY, IN, OUT> function,
+                       KeySelector<IN, KEY> keySelector,
+                       long windowLength,
+                       long windowSlide)
+       {
+               super(function, keySelector, windowLength, windowSlide);
+       }
+
+       @Override
+       protected AccumulatingKeyedTimePanes<IN, KEY, OUT> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
+               @SuppressWarnings("unchecked")
+               KeyedWindowFunction<KEY, IN, OUT> windowFunction = 
(KeyedWindowFunction<KEY, IN, OUT>) function;
+               
+               return new AccumulatingKeyedTimePanes<>(keySelector, 
windowFunction);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
new file mode 100644
index 0000000..730c984
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Collector;
+
+
+public class AggregatingKeyedTimePanes<Type, Key> extends 
AbstractKeyedTimePanes<Type, Key, Type, Type> {
+       
+       private final KeySelector<Type, Key> keySelector;
+       
+       private final ReduceFunction<Type> reducer;
+       
+       private long evaluationPass;
+
+       // 
------------------------------------------------------------------------
+       
+       public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
ReduceFunction<Type> reducer) {
+               this.keySelector = keySelector;
+               this.reducer = reducer;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void addElementToLatestPane(Type element) throws Exception {
+               Key k = keySelector.getKey(element);
+               latestPane.putOrAggregate(k, element, reducer);
+       }
+
+       @Override
+       public void evaluateWindow(Collector<Type> out) throws Exception {
+               if (previousPanes.isEmpty()) {
+                       // optimized path for single pane case
+                       for (KeyMap.Entry<Key, Type> entry : latestPane) {
+                               out.collect(entry.getValue());
+                       }
+               }
+               else {
+                       // general code path for multi-pane case
+                       AggregatingTraversal<Key, Type> evaluator = new 
AggregatingTraversal<>(reducer, out);
+                       traverseAllPanes(evaluator, evaluationPass);
+               }
+               
+               evaluationPass++;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  The maps traversal that performs the final aggregation
+       // 
------------------------------------------------------------------------
+       
+       static final class AggregatingTraversal<Key, Type> implements 
KeyMap.TraversalEvaluator<Key, Type> {
+
+               private final ReduceFunction<Type> function;
+               
+               private final Collector<Type> out;
+               
+               private Type currentValue;
+
+               AggregatingTraversal(ReduceFunction<Type> function, 
Collector<Type> out) {
+                       this.function = function;
+                       this.out = out;
+               }
+
+               @Override
+               public void startNewKey(Key key) {
+                       currentValue = null;
+               }
+
+               @Override
+               public void nextValue(Type value) throws Exception {
+                       if (currentValue != null) {
+                               currentValue = function.reduce(currentValue, 
value);
+                       }
+                       else {
+                               currentValue = value;
+                       }
+               }
+
+               @Override
+               public void keyDone() throws Exception {
+                       out.collect(currentValue);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..8bed749
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
+               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
IN>  {
+
+       private static final long serialVersionUID = 7305948082830843475L;
+
+       
+       public AggregatingProcessingTimeWindowOperator(
+                       ReduceFunction<IN> function,
+                       KeySelector<IN, KEY> keySelector,
+                       long windowLength,
+                       long windowSlide)
+       {
+               super(function, keySelector, windowLength, windowSlide);
+       }
+
+       @Override
+       protected AggregatingKeyedTimePanes<IN, KEY> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
+               @SuppressWarnings("unchecked")
+               ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) 
function;
+               
+               return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, 
windowFunction);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/KeyMap.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/KeyMap.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/KeyMap.java
new file mode 100644
index 0000000..6e2d75e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/KeyMap.java
@@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.runtime.util.MathUtils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A special Hash Map implementation that can be traversed efficiently in sync 
with other
+ * hash maps.
+ * <p>
+ * The differences between this hash map and Java's "java.util.HashMap" are:
+ * <ul>
+ *     <li>A different hashing scheme. This implementation uses extensible 
hashing, meaning that
+ *         each hash table growth takes one more lower hash code bit into 
account, and values that where
+ *         formerly in the same bucket will afterwards be in the two adjacent 
buckets.</li>
+ *     <li>This allows an efficient traversal of multiple hash maps together, 
even though the maps are
+ *         of different sizes.</li>
+ *     <li>The map offers functions such as "putIfAbsent()" and 
"putOrAggregate()"</li>
+ *     <li>The map supports no removal/shrinking.</li>
+ * </ul>
+ */
+public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
+       
+       /** The minimum table capacity, 64 entries */
+       private static final int MIN_CAPACITY = 0x40;
+       
+       /** The maximum possible table capacity, the largest positive power of
+        * two in the 32bit signed integer value range */
+       private static final int MAX_CAPACITY = 0x40000000;
+       
+       /** The number of bits used for table addressing when table is at max 
capacity */
+       private static final int FULL_BIT_RANGE = 
MathUtils.log2strict(MAX_CAPACITY);
+       
+       // 
------------------------------------------------------------------------
+       
+       /** The hash index, as an array of entries */
+       private Entry<K, V>[] table;
+       
+       /** The number of bits by which the hash code is shifted right, to find 
the bucket */
+       private int shift;
+       
+       /** The number of elements in the hash table */
+       private int numElements;
+       
+       /** The number of elements above which the hash table needs to grow */
+       private int rehashThreshold;
+       
+       /** The base-2 logarithm of the table capacity */ 
+       private int log2size;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new hash table with the default initial capacity.
+        */
+       public KeyMap() {
+               this(0);
+       }
+
+       /**
+        * Creates a new table with a capacity tailored to the given expected 
number of elements.
+        * 
+        * @param expectedNumberOfElements The number of elements to tailor the 
capacity to.
+        */
+       public KeyMap(int expectedNumberOfElements) {
+               if (expectedNumberOfElements < 0) {
+                       throw new IllegalArgumentException("Invalid capacity: " 
+ expectedNumberOfElements);
+               }
+               
+               // round up to the next power or two
+               // guard against too small capacity and integer overflows
+               int capacity = Integer.highestOneBit(expectedNumberOfElements) 
<< 1;
+               capacity = capacity >= 0 ? Math.max(MIN_CAPACITY, capacity) : 
MAX_CAPACITY;
+
+               // this also acts as a sanity check
+               log2size = MathUtils.log2strict(capacity);
+               shift = FULL_BIT_RANGE - log2size;
+               table = allocateTable(capacity);
+               rehashThreshold = getRehashThreshold(capacity);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Gets and Puts
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Inserts the given value, mapped under the given key. If the table 
already contains a value for
+        * the key, the value is replaced and returned. If no value is 
contained, yet, the function
+        * returns null.
+        * 
+        * @param key The key to insert.
+        * @param value The value to insert.
+        * @return The previously mapped value for the key, or null, if no 
value was mapped for the key.
+        * 
+        * @throws java.lang.NullPointerException Thrown, if the key is null.
+        */
+       public final V put(K key, V value) {
+               final int hash = hash(key);
+               final int slot = indexOf (hash);
+               
+               // search the chain from the slot
+               for (Entry<K, V> e = table[slot]; e != null; e = e.next) {
+                       Object k;
+                       if (e.hashCode == hash && ((k = e.key) == key || 
key.equals(k))) {
+                               // found match
+                               V old = e.value;
+                               e.value = value;
+                               return old;
+                       }
+               }
+
+               // no match, insert a new value
+               insertNewEntry(hash, key, value, slot);
+               return null;
+       }
+
+       /**
+        * Inserts a value for the given key, if no value is yet contained for 
that key. Otherwise,
+        * returns the value currently contained for the key.
+        * <p>
+        * The value that is inserted in case that the key is not contained, 
yet, is lazily created
+        * using the given factory.
+        *
+        * @param key The key to insert.
+        * @param factory The factory that produces the value, if no value is 
contained, yet, for the key.
+        * @return The value in the map after this operation (either the 
previously contained value, or the
+        *         newly created value).
+        * 
+        * @throws java.lang.NullPointerException Thrown, if the key is null.
+        */
+       public final V putIfAbsent(K key, LazyFactory<V> factory) {
+               final int hash = hash(key);
+               final int slot = indexOf(hash);
+
+               // search the chain from the slot
+               for (Entry<K, V> entry = table[slot]; entry != null; entry = 
entry.next) {
+                       if (entry.hashCode == hash && entry.key.equals(key)) {
+                               // found match
+                               return entry.value;
+                       }
+               }
+
+               // no match, insert a new value
+               V value = factory.create();
+               insertNewEntry(hash, key, value, slot);
+
+               // return the created value
+               return value;
+       }
+
+       /**
+        * Inserts or aggregates a value into the hash map. If the hash map 
does not yet contain the key,
+        * this method inserts the value. If the table already contains the key 
(and a value) this
+        * method will use the given ReduceFunction function to combine the 
existing value and the
+        * given value to a new value, and store that value for the key. 
+        * 
+        * @param key The key to map the value.
+        * @param value The new value to insert, or aggregate with the existing 
value.
+        * @param aggregator The aggregator to use if a value is already 
contained.
+        * 
+        * @return The value in the map after this operation: Either the given 
value, or the aggregated value.
+        * 
+        * @throws java.lang.NullPointerException Thrown, if the key is null.
+        * @throws Exception The method forwards exceptions from the 
aggregation function.
+        */
+       public final V putOrAggregate(K key, V value, ReduceFunction<V> 
aggregator) throws Exception {
+               final int hash = hash(key);
+               final int slot = indexOf(hash);
+
+               // search the chain from the slot
+               for (Entry<K, V> entry = table[slot]; entry != null; entry = 
entry.next) {
+                       if (entry.hashCode == hash && entry.key.equals(key)) {
+                               // found match
+                               entry.value = aggregator.reduce(entry.value, 
value);
+                               return entry.value;
+                       }
+               }
+
+               // no match, insert a new value
+               insertNewEntry(hash, key, value, slot);
+               // return the original value
+               return value;
+       }
+
+       /**
+        * Looks up the value mapped under the given key. Returns null if no 
value is mapped under this key.
+        * 
+        * @param key The key to look up.
+        * @return The value associated with the key, or null, if no value is 
found for the key.
+        * 
+        * @throws java.lang.NullPointerException Thrown, if the key is null.
+        */
+       public V get(K key) {
+               final int hash = hash(key);
+               final int slot = indexOf(hash);
+               
+               // search the chain from the slot
+               for (Entry<K, V> entry = table[slot]; entry != null; entry = 
entry.next) {
+                       if (entry.hashCode == hash && entry.key.equals(key)) {
+                               return entry.value;
+                       }
+               }
+               
+               // not found
+               return null;
+       }
+
+       private void insertNewEntry(int hashCode, K key, V value, int position) 
{
+               Entry<K,V> e = table[position];
+               table[position] = new Entry<>(key, value, hashCode, e);
+               numElements++;
+
+               // rehash if necessary
+               if (numElements > rehashThreshold) {
+                       growTable();
+               }
+       }
+       
+       private int indexOf(int hashCode) {
+               return (hashCode >> shift) & (table.length - 1);
+       }
+
+       /**
+        * Creates an iterator over the entries of this map.
+        * 
+        * @return An iterator over the entries of this map.
+        */
+       @Override
+       public Iterator<Entry<K, V>> iterator() {
+               return new Iterator<Entry<K, V>>() {
+                       
+                       private final Entry<K, V>[] tab = KeyMap.this.table;
+                       
+                       private Entry<K, V> nextEntry;
+                       
+                       private int nextPos = 0;
+                       
+                       @Override
+                       public boolean hasNext() {
+                               if (nextEntry != null) {
+                                       return true;
+                               }
+                               else {
+                                       while (nextPos < tab.length) {
+                                               Entry<K, V> e = tab[nextPos++];
+                                               if (e != null) {
+                                                       nextEntry = e;
+                                                       return true;
+                                               }
+                                       }
+                                       return false;
+                               }
+                       }
+
+                       @Override
+                       public Entry<K, V> next() {
+                               if (nextEntry != null || hasNext()) {
+                                       Entry<K, V> e = nextEntry;
+                                       nextEntry = nextEntry.next;
+                                       return e;
+                               }
+                               else {
+                                       throw new NoSuchElementException();
+                               }
+                       }
+
+                       @Override
+                       public void remove() {
+                               throw new UnsupportedOperationException();
+                       }
+               };
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Gets the number of elements currently in the map.
+        * @return The number of elements currently in the map.
+        */
+       public int size() {
+               return numElements;
+       }
+
+       /**
+        * Checks whether the map is empty.
+        * @return True, if the map is empty, false otherwise.
+        */
+       public boolean isEmpty() {
+               return numElements == 0;
+       }
+
+       /**
+        * Gets the current table capacity, i.e., the number of slots in the 
hash table, without
+        * and overflow chaining.
+        * @return The number of slots in the hash table.
+        */
+       public int getCurrentTableCapacity() {
+               return table.length;
+       }
+
+       /**
+        * Gets the base-2 logarithm of the hash table capacity, as returned by
+        * {@link #getCurrentTableCapacity()}.
+        * 
+        * @return The base-2 logarithm of the hash table capacity.
+        */
+       public int getLog2TableCapacity() {
+               return log2size;
+       }
+       
+       public int getRehashThreshold() {
+               return rehashThreshold;
+       }
+       
+       public int getShift() {
+               return shift;
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+       
+       @SuppressWarnings("unchecked")
+       private Entry<K, V>[] allocateTable(int numElements) {
+               return (Entry<K, V>[]) new Entry<?, ?>[numElements];
+       }
+       
+       private void growTable() {
+               final int newSize = table.length << 1;
+                               
+               // only grow if there is still space to grow the table
+               if (newSize > 0) {
+                       final Entry<K, V>[] oldTable = this.table;
+                       final Entry<K, V>[] newTable = allocateTable(newSize);
+
+                       final int newShift = shift - 1;
+                       final int newMask = newSize - 1;
+                       
+                       // go over all slots from the table. since we hash to 
adjacent positions in
+                       // the new hash table, this is actually cache efficient
+                       for (Entry<K, V> entry : oldTable) {
+                               // traverse the chain for each slot
+                               while (entry != null) {
+                                       final int newPos = (entry.hashCode >> 
newShift) & newMask;
+                                       Entry<K, V> nextEntry = entry.next;
+                                       entry.next = newTable[newPos];
+                                       newTable[newPos] = entry;
+                                       entry = nextEntry;
+                               }
+                       }
+                       
+                       this.table = newTable;
+                       this.shift = newShift;
+                       this.rehashThreshold = getRehashThreshold(newSize);
+                       this.log2size += 1;
+               }
+       }
+       
+       private static int hash(Object key) {
+               int code = key.hashCode();
+               
+               // we need a strong hash function that generates diverse upper 
bits
+               // this hash function is more expensive than the "scramble" 
used by "java.util.HashMap",
+               // but required for this sort of hash table
+               code = (code + 0x7ed55d16) + (code << 12);
+               code = (code ^ 0xc761c23c) ^ (code >>> 19);
+               code = (code + 0x165667b1) + (code << 5);
+               code = (code + 0xd3a2646c) ^ (code << 9);
+               code = (code + 0xfd7046c5) + (code << 3);
+               return (code ^ 0xb55a4f09) ^ (code >>> 16);
+       }
+       
+       private static int getRehashThreshold(int capacity) {
+               // divide before multiply, to avoid overflow
+               return capacity / 4 * 3;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Testing Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * For testing only: Actively counts the number of entries, rather than 
using the
+        * counter variable. This method has linear complexity, rather than 
constant.
+        * 
+        * @return The counted number of entries.
+        */
+       int traverseAndCountElements() {
+               int num = 0;
+               
+               for (Entry<?, ?> entry : table) {
+                       while (entry != null) {
+                               num++;
+                               entry = entry.next;
+                       }
+               }
+               
+               return num;
+       }
+
+       /**
+        * For testing only: Gets the length of the longest overflow chain.
+        * This method has linear complexity.
+        * 
+        * @return The length of the longest overflow chain.
+        */
+       int getLongestChainLength() {
+               int maxLen = 0;
+
+               for (Entry<?, ?> entry : table) {
+                       int thisLen = 0;
+                       while (entry != null) {
+                               thisLen++;
+                               entry = entry.next;
+                       }
+                       maxLen = Math.max(maxLen, thisLen);
+               }
+
+               return maxLen;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * An entry in the hash table.
+        * 
+        * @param <K> Type of the key.
+        * @param <V> Type of the value.
+        */
+       public static final class Entry<K, V> {
+               
+               final K key;
+               final int hashCode;
+               
+               V value;
+               Entry<K, V> next;
+               long touchedTag;
+
+               Entry(K key, V value, int hashCode, Entry<K, V> next) {
+                       this.key = key;
+                       this.value = value;
+                       this.next = next;
+                       this.hashCode = hashCode;
+               }
+
+               public K getKey() {
+                       return key;
+               }
+
+               public V getValue() {
+                       return value;
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Performs a traversal about logical the multi-map that results from 
the union of the
+        * given maps. This method does not actually build a union of the map, 
but traverses the hash maps
+        * together.
+        * 
+        * @param maps The array uf maps whose union should be traversed.
+        * @param visitor The visitor that is called for each key and all 
values.
+        * @param touchedTag A tag that is used to mark elements that have been 
touched in this specific
+        *                   traversal. Each successive traversal should supply 
a larger value for this
+        *                   tag than the previous one.
+        * 
+        * @param <K> The type of the map's key.
+        * @param <V> The type of the map's value.
+        */
+       public static <K, V> void traverseMaps(
+                                       final KeyMap<K, V>[] maps,
+                                       final TraversalEvaluator<K, V> visitor,
+                                       final long touchedTag)
+               throws Exception
+       {
+               // we need to work on the maps in descending size
+               Arrays.sort(maps, CapacityDescendingComparator.INSTANCE);
+               
+               final int[] shifts = new int[maps.length];
+               final int[] lowBitsMask = new int[maps.length];
+               final int numSlots = maps[0].table.length;
+               final int numTables = maps.length;
+               
+               // figure out how much each hash table collapses the entries
+               for (int i = 0; i < numTables; i++) {
+                       shifts[i] = maps[0].log2size - maps[i].log2size;
+                       lowBitsMask[i] = (1 << shifts[i]) - 1;
+               }
+               
+               // go over all slots (based on the largest hash table)
+               for (int pos = 0; pos < numSlots; pos++) {
+                       
+                       // for each slot, go over all tables, until the table 
does not have that slot any more
+                       // for tables where multiple slots collapse into one, 
we visit that one when we process the
+                       // latest of all slots that collapse to that one
+                       int mask;
+                       for (int rootTable = 0;
+                                       rootTable < numTables && ((mask = 
lowBitsMask[rootTable]) & pos) == mask;
+                                       rootTable++)
+                       {
+                               // use that table to gather keys and start 
collecting keys from the following tables
+                               // go over all entries of that slot in the table
+                               Entry<K, V> entry = maps[rootTable].table[pos 
>> shifts[rootTable]];
+                               while (entry != null) {
+                                       // take only entries that have not been 
collected as part of other tables
+                                       if (entry.touchedTag < touchedTag) {
+                                               entry.touchedTag = touchedTag;
+                                               
+                                               final K key = entry.key;
+                                               final int hashCode = 
entry.hashCode;
+                                               visitor.startNewKey(key);
+                                               visitor.nextValue(entry.value);
+                                               
+                                               addEntriesFromChain(entry.next, 
visitor, key, touchedTag, hashCode);
+                                               
+                                               // go over the other hash 
tables and collect their entries for the key
+                                               for (int followupTable = 
rootTable + 1; followupTable < numTables; followupTable++) {
+                                                       Entry<K, V> 
followupEntry = maps[followupTable].table[pos >> shifts[followupTable]];
+                                                       if (followupEntry != 
null) {
+                                                               
addEntriesFromChain(followupEntry, visitor, key, touchedTag, hashCode);
+                                                       }
+                                               }
+
+                                               visitor.keyDone();
+                                       }
+                                       
+                                       entry = entry.next;
+                               }
+                       }
+               }
+       }
+       
+       private static <K, V> void addEntriesFromChain(
+                       Entry<K, V> entry,
+                       TraversalEvaluator<K, V> visitor,
+                       K key,
+                       long touchedTag,
+                       int hashCode) throws Exception
+       {
+               while (entry != null) {
+                       if (entry.touchedTag < touchedTag && entry.hashCode == 
hashCode && entry.key.equals(key)) {
+                               entry.touchedTag = touchedTag;
+                               visitor.nextValue(entry.value);
+                       }
+                       entry = entry.next;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Comparator that defines a descending order on maps depending on 
their table capacity
+        * and number of elements.
+        */
+       static final class CapacityDescendingComparator implements 
Comparator<KeyMap<?, ?>> {
+               
+               static final CapacityDescendingComparator INSTANCE = new 
CapacityDescendingComparator();
+               
+               private CapacityDescendingComparator() {}
+
+
+               @Override
+               public int compare(KeyMap<?, ?> o1, KeyMap<?, ?> o2) {
+                       // this sorts descending
+                       int cmp = o2.getLog2TableCapacity() - 
o1.getLog2TableCapacity();
+                       if (cmp != 0) {
+                               return cmp;
+                       }
+                       else {
+                               return o2.size() - o1.size();
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A factory for lazy/on-demand instantiation of values.
+        *
+        * @param <V> The type created by the factory.
+        */
+       public static interface LazyFactory<V> {
+
+               /**
+                * The factory method; creates the value.
+                * @return The value.
+                */
+               V create();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A visitor for a traversal over the union of multiple hash maps. The 
visitor is
+        * called for each key in the union of the maps and all values 
associated with that key
+        * (one per map, but multiple across maps). 
+        * 
+        * @param <K> The type of the key.
+        * @param <V> The type of the value.
+        */
+       public static interface TraversalEvaluator<K, V> {
+
+               /**
+                * Called whenever the traversal starts with a new key.
+                * 
+                * @param key The key traversed.
+                * @throws Exception Method forwards all exceptions.
+                */
+               void startNewKey(K key) throws Exception;
+
+               /**
+                * Called for each value found for the current key.
+                * 
+                * @param value The next value.
+                * @throws Exception Method forwards all exceptions.
+                */
+               void nextValue(V value) throws Exception;
+
+               /**
+                * Called when the traversal for the current key is complete.
+                * 
+                * @throws Exception Method forwards all exceptions.
+                */
+               void keyDone() throws Exception;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/package-info.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/package-info.java
new file mode 100644
index 0000000..63ed470
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the operators that implement the various window 
operations
+ * on data streams. 
+ */
+package org.apache.flink.streaming.runtime.operators.windows;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
index d93078b..5680810 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 /**
- * An exception that is thrown by the stream verices when encountering an
+ * An exception that is thrown by the stream vertices when encountering an
  * illegal condition.
  */
 public class StreamTaskException extends RuntimeException {

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
new file mode 100644
index 0000000..a40ae3a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes that realize streaming tasks. These tasks are
+ * executable stream consumers and producers that are scheduled by the 
distributed
+ * dataflow runtime. Each task occupies one execution slot and is run with by 
an
+ * executing thread.
+ * <p>
+ * The tasks merely set up the distributed stream coordination and the 
checkpointing.
+ * Internally, the tasks create one or more operators, perform the stream 
transformations.
+ */
+package org.apache.flink.streaming.runtime.tasks;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
new file mode 100644
index 0000000..685939b
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.operators.TriggerTimer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
+
+       @SuppressWarnings("unchecked")
+       private final KeyedWindowFunction<String, String, String> mockFunction 
= mock(KeyedWindowFunction.class);
+
+       @SuppressWarnings("unchecked")
+       private final KeySelector<String, String> mockKeySelector = 
mock(KeySelector.class);
+       
+       private final KeySelector<Integer, Integer> identitySelector = new 
KeySelector<Integer, Integer>() {
+               @Override
+               public Integer getKey(Integer value) {
+                       return value;
+               }
+       };
+       
+       private final KeyedWindowFunction<Integer, Integer, Integer> 
validatingIdentityFunction = 
+                       new KeyedWindowFunction<Integer, Integer, Integer>()
+       {
+               @Override
+               public void evaluate(Integer key, Iterable<Integer> values, 
Collector<Integer> out) {
+                       for (Integer val : values) {
+                               assertEquals(key, val);
+                               out.collect(val);
+                       }
+               }
+       };
+
+       // 
------------------------------------------------------------------------
+
+       @After
+       public void checkNoTriggerThreadsRunning() {
+               // make sure that all the threads we trigger are shut down
+               long deadline = System.currentTimeMillis() + 5000;
+               while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && 
System.currentTimeMillis() < deadline) {
+                       try {
+                               Thread.sleep(10);
+                       }
+                       catch (InterruptedException ignored) {}
+               }
+
+               assertTrue("Not all trigger threads where properly shut down",
+                               
TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() == 0);
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       @Test
+       public void testInvalidParameters() {
+               try {
+                       assertInvalidParameter(-1L, -1L);
+                       assertInvalidParameter(10000L, -1L);
+                       assertInvalidParameter(-1L, 1000L);
+                       assertInvalidParameter(1000L, 2000L);
+                       
+                       // actual internal slide is too low here:
+                       assertInvalidParameter(1000L, 999L);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testWindowSizeAndSlide() {
+               try {
+                       AbstractAlignedProcessingTimeWindowOperator<String, 
String, String> op;
+                       
+                       op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 
1000);
+                       assertEquals(5000, op.getWindowSize());
+                       assertEquals(1000, op.getWindowSlide());
+                       assertEquals(1000, op.getPaneSize());
+                       assertEquals(5, op.getNumPanesPerWindow());
+
+                       op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 
1000);
+                       assertEquals(1000, op.getWindowSize());
+                       assertEquals(1000, op.getWindowSlide());
+                       assertEquals(1000, op.getPaneSize());
+                       assertEquals(1, op.getNumPanesPerWindow());
+
+                       op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 
1000);
+                       assertEquals(1500, op.getWindowSize());
+                       assertEquals(1000, op.getWindowSlide());
+                       assertEquals(500, op.getPaneSize());
+                       assertEquals(3, op.getNumPanesPerWindow());
+
+                       op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 
1100);
+                       assertEquals(1200, op.getWindowSize());
+                       assertEquals(1100, op.getWindowSlide());
+                       assertEquals(100, op.getPaneSize());
+                       assertEquals(12, op.getNumPanesPerWindow());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testWindowTriggerTimeAlignment() {
+               try {
+                       @SuppressWarnings("unchecked")
+                       final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
+                       
+                       final StreamingRuntimeContext mockContext = 
mock(StreamingRuntimeContext.class);
+                       when(mockContext.getTaskName()).thenReturn("Test task 
name");
+                       
+                       AbstractAlignedProcessingTimeWindowOperator<String, 
String, String> op;
+
+                       op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 
1000);
+                       op.setup(mockOut, mockContext);
+                       op.open(new Configuration());
+                       assertTrue(op.getNextSlideTime() % 1000 == 0);
+                       assertTrue(op.getNextEvaluationTime() % 1000 == 0);
+                       op.dispose();
+
+                       op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 
1000);
+                       op.setup(mockOut, mockContext);
+                       op.open(new Configuration());
+                       assertTrue(op.getNextSlideTime() % 1000 == 0);
+                       assertTrue(op.getNextEvaluationTime() % 1000 == 0);
+                       op.dispose();
+
+                       op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 
1000);
+                       op.setup(mockOut, mockContext);
+                       op.open(new Configuration());
+                       assertTrue(op.getNextSlideTime() % 500 == 0);
+                       assertTrue(op.getNextEvaluationTime() % 1000 == 0);
+                       op.dispose();
+
+                       op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 
1100);
+                       op.setup(mockOut, mockContext);
+                       op.open(new Configuration());
+                       assertTrue(op.getNextSlideTime() % 100 == 0);
+                       assertTrue(op.getNextEvaluationTime() % 1100 == 0);
+                       op.dispose();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testTumblingWindow() {
+               try {
+                       final int windowSize = 50;
+                       final CollectingOutput<Integer> out = new 
CollectingOutput<>(windowSize);
+
+                       final StreamingRuntimeContext mockContext = 
mock(StreamingRuntimeContext.class);
+                       when(mockContext.getTaskName()).thenReturn("Test task 
name");
+
+                       // tumbling window that triggers every 20 milliseconds
+                       AbstractAlignedProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                                       new 
AccumulatingProcessingTimeWindowOperator<>(
+                                                       
validatingIdentityFunction, identitySelector, windowSize, windowSize);
+
+                       op.setup(out, mockContext);
+                       op.open(new Configuration());
+
+                       final int numElements = 1000;
+
+                       for (int i = 0; i < numElements; i++) {
+                               op.processElement(new StreamRecord<Integer>(i));
+                               Thread.sleep(1);
+                       }
+
+                       op.close();
+                       op.dispose();
+
+                       // get and verify the result
+                       List<Integer> result = out.getElements();
+                       assertEquals(numElements, result.size());
+
+                       Collections.sort(result);
+                       for (int i = 0; i < numElements; i++) {
+                               assertEquals(i, result.get(i).intValue());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testSlidingWindow() {
+               try {
+                       final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
+
+                       final StreamingRuntimeContext mockContext = 
mock(StreamingRuntimeContext.class);
+                       when(mockContext.getTaskName()).thenReturn("Test task 
name");
+
+                       // tumbling window that triggers every 20 milliseconds
+                       AbstractAlignedProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                                       new 
AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, 
identitySelector, 150, 50);
+
+                       op.setup(out, mockContext);
+                       op.open(new Configuration());
+
+                       final int numElements = 1000;
+
+                       for (int i = 0; i < numElements; i++) {
+                               op.processElement(new StreamRecord<Integer>(i));
+                               Thread.sleep(1);
+                       }
+
+                       op.close();
+                       op.dispose();
+
+                       // get and verify the result
+                       List<Integer> result = out.getElements();
+
+                       // if we kept this running, each element would be in 
the result three times (for each slide).
+                       // we are closing the window before the final panes are 
through three times, so we may have less
+                       // elements.
+                       if (result.size() < numElements || result.size() > 3 * 
numElements) {
+                               fail("Wrong number of results: " + 
result.size());
+                       }
+
+                       Collections.sort(result);
+                       int lastNum = -1;
+                       int lastCount = -1;
+                       
+                       for (int num : result) {
+                               if (num == lastNum) {
+                                       lastCount++;
+                                       assertTrue(lastCount <= 3);
+                               }
+                               else {
+                                       lastNum = num;
+                                       lastCount = 1;
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testTumblingWindowSingleElements() {
+               try {
+                       final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
+
+                       final StreamingRuntimeContext mockContext = 
mock(StreamingRuntimeContext.class);
+                       when(mockContext.getTaskName()).thenReturn("Test task 
name");
+
+                       // tumbling window that triggers every 20 milliseconds
+                       AbstractAlignedProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                                       new 
AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, 
identitySelector, 50, 50);
+
+                       op.setup(out, mockContext);
+                       op.open(new Configuration());
+
+                       op.processElement(new StreamRecord<Integer>(1));
+                       op.processElement(new StreamRecord<Integer>(2));
+                       out.waitForNElements(2, 60000);
+
+                       op.processElement(new StreamRecord<Integer>(3));
+                       op.processElement(new StreamRecord<Integer>(4));
+                       op.processElement(new StreamRecord<Integer>(5));
+                       out.waitForNElements(5, 60000);
+
+                       op.processElement(new StreamRecord<Integer>(6));
+                       out.waitForNElements(6, 60000);
+                       
+                       List<Integer> result = out.getElements();
+                       assertEquals(6, result.size());
+
+                       Collections.sort(result);
+                       assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+
+                       op.close();
+                       op.dispose();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testSlidingWindowSingleElements() {
+               try {
+                       final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
+
+                       final StreamingRuntimeContext mockContext = 
mock(StreamingRuntimeContext.class);
+                       when(mockContext.getTaskName()).thenReturn("Test task 
name");
+
+                       // tumbling window that triggers every 20 milliseconds
+                       AbstractAlignedProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                                       new 
AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, 
identitySelector, 150, 50);
+
+                       op.setup(out, mockContext);
+                       op.open(new Configuration());
+                       
+                       op.processElement(new StreamRecord<Integer>(1));
+                       op.processElement(new StreamRecord<Integer>(2));
+
+                       // each element should end up in the output three times
+                       // wait until the elements have arrived 6 times in the 
output
+                       out.waitForNElements(6, 120000);
+                       
+                       List<Integer> result = out.getElements();
+                       assertEquals(6, result.size());
+                       
+                       Collections.sort(result);
+                       assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
+                       
+                       op.close();
+                       op.dispose();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testEmitTrailingDataOnClose() {
+               try {
+                       final CollectingOutput<Integer> out = new 
CollectingOutput<>();
+
+                       final StreamingRuntimeContext mockContext = 
mock(StreamingRuntimeContext.class);
+                       when(mockContext.getTaskName()).thenReturn("Test task 
name");
+                       
+                       // the operator has a window time that is so long that 
it will not fire in this test
+                       final long oneYear = 365L * 24 * 60 * 60 * 1000;
+                       AbstractAlignedProcessingTimeWindowOperator<Integer, 
Integer, Integer> op = 
+                                       new 
AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, 
identitySelector,
+                                                       oneYear, oneYear);
+                       
+                       op.setup(out, mockContext);
+                       op.open(new Configuration());
+                       
+                       List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 
8, 9, 10);
+                       for (Integer i : data) {
+                               op.processElement(new StreamRecord<Integer>(i));
+                       }
+                       
+                       op.close();
+                       op.dispose();
+                       
+                       // get and verify the result
+                       List<Integer> result = out.getElements();
+                       Collections.sort(result);
+                       assertEquals(data, result);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testPropagateExceptionsFromTrigger() {
+               try {
+                       final CollectingOutput<Integer> out = new 
CollectingOutput<>();
+
+                       final StreamingRuntimeContext mockContext = 
mock(StreamingRuntimeContext.class);
+                       when(mockContext.getTaskName()).thenReturn("Test task 
name");
+
+                       KeyedWindowFunction<Integer, Integer, Integer> 
failingFunction = new FailingFunction(100);
+                       
+                       AbstractAlignedProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                                       new 
AccumulatingProcessingTimeWindowOperator<>(failingFunction, identitySelector, 
50, 50);
+
+                       op.setup(out, mockContext);
+                       op.open(new Configuration());
+
+                       try {
+                               int num = 0;
+                               while (num < Integer.MAX_VALUE) {
+                                       op.processElement(new 
StreamRecord<Integer>(num++));
+                                       Thread.sleep(1);
+                               }
+                               fail("This should really have failed with an 
exception quite a while ago...");
+                       }
+                       catch (Exception e) {
+                               assertNotNull(e.getCause());
+                               
assertTrue(e.getCause().getMessage().contains("Artificial Test Exception"));
+                       }
+                       
+                       op.dispose();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testPropagateExceptionsFromClose() {
+               try {
+                       final CollectingOutput<Integer> out = new 
CollectingOutput<>();
+
+                       final StreamingRuntimeContext mockContext = 
mock(StreamingRuntimeContext.class);
+                       when(mockContext.getTaskName()).thenReturn("Test task 
name");
+
+                       KeyedWindowFunction<Integer, Integer, Integer> 
failingFunction = new FailingFunction(100);
+
+                       // the operator has a window time that is so long that 
it will not fire in this test
+                       final long hundredYears = 100L * 365 * 24 * 60 * 60 * 
1000;
+                       AbstractAlignedProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                                       new 
AccumulatingProcessingTimeWindowOperator<>(
+                                                       failingFunction, 
identitySelector, hundredYears, hundredYears);
+
+                       op.setup(out, mockContext);
+                       op.open(new Configuration());
+
+                       for (int i = 0; i < 150; i++) {
+                               op.processElement(new StreamRecord<Integer>(i));
+                       }
+                       
+                       try {
+                               op.close();
+                               fail("This should fail with an exception");
+                       }
+                       catch (Exception e) {
+                               assertTrue(
+                                               
e.getMessage().contains("Artificial Test Exception") ||
+                                               (e.getCause() != null && 
e.getCause().getMessage().contains("Artificial Test Exception")));
+                       }
+
+                       op.dispose();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       private void assertInvalidParameter(long windowSize, long windowSlide) {
+               try {
+                       new AccumulatingProcessingTimeWindowOperator<String, 
String, String>(
+                                       mockFunction, mockKeySelector, 
windowSize, windowSlide);
+                       fail("This should fail with an 
IllegalArgumentException");
+               }
+               catch (IllegalArgumentException e) {
+                       // expected
+               }
+               catch (Exception e) {
+                       fail("Wrong exception. Expected 
IllegalArgumentException but found " + e.getClass().getSimpleName());
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       private static class FailingFunction implements 
KeyedWindowFunction<Integer, Integer, Integer> {
+
+               private final int failAfterElements;
+               
+               private int numElements;
+
+               FailingFunction(int failAfterElements) {
+                       this.failAfterElements = failAfterElements;
+               }
+
+               @Override
+               public void evaluate(Integer integer, Iterable<Integer> values, 
Collector<Integer> out) throws Exception {
+                       for (Integer i : values) {
+                               out.collect(i);
+                               numElements++;
+                               
+                               if (numElements >= failAfterElements) {
+                                       throw new Exception("Artificial Test 
Exception");
+                               }
+                       }
+               }
+       }
+}

Reply via email to