[FLINK-2677] Add a general-purpose keyed-window operator

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd51c977
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd51c977
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd51c977

Branch: refs/heads/master
Commit: dd51c97741b336d3a11e319183537eef864a84fd
Parents: 3be2dc1
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Sep 25 12:27:35 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Sep 28 17:09:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/KeyedDataStream.java         |  19 +
 .../KeyedTriggerWindowDataStream.java           | 255 +++++++++++
 .../windowing/ReduceWindowFunction.java         |  70 +++
 .../ReduceWindowFunctionWithWindow.java         |  71 +++
 .../windowing/RichKeyedWindowFunction.java      |  25 +
 .../flink/streaming/api/graph/StreamGraph.java  |   6 +
 .../api/windowing/assigners/GlobalWindows.java  |  55 +++
 .../assigners/SlidingProcessingTimeWindows.java |  92 ++++
 .../windowing/assigners/SlidingTimeWindows.java |  81 ++++
 .../TumblingProcessingTimeWindows.java          |  67 +++
 .../assigners/TumblingTimeWindows.java          |  67 +++
 .../api/windowing/assigners/WindowAssigner.java |  32 ++
 .../api/windowing/evictors/CountEvictor.java    |  44 ++
 .../api/windowing/evictors/DeltaEvictor.java    |  58 +++
 .../api/windowing/evictors/Evictor.java         |  28 ++
 .../api/windowing/evictors/TimeEvictor.java     |  54 +++
 .../ContinuousProcessingTimeTrigger.java        |  79 ++++
 .../triggers/ContinuousWatermarkTrigger.java    |  65 +++
 .../api/windowing/triggers/CountTrigger.java    |  61 +++
 .../api/windowing/triggers/DeltaTrigger.java    |  66 +++
 .../triggers/ProcessingTimeTrigger.java         |  57 +++
 .../api/windowing/triggers/PurgingTrigger.java  |  76 +++
 .../api/windowing/triggers/Trigger.java         |  40 ++
 .../windowing/triggers/WatermarkTrigger.java    |  57 +++
 .../api/windowing/windows/GlobalWindow.java     |  65 +++
 .../api/windowing/windows/TimeWindow.java       |  75 +++
 .../streaming/api/windowing/windows/Window.java |  27 ++
 .../operators/BucketStreamSortOperator.java     |  93 ++++
 .../windowing/EvictingWindowOperator.java       | 115 +++++
 .../operators/windowing/PolicyToOperator.java   | 166 ++++++-
 .../operators/windowing/WindowOperator.java     | 320 +++++++++++++
 .../windowing/buffers/EvictingWindowBuffer.java |  22 +
 .../windowing/buffers/HeapWindowBuffer.java     |  88 ++++
 .../buffers/PreAggregatingHeapWindowBuffer.java |  91 ++++
 .../windowing/buffers/WindowBuffer.java         |  34 ++
 .../windowing/buffers/WindowBufferFactory.java  |  30 ++
 .../windowing/EvictingWindowOperatorTest.java   | 179 ++++++++
 .../windowing/PolicyWindowTranslationTest.java  | 216 +++++++++
 .../windowing/TriggerWindowTranslationTest.java | 201 ++++++++
 .../operators/windowing/WindowOperatorTest.java | 459 +++++++++++++++++++
 40 files changed, 3701 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index 611953e..ce105e5 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -23,7 +23,9 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
@@ -115,4 +117,21 @@ public class KeyedDataStream<T, KEY> extends DataStream<T> 
{
        public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, 
WindowPolicy slide) {
                return new KeyedWindowDataStream<T, KEY>(this, window, slide);
        }
+
+       /**
+        * Windows this data stream to a {@code KeyedTriggerWindowDataStream}, 
which evaluates windows
+        * over a key grouped stream. Elements are put into windows by a {@link 
WindowAssigner}. The
+        * grouping of elements is done both by key and by window.
+        *
+        * <p>
+        * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} 
can be defined to specify
+        * when windows are evaluated. However, {@code WindowAssigners} have a 
default {@code Trigger}
+        * that is used if a {@code Trigger} is not specified.
+        *
+        * @param assigner The {@code WindowAssigner} that assigns elements to 
windows.
+        * @return The trigger windows data stream.
+        */
+       public <W extends Window> KeyedTriggerWindowDataStream<T, KEY, W> 
window(WindowAssigner<? super T, W> assigner) {
+               return new KeyedTriggerWindowDataStream<T, KEY, W>(this, 
assigner);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
new file mode 100644
index 0000000..5b39775
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@code KeyedTriggerWindowDataStream} represents a data stream where 
elements are grouped by
+ * key, and for each key, the stream of elements is split into windows based 
on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. 
Window emission
+ * is triggered based on a {@link 
org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * The windows are conceptually evaluated for each key individually, meaning 
windows can trigger at
+ * different points for each key.
+ *
+ * <p>
+ * If an {@link Evictor} is specified it will be used to evict elements from 
the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual 
evaluation of the window.
+ * When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * <p>
+ * Note that the {@code KeyedTriggerWindowDataStream} is purely and API 
construct, during runtime
+ * the {@code KeyedTriggerWindowDataStream} will be collapsed together with the
+ * {@code KeyedDataStream} and the operation over the window into one single 
operation.
+ * 
+ * @param <T> The type of elements in the stream.
+ * @param <K> The type of the key by which elements are grouped.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns the elements to.
+ */
+public class KeyedTriggerWindowDataStream<T, K, W extends Window> {
+
+       /** The keyed data stream that is windowed by this stream */
+       private final KeyedDataStream<T, K> input;
+
+       /** The window assigner */
+       private final WindowAssigner<? super T, W> windowAssigner;
+
+       /** The trigger that is used for window evaluation/emission. */
+       private Trigger<? super T, ? super W> trigger;
+
+       /** The evictor that is used for evicting elements before window 
evaluation. */
+       private Evictor<? super T, ? super W> evictor;
+
+
+       public KeyedTriggerWindowDataStream(KeyedDataStream<T, K> input, 
WindowAssigner<? super T, W> windowAssigner) {
+               this.input = input;
+               this.windowAssigner = windowAssigner;
+               this.trigger = windowAssigner.getDefaultTrigger();
+       }
+
+       /**
+        * Sets the {@code Trigger} that should be used to trigger window 
emission.
+        */
+       public KeyedTriggerWindowDataStream<T, K, W> trigger(Trigger<? super T, 
? super W> trigger) {
+               this.trigger = trigger;
+               return this;
+       }
+
+       /**
+        * Sets the {@code Evictor} that should be used to evict elements from 
a window before emission.
+        *
+        * <p>
+        * Note: When using an evictor window performance will degrade 
significantly, since
+        * pre-aggregation of window results cannot be used.
+        */
+       public KeyedTriggerWindowDataStream<T, K, W> evictor(Evictor<? super T, 
? super W> evictor) {
+               this.evictor = evictor;
+               return this;
+       }
+
+
+       // 
------------------------------------------------------------------------
+       //  Operations on the keyed windows
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Applies a reduce function to the window. The window function is 
called for each evaluation
+        * of the window for each key individually. The output of the reduce 
function is interpreted
+        * as a regular non-windowed stream.
+        * <p>
+        * This window will try and pre-aggregate data as much as the window 
policies permit. For example,
+        * tumbling time windows can perfectly pre-aggregate the data, meaning 
that only one element per
+        * key is stored. Sliding time windows will pre-aggregate on the 
granularity of the slide interval,
+        * so a few elements are stored per key (one per slide interval).
+        * Custom windows may not be able to pre-aggregate, or may need to 
store extra values in an
+        * aggregation tree.
+        * 
+        * @param function The reduce function.
+        * @return The data stream that is the result of applying the reduce 
function to the window. 
+        */
+       public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "Reduce at " + callLocation;
+
+               DataStream<T> result = createFastTimeOperatorIfValid(function, 
input.getType(), udfName);
+               if (result != null) {
+                       return result;
+               }
+
+               String opName = "TriggerWindow(" + windowAssigner + ", " + 
trigger + ", " + udfName + ")";
+               KeySelector<T, K> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, T> operator;
+
+               if (evictor != null) {
+                       operator = new EvictingWindowOperator<>(windowAssigner,
+                                       keySel,
+                                       new HeapWindowBuffer.Factory<T>(),
+                                       new ReduceWindowFunction<K, W, 
T>(function),
+                                       trigger,
+                                       evictor);
+
+               } else {
+                       // we need to copy because we need our own instance of 
the pre aggregator
+                       @SuppressWarnings("unchecked")
+                       ReduceFunction<T> functionCopy = (ReduceFunction<T>) 
SerializationUtils.clone(function);
+
+                       operator = new WindowOperator<>(windowAssigner,
+                                       keySel,
+                                       new 
PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+                                       new ReduceWindowFunction<K, W, 
T>(function),
+                                       trigger);
+               }
+
+               return input.transform(opName, input.getType(), operator);
+       }
+
+       /**
+        * Applies a window function to the window. The window function is 
called for each evaluation
+        * of the window for each key individually. The output of the window 
function is interpreted
+        * as a regular non-windowed stream.
+        * <p>
+        * Not that this function requires that all data in the windows is 
buffered until the window
+        * is evaluated, as the function provides no means of pre-aggregation.
+        * 
+        * @param function The window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> 
function) {
+               TypeInformation<T> inType = input.getType();
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                               function, KeyedWindowFunction.class, true, 
true, inType, null, false);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "MapWindow at " + callLocation;
+
+               DataStream<R> result = createFastTimeOperatorIfValid(function, 
resultType, udfName);
+               if (result != null) {
+                       return result;
+               }
+
+
+               String opName = "TriggerWindow(" + windowAssigner + ", " + 
trigger + ", " + udfName + ")";
+               KeySelector<T, K> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               if (evictor != null) {
+                       operator = new EvictingWindowOperator<>(windowAssigner,
+                                       keySel,
+                                       new HeapWindowBuffer.Factory<T>(),
+                                       function,
+                                       trigger,
+                                       evictor);
+
+               } else {
+                       operator = new WindowOperator<>(windowAssigner,
+                                       keySel,
+                                       new HeapWindowBuffer.Factory<T>(),
+                                       function,
+                                       trigger);
+               }
+
+
+
+               return input.transform(opName, resultType, operator);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       private <R> DataStream<R> createFastTimeOperatorIfValid(
+                       Function function,
+                       TypeInformation<R> resultType,
+                       String functionName) {
+
+               WindowPolicy windowPolicy = null;
+               WindowPolicy slidePolicy = null;
+
+               if (windowAssigner instanceof SlidingProcessingTimeWindows && 
trigger instanceof ProcessingTimeTrigger && evictor == null) {
+                       SlidingProcessingTimeWindows timeWindows = 
(SlidingProcessingTimeWindows) windowAssigner;
+                       windowPolicy = ProcessingTime.of(timeWindows.getSize(), 
TimeUnit.MILLISECONDS);
+                       slidePolicy = ProcessingTime.of(timeWindows.getSlide(), 
TimeUnit.MILLISECONDS);
+               } else if (windowAssigner instanceof 
TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && 
evictor == null) {
+                       TumblingProcessingTimeWindows timeWindows = 
(TumblingProcessingTimeWindows) windowAssigner;
+                       windowPolicy = ProcessingTime.of(timeWindows.getSize(), 
TimeUnit.MILLISECONDS);
+               }
+
+               if (windowPolicy == null) {
+                       return null;
+               }
+
+               String opName = windowPolicy.toString(slidePolicy) + " of " + 
functionName;
+               KeySelector<T, K> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator =
+                               
PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, 
keySel);
+
+               return input.transform(opName, resultType, operator);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
new file mode 100644
index 0000000..1c9578a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceWindowFunction<K, W extends Window, T> extends 
RichKeyedWindowFunction<T, T, K, W> {
+       private static final long serialVersionUID = 1L;
+
+       private final ReduceFunction<T> reduceFunction;
+
+       public ReduceWindowFunction(ReduceFunction<T> reduceFunction) {
+               this.reduceFunction = reduceFunction;
+       }
+
+       @Override
+       public void setRuntimeContext(RuntimeContext ctx) {
+               super.setRuntimeContext(ctx);
+               FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               FunctionUtils.openFunction(reduceFunction, parameters);
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               FunctionUtils.closeFunction(reduceFunction);
+       }
+
+       @Override
+       public void evaluate(K k, W window, Iterable<T> values, Collector<T> 
out) throws Exception {
+               T result = null;
+
+               for (T v: values) {
+                       if (result == null) {
+                               result = v;
+                       } else {
+                               result = reduceFunction.reduce(result, v);
+                       }
+               }
+
+               if (result != null) {
+                       out.collect(result);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
new file mode 100644
index 0000000..bceff82
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends 
RichKeyedWindowFunction<T, Tuple2<W, T>, K, W> {
+       private static final long serialVersionUID = 1L;
+
+       private final ReduceFunction<T> reduceFunction;
+
+       public ReduceWindowFunctionWithWindow(ReduceFunction<T> reduceFunction) 
{
+               this.reduceFunction = reduceFunction;
+       }
+
+       @Override
+       public void setRuntimeContext(RuntimeContext ctx) {
+               super.setRuntimeContext(ctx);
+               FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               FunctionUtils.openFunction(reduceFunction, parameters);
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               FunctionUtils.closeFunction(reduceFunction);
+       }
+
+       @Override
+       public void evaluate(K k, W window, Iterable<T> values, 
Collector<Tuple2<W, T>> out) throws Exception {
+               T result = null;
+
+               for (T v: values) {
+                       if (result == null) {
+                               result = v;
+                       } else {
+                               result = reduceFunction.reduce(result, v);
+                       }
+               }
+
+               if (result != null) {
+                       out.collect(Tuple2.of(window, result));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
new file mode 100644
index 0000000..90ccb40
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public abstract class RichKeyedWindowFunction<IN, OUT, KEY, W extends Window> 
extends AbstractRichFunction implements KeyedWindowFunction<IN, OUT, KEY, W> {
+       private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index cda5686..cfa6d93 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -217,6 +218,11 @@ public class StreamGraph extends StreamingPlan {
                        outputTypeConfigurable.setOutputType(outTypeInfo, 
executionConfig);
                }
 
+               if (operatorObject instanceof InputTypeConfigurable) {
+                       InputTypeConfigurable inputTypeConfigurable = 
(InputTypeConfigurable) operatorObject;
+                       inputTypeConfigurable.setInputType(inTypeInfo, 
executionConfig);
+               }
+
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Vertex: {}", vertexID);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
new file mode 100644
index 0000000..391a6a4
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
+       private static final long serialVersionUID = 1L;
+
+       private GlobalWindows() {}
+
+       @Override
+       public Collection<GlobalWindow> assignWindows(Object element, long 
timestamp) {
+               return Collections.singletonList(GlobalWindow.get());
+       }
+
+       @Override
+       public Trigger<Object, GlobalWindow> getDefaultTrigger() {
+               return null;
+       }
+
+       @Override
+       public String toString() {
+               return "GlobalWindows()";
+       }
+
+       /**
+        * Creates a new {@code GlobalWindows} {@link WindowAssigner} that 
assigns
+        * all elements to the same {@link GlobalWindow}.
+        *
+        * @return The global window policy.
+        */
+       public static GlobalWindows create() {
+               return new GlobalWindows();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
new file mode 100644
index 0000000..a2d95c2
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Collection;
+import java.util.List;
+
+public class SlidingProcessingTimeWindows extends WindowAssigner<Object, 
TimeWindow> {
+       private static final long serialVersionUID = 1L;
+
+       private final long size;
+
+       private final long slide;
+
+       private transient List<TimeWindow> result;
+
+       private SlidingProcessingTimeWindows(long size, long slide) {
+               this.size = size;
+               this.slide = slide;
+               this.result = Lists.newArrayListWithCapacity((int) (size / 
slide));
+       }
+
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+               this.result = Lists.newArrayListWithCapacity((int) (size / 
slide));
+       }
+
+       @Override
+       public Collection<TimeWindow> assignWindows(Object element, long 
timestamp) {
+               result.clear();
+               long time = System.currentTimeMillis();
+               long lastStart = time - time % slide;
+               for (long start = lastStart;
+                       start > time - size;
+                       start -= slide) {
+                       result.add(new TimeWindow(start, size));
+               }
+               return result;
+       }
+
+       public long getSize() {
+               return size;
+       }
+
+       public long getSlide() {
+               return slide;
+       }
+
+       @Override
+       public Trigger<Object, TimeWindow> getDefaultTrigger() {
+               return ProcessingTimeTrigger.create();
+       }
+
+       @Override
+       public String toString() {
+               return "SlidingProcessingTimeWindows(" + size + ", " + slide + 
")";
+       }
+
+       /**
+        * Creates a new {@code SlidingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
+        * elements to sliding time windows based on the current processing 
time.
+        *
+        * @param size The size of the generated windows.
+        * @param slide The slide interval of the generated windows.
+        * @return The time policy.
+        */
+       public static SlidingProcessingTimeWindows of(long size, long slide) {
+               return new SlidingProcessingTimeWindows(size, slide);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
new file mode 100644
index 0000000..cb5a7a1
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+       private static final long serialVersionUID = 1L;
+
+       private final long size;
+
+       private final long slide;
+
+       private SlidingTimeWindows(long size, long slide) {
+               this.size = size;
+               this.slide = slide;
+       }
+
+       @Override
+       public Collection<TimeWindow> assignWindows(Object element, long 
timestamp) {
+               List<TimeWindow> windows = new ArrayList<>((int) (size / 
slide));
+               long lastStart = timestamp - timestamp % slide;
+               for (long start = lastStart;
+                       start > timestamp - size;
+                       start -= slide) {
+                       windows.add(new TimeWindow(start, size));
+               }
+               return windows;
+       }
+
+       public long getSize() {
+               return size;
+       }
+
+       public long getSlide() {
+               return slide;
+       }
+
+       @Override
+       public Trigger<Object, TimeWindow> getDefaultTrigger() {
+               return WatermarkTrigger.create();
+       }
+
+       @Override
+       public String toString() {
+               return "SlidingTimeWindows(" + size + ", " + slide + ")";
+       }
+
+       /**
+        * Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that 
assigns
+        * elements to sliding time windows based on the element timestamp.
+        *
+        * @param size The size of the generated windows.
+        * @param slide The slide interval of the generated windows.
+        * @return The time policy.
+        */
+       public static SlidingTimeWindows of(long size, long slide) {
+               return new SlidingTimeWindows(size, slide);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
new file mode 100644
index 0000000..b1ef857
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class TumblingProcessingTimeWindows extends WindowAssigner<Object, 
TimeWindow> {
+       private static final long serialVersionUID = 1L;
+
+       private long size;
+
+       private TumblingProcessingTimeWindows(long size) {
+               this.size = size;
+       }
+
+       @Override
+       public Collection<TimeWindow> assignWindows(Object element, long 
timestamp) {
+               long time = System.currentTimeMillis();
+               long start = time - (time % size);
+               return Collections.singletonList(new TimeWindow(start, size));
+       }
+
+       public long getSize() {
+               return size;
+       }
+
+       @Override
+       public Trigger<Object, TimeWindow> getDefaultTrigger() {
+               return ProcessingTimeTrigger.create();
+       }
+
+       @Override
+       public String toString() {
+               return "TumblingProcessingTimeWindows(" + size + ")";
+       }
+
+       /**
+        * Creates a new {@code TumblingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
+        * elements to time windows based on the current processing time.
+        *
+        * @param size The size of the generated windows.
+        * @return The time policy.
+        */
+       public static TumblingProcessingTimeWindows of(long size) {
+               return new TumblingProcessingTimeWindows(size);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
new file mode 100644
index 0000000..d19c97d
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+       private static final long serialVersionUID = 1L;
+
+       private long size;
+
+       private TumblingTimeWindows(long size) {
+               this.size = size;
+       }
+
+       @Override
+       public Collection<TimeWindow> assignWindows(Object element, long 
timestamp) {
+               long start = timestamp - (timestamp % size);
+               return Collections.singletonList(new TimeWindow(start, size));
+       }
+
+       public long getSize() {
+               return size;
+       }
+
+       @Override
+       public Trigger<Object, TimeWindow> getDefaultTrigger() {
+               return WatermarkTrigger.create();
+       }
+
+       @Override
+       public String toString() {
+               return "TumblingTimeWindows(" + size + ")";
+       }
+
+       /**
+        * Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} 
that assigns
+        * elements to time windows based on the element timestamp.
+        *
+        * @param size The size of the generated windows.
+        * @return The time policy.
+        */
+       public static TumblingTimeWindows of(long size) {
+               return new TumblingTimeWindows(size);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
new file mode 100644
index 0000000..5996426
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import scala.Serializable;
+
+import java.util.Collection;
+
+public abstract class WindowAssigner<T, W extends Window> implements 
Serializable {
+       private static final long serialVersionUID = 1L;
+
+       public abstract Collection<W> assignWindows(T element, long timestamp);
+
+       public abstract Trigger<T, W> getDefaultTrigger();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
new file mode 100644
index 0000000..04636ee
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+public class CountEvictor<W extends Window> implements Evictor<Object, W> {
+       private static final long serialVersionUID = 1L;
+
+       private final long maxCount;
+
+       private CountEvictor(long count) {
+               this.maxCount = count;
+       }
+
+       @Override
+       public int evict(Iterable<StreamRecord<Object>> elements, int size, W 
window) {
+               if (size > maxCount) {
+                       return (int) (size - maxCount);
+               } else {
+                       return 0;
+               }
+       }
+
+       public static <W extends Window> CountEvictor<W> of(long maxCount) {
+               return new CountEvictor<>(maxCount);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
new file mode 100644
index 0000000..c7872ce
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import com.google.common.collect.Iterables;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
+       private static final long serialVersionUID = 1L;
+
+       DeltaFunction<T> deltaFunction;
+       private double threshold;
+
+       private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
+               this.deltaFunction = deltaFunction;
+               this.threshold = threshold;
+       }
+
+       @Override
+       public int evict(Iterable<StreamRecord<T>> elements, int size, W 
window) {
+               StreamRecord<T> lastElement = Iterables.getLast(elements);
+               int toEvict = 0;
+               for (StreamRecord<T> element : elements) {
+                       if (deltaFunction.getDelta(element.getValue(), 
lastElement.getValue()) < this.threshold) {
+                               break;
+                       }
+                       toEvict++;
+               }
+
+               return toEvict;
+       }
+
+       @Override
+       public String toString() {
+               return "DeltaEvictor(" +  deltaFunction + ", " + threshold + 
")";
+       }
+
+       public static <T, W extends Window> DeltaEvictor<T, W> of(double 
threshold, DeltaFunction<T> deltaFunction) {
+               return new DeltaEvictor<>(threshold, deltaFunction);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
new file mode 100644
index 0000000..db04ac4
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import scala.Serializable;
+
+public interface Evictor<T, W extends Window> extends Serializable {
+
+       public abstract int evict(Iterable<StreamRecord<T>> elements, int size, 
W window);
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
new file mode 100644
index 0000000..450b132
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
+       private static final long serialVersionUID = 1L;
+
+       private final long windowSize;
+
+       public TimeEvictor(long windowSize) {
+               this.windowSize = windowSize;
+       }
+
+       @Override
+       public int evict(Iterable<StreamRecord<Object>> elements, int size, W 
window) {
+               int toEvict = 0;
+               long currentTime = System.currentTimeMillis();
+               long evictCutoff = currentTime - windowSize;
+               for (StreamRecord<Object> record: elements) {
+                       if (record.getTimestamp() > evictCutoff) {
+                               break;
+                       }
+                       toEvict++;
+               }
+               return toEvict;
+       }
+
+       @Override
+       public String toString() {
+               return "TimeEvictor(" + windowSize + ")";
+       }
+
+       public static <W extends Window> TimeEvictor<W> of(long windowSize) {
+               return new TimeEvictor<>(windowSize);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
new file mode 100644
index 0000000..64850a2
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public class ContinuousProcessingTimeTrigger<W extends Window> implements 
Trigger<Object, W> {
+       private static final long serialVersionUID = 1L;
+
+       private long granularity;
+
+       private long nextFireTimestamp = 0;
+
+       private ContinuousProcessingTimeTrigger(long granularity) {
+               this.granularity = granularity;
+       }
+
+       @Override
+       public TriggerResult onElement(Object element, long timestamp, W 
window, TriggerContext ctx) {
+               long currentTime = System.currentTimeMillis();
+               if (nextFireTimestamp == 0) {
+                       long start = currentTime - (currentTime % granularity);
+                       nextFireTimestamp = start + granularity;
+
+                       ctx.registerProcessingTimeTimer(nextFireTimestamp);
+                       return TriggerResult.CONTINUE;
+               }
+               if (currentTime > nextFireTimestamp) {
+                       long start = currentTime - (currentTime % granularity);
+                       nextFireTimestamp = start + granularity;
+
+                       ctx.registerProcessingTimeTimer(nextFireTimestamp);
+
+                       return TriggerResult.FIRE;
+               }
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onTime(long time, TriggerContext ctx) {
+               // only fire if an element didn't already fire
+               long currentTime = System.currentTimeMillis();
+               if (currentTime > nextFireTimestamp) {
+                       long start = currentTime - (currentTime % granularity);
+                       nextFireTimestamp = start + granularity;
+                       return TriggerResult.FIRE;
+               }
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public Trigger<Object, W> duplicate() {
+               return new ContinuousProcessingTimeTrigger<>(granularity);
+       }
+
+       @Override
+       public String toString() {
+               return "ContinuousProcessingTimeTrigger(" + granularity + ")";
+       }
+
+       public static <W extends Window> ContinuousProcessingTimeTrigger<W> 
of(long granularity) {
+               return new ContinuousProcessingTimeTrigger<>(granularity);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
new file mode 100644
index 0000000..b7f085a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public class ContinuousWatermarkTrigger<W extends Window> implements 
Trigger<Object, W> {
+       private static final long serialVersionUID = 1L;
+
+       private long granularity;
+
+       private boolean first = true;
+
+       private ContinuousWatermarkTrigger(long granularity) {
+               this.granularity = granularity;
+       }
+
+       @Override
+       public TriggerResult onElement(Object element, long timestamp, W 
window, TriggerContext ctx) {
+               if (first) {
+                       long start = timestamp - (timestamp % granularity);
+                       long nextFireTimestamp = start + granularity;
+
+                       ctx.registerWatermarkTimer(nextFireTimestamp);
+                       first = false;
+                       return TriggerResult.CONTINUE;
+               }
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onTime(long time, TriggerContext ctx) {
+               ctx.registerWatermarkTimer(time + granularity);
+               return TriggerResult.FIRE;
+       }
+
+       @Override
+       public Trigger<Object, W> duplicate() {
+               return new ContinuousWatermarkTrigger<>(granularity);
+       }
+
+       @Override
+       public String toString() {
+               return "ContinuousProcessingTimeTrigger(" + granularity + ")";
+       }
+
+       public static <W extends Window> ContinuousWatermarkTrigger<W> of(long 
granularity) {
+               return new ContinuousWatermarkTrigger<>(granularity);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
new file mode 100644
index 0000000..a51fae6
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public class CountTrigger<W extends Window> implements Trigger<Object, W> {
+       private static final long serialVersionUID = 1L;
+
+       private long maxCount;
+       private long count;
+
+       private CountTrigger(long maxCount) {
+               this.maxCount = maxCount;
+               count = 0;
+       }
+
+       @Override
+       public TriggerResult onElement(Object element, long timestamp, W 
window, TriggerContext ctx) {
+               count++;
+               if (count >= maxCount) {
+                       count = 0;
+                       return TriggerResult.FIRE;
+               }
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onTime(long time, TriggerContext ctx) {
+               return null;
+       }
+
+       @Override
+       public Trigger<Object, W> duplicate() {
+               return new CountTrigger<>(maxCount);
+       }
+
+       @Override
+       public String toString() {
+               return "CountTrigger(" +  maxCount + ")";
+       }
+
+       public static <W extends Window> CountTrigger<W> of(long maxCount) {
+               return new CountTrigger<>(maxCount);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
new file mode 100644
index 0000000..ecd7ed0
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
+       private static final long serialVersionUID = 1L;
+
+       DeltaFunction<T> deltaFunction;
+       private double threshold;
+       private transient T lastElement;
+
+       private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction) {
+               this.deltaFunction = deltaFunction;
+               this.threshold = threshold;
+       }
+
+       @Override
+       public TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx) {
+               if (lastElement == null) {
+                       lastElement = element;
+                       return TriggerResult.CONTINUE;
+               }
+               if (deltaFunction.getDelta(lastElement, element) > 
this.threshold) {
+                       lastElement = element;
+                       return TriggerResult.FIRE;
+               }
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onTime(long time, TriggerContext ctx) {
+               return null;
+       }
+
+       @Override
+       public Trigger<T, W> duplicate() {
+               return new DeltaTrigger<>(threshold, deltaFunction);
+       }
+
+       @Override
+       public String toString() {
+               return "DeltaTrigger(" +  deltaFunction + ", " + threshold + 
")";
+       }
+
+       public static <T, W extends Window> DeltaTrigger<T, W> of(double 
threshold, DeltaFunction<T> deltaFunction) {
+               return new DeltaTrigger<>(threshold, deltaFunction);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
new file mode 100644
index 0000000..f693a67
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
+       private static final long serialVersionUID = 1L;
+
+       boolean isFirst = true;
+
+       private ProcessingTimeTrigger() {}
+
+       @Override
+       public TriggerResult onElement(Object element, long timestamp, 
TimeWindow window, TriggerContext ctx) {
+               if (isFirst) {
+                       ctx.registerProcessingTimeTimer(window.getEnd());
+                       isFirst = false;
+               }
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onTime(long time, TriggerContext ctx) {
+               return TriggerResult.FIRE_AND_PURGE;
+       }
+
+       @Override
+       public Trigger<Object, TimeWindow> duplicate() {
+               return new ProcessingTimeTrigger();
+       }
+
+       @Override
+       public String toString() {
+               return "ProcessingTimeTrigger()";
+       }
+
+       public static ProcessingTimeTrigger create() {
+               return new ProcessingTimeTrigger();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
new file mode 100644
index 0000000..88e22cd
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
+       private static final long serialVersionUID = 1L;
+
+       private Trigger<T, W> nestedTrigger;
+
+       private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
+               this.nestedTrigger = nestedTrigger;
+       }
+
+       @Override
+       public TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx) {
+               TriggerResult triggerResult = nestedTrigger.onElement(element, 
timestamp, window, ctx);
+               switch (triggerResult) {
+                       case FIRE:
+                               return TriggerResult.FIRE_AND_PURGE;
+                       case FIRE_AND_PURGE:
+                               return TriggerResult.FIRE_AND_PURGE;
+                       default:
+                               return TriggerResult.CONTINUE;
+               }
+       }
+
+       @Override
+       public TriggerResult onTime(long time, TriggerContext ctx) {
+               TriggerResult triggerResult = nestedTrigger.onTime(time, ctx);
+               switch (triggerResult) {
+                       case FIRE:
+                               return TriggerResult.FIRE_AND_PURGE;
+                       case FIRE_AND_PURGE:
+                               return TriggerResult.FIRE_AND_PURGE;
+                       default:
+                               return TriggerResult.CONTINUE;
+               }
+       }
+
+       @Override
+       public Trigger<T, W> duplicate() {
+               return new PurgingTrigger<>(nestedTrigger.duplicate());
+       }
+
+       @Override
+       public String toString() {
+               return "PurgingTrigger(" + nestedTrigger.toString() + ")";
+       }
+
+       public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, 
W> nestedTrigger) {
+               return new PurgingTrigger<>(nestedTrigger);
+       }
+
+       @VisibleForTesting
+       public Trigger<T, W> getNestedTrigger() {
+               return nestedTrigger;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
new file mode 100644
index 0000000..b04aacf
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import scala.Serializable;
+
+public interface Trigger<T, W extends Window> extends Serializable {
+
+       public TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx);
+
+       public TriggerResult onTime(long time, TriggerContext ctx);
+
+       public Trigger<T, W> duplicate();
+
+       public static enum TriggerResult {
+               CONTINUE, FIRE_AND_PURGE, FIRE
+       }
+
+       public interface TriggerContext {
+               void registerProcessingTimeTimer(long time);
+
+               void registerWatermarkTimer(long time);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
new file mode 100644
index 0000000..6ba8890
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
+       private static final long serialVersionUID = 1L;
+
+       boolean isFirst = true;
+
+       private WatermarkTrigger() {}
+
+       @Override
+       public TriggerResult onElement(Object element, long timestamp, 
TimeWindow window, TriggerContext ctx) {
+               if (isFirst) {
+                       ctx.registerWatermarkTimer(window.maxTimestamp());
+                       isFirst = false;
+               }
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onTime(long time, TriggerContext ctx) {
+               return TriggerResult.FIRE_AND_PURGE;
+       }
+
+       @Override
+       public Trigger<Object, TimeWindow> duplicate() {
+               return new WatermarkTrigger();
+       }
+
+       @Override
+       public String toString() {
+               return "WatermarkTrigger()";
+       }
+
+       public static WatermarkTrigger create() {
+               return new WatermarkTrigger();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
new file mode 100644
index 0000000..e0df19d
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+public class GlobalWindow extends Window {
+
+       private static GlobalWindow INSTANCE = new GlobalWindow();
+
+       private GlobalWindow() { }
+
+       public static GlobalWindow get() {
+               return INSTANCE;
+       }
+
+       @Override
+       public long getStart() {
+               return Long.MIN_VALUE;
+       }
+
+       @Override
+       public long getEnd() {
+               return Long.MAX_VALUE;
+       }
+
+       @Override
+       public long maxTimestamp() {
+               return Long.MAX_VALUE;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               return true;
+       }
+
+       @Override
+       public int hashCode() {
+               return 0;
+       }
+
+       @Override
+       public String toString() {
+               return "GlobalWindow";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
new file mode 100644
index 0000000..20080c0
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+public class TimeWindow extends Window {
+       long start;
+       long end;
+
+       public TimeWindow() {
+       }
+
+       public TimeWindow(long start, long size) {
+               this.start = start;
+               this.end = start + size - 1;
+       }
+
+       @Override
+       public long getStart() {
+               return start;
+       }
+
+       @Override
+       public long getEnd() {
+               return end;
+       }
+
+       @Override
+       public long maxTimestamp() {
+               return end;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               TimeWindow window = (TimeWindow) o;
+
+               return end == window.end && start == window.start;
+       }
+
+       @Override
+       public int hashCode() {
+               int result = (int) (start ^ (start >>> 32));
+               result = 31 * result + (int) (end ^ (end >>> 32));
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "TimeWindow{" +
+                               "start=" + start +
+                               ", end=" + end +
+                               '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
new file mode 100644
index 0000000..4e22c32
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+public abstract class Window {
+
+       public abstract long getStart();
+
+       public abstract long getEnd();
+
+       public abstract long maxTimestamp();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
new file mode 100644
index 0000000..145ad25
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> 
implements OneInputStreamOperator<T, T> {
+       private static final long serialVersionUID = 1L;
+
+       private long granularity;
+
+       private transient Map<Long, List<StreamRecord<T>>> buckets;
+
+       public BucketStreamSortOperator(long granularity) {
+               this.granularity = granularity;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               buckets = Maps.newHashMap();
+
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void processElement(StreamRecord<T> record) throws Exception {
+               long bucketId = record.getTimestamp() - (record.getTimestamp() 
% granularity);
+               List<StreamRecord<T>> bucket = buckets.get(bucketId);
+               if (bucket == null) {
+                       bucket = Lists.newArrayList();
+                       buckets.put(bucketId, bucket);
+               }
+               bucket.add(record);
+       }
+
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               long maxBucketId = mark.getTimestamp() - (mark.getTimestamp() % 
granularity);
+               Set<Long> toRemove = Sets.newHashSet();
+               for (Map.Entry<Long, List<StreamRecord<T>>> bucket: 
buckets.entrySet()) {
+                       if (bucket.getKey() < maxBucketId) {
+                               Collections.sort(bucket.getValue(), new 
Comparator<StreamRecord<T>>() {
+                                       @Override
+                                       public int compare(StreamRecord<T> o1, 
StreamRecord<T> o2) {
+                                               return (int) (o1.getTimestamp() 
- o2.getTimestamp());
+                                       }
+                               });
+                               for (StreamRecord<T> r: bucket.getValue()) {
+                                       output.collect(r);
+                               }
+                               toRemove.add(bucket.getKey());
+                       }
+               }
+
+               for (Long l: toRemove) {
+                       buckets.remove(l);
+               }
+
+               output.emitWatermark(mark);
+       }
+
+}

Reply via email to