http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
new file mode 100644
index 0000000..cddcc42
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -0,0 +1,115 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * A {@link WindowOperator} that consults an {@link Evictor} every time a window is emitted.
+ *
+ * <p>Before the window contents are handed to the {@link KeyedWindowFunction}, the evictor is
+ * asked how many elements should be dropped (only if the buffer is non-empty) and that many
+ * elements are removed from the window's {@link EvictingWindowBuffer}.
+ *
+ * @param <K> The type of key returned by the {@code KeySelector}.
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code KeyedWindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
+public class EvictingWindowOperator<K, IN, OUT, W extends Window>
+               extends WindowOperator<K, IN, OUT, W> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(EvictingWindowOperator.class);
+
+       /** Decides how many buffered elements are dropped before a window is evaluated. */
+       private final Evictor<? super IN, ? super W> evictor;
+
+       /**
+        * Creates a new evicting window operator.
+        *
+        * @param windowAssigner assigns incoming elements to windows
+        * @param keySelector extracts the key of an incoming element
+        * @param windowBufferFactory creates the per-window buffers; restricted to factories of
+        *                            {@link EvictingWindowBuffer} so that elements can be removed
+        * @param windowFunction user function that is evaluated when a window is emitted
+        * @param trigger template of the trigger that decides when a window is emitted
+        * @param evictor decides how many elements are evicted before the window is evaluated
+        */
+       public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+                       KeySelector<IN, K> keySelector,
+                       WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
+                       KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+                       Trigger<? super IN, ? super W> trigger,
+                       Evictor<? super IN, ? super W> evictor) {
+               super(windowAssigner, keySelector, windowBufferFactory, windowFunction, trigger);
+               this.evictor = evictor;
+       }
+
+       /**
+        * Evicts elements from the window buffer and then evaluates the user function on the
+        * remaining contents.
+        *
+        * @param key the key the window belongs to
+        * @param window the window to emit
+        * @param purge if {@code true} the window is removed from the operator state before it
+        *              is evaluated, otherwise it stays registered
+        * @throws Exception forwarded from the evictor or the user function
+        */
+       @Override
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       protected void emitWindow(K key, W window, boolean purge) throws Exception {
+
+               timestampedCollector.setTimestamp(window.getEnd());
+
+               Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+
+               if (keyWindows == null) {
+                       LOG.debug("Window {} for key {} already gone.", window, key);
+                       return;
+               }
+
+               Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
+               if (purge) {
+                       bufferAndTrigger = keyWindows.remove(window);
+               } else {
+                       bufferAndTrigger = keyWindows.get(window);
+               }
+
+               if (bufferAndTrigger == null) {
+                       LOG.debug("Window {} for key {} already gone.", window, key);
+                       return;
+               }
+
+               // The constructor only accepts factories of EvictingWindowBuffer, so the buffers
+               // created by this operator are expected to support this downcast.
+               EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) bufferAndTrigger.f0;
+
+               int toEvict = 0;
+               if (windowBuffer.size() > 0) {
+                       // need some type trickery here...
+                       toEvict = evictor.evict(
+                                       (Iterable) windowBuffer.getElements(),
+                                       windowBuffer.size(),
+                                       window);
+               }
+
+               windowBuffer.removeElements(toEvict);
+
+               userFunction.evaluate(key,
+                               window,
+                               bufferAndTrigger.f0.getUnpackedElements(),
+                               timestampedCollector);
+
+               // drop the per-key map once its last window has been purged
+               if (keyWindows.isEmpty()) {
+                       windows.remove(key);
+               }
+       }
+
+       /**
+        * Same as {@link WindowOperator#enableSetProcessingTime(boolean)} but returns the more
+        * specific operator type so that calls can be chained.
+        */
+       @Override
+       public EvictingWindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
+               super.enableSetProcessingTime(setProcessingTime);
+               return this;
+       }
+
+
+       // ------------------------------------------------------------------------
+       // Getters for testing
+       // ------------------------------------------------------------------------
+
+       @VisibleForTesting
+       public Evictor<? super IN, ? super W> getEvictor() {
+               return evictor;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
index b1ff7e2..880c85c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
@@ -18,15 +18,41 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Count;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Delta;
 import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
 import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
 import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 
 /**
  * This class implements the conversion from window policies to concrete 
operator
@@ -55,7 +81,7 @@ public class PolicyToOperator {
 
                                @SuppressWarnings("unchecked")
                                OneInputStreamOperator<IN, OUT> op = 
(OneInputStreamOperator<IN, OUT>)
-                                               new 
AggregatingProcessingTimeWindowOperator<KEY, IN>(
+                                               new 
AggregatingProcessingTimeWindowOperator<>(
                                                                reducer, 
keySelector, windowLength, windowSlide);
                                return op;
                        }
@@ -63,17 +89,147 @@ public class PolicyToOperator {
                                @SuppressWarnings("unchecked")
                                KeyedWindowFunction<IN, OUT, KEY, Window> wf = 
(KeyedWindowFunction<IN, OUT, KEY, Window>) function;
 
-                               return new 
AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
+                               return new 
AccumulatingProcessingTimeWindowOperator<>(
                                                                wf, 
keySelector, windowLength, windowSlide);
                        }
                }
 
                // -- case 2: both policies are event time policies
                if (window instanceof EventTime && (slide == null || slide 
instanceof EventTime)) {
-                       // add event time implementation
+                       final long windowLength = ((EventTime) 
window).toMilliseconds();
+                       final long windowSlide = slide == null ? windowLength : 
((EventTime) slide).toMilliseconds();
+
+                       WindowAssigner<? super IN, TimeWindow> assigner;
+                       if (windowSlide == windowLength) {
+                               assigner = TumblingTimeWindows.of(windowLength);
+                       } else {
+                               assigner = SlidingTimeWindows.of(windowLength, 
windowSlide);
+                       }
+                       WindowBufferFactory<IN, ? extends WindowBuffer<IN>> 
windowBuffer;
+                       if (function instanceof ReduceFunction) {
+                               @SuppressWarnings("unchecked")
+                               ReduceFunction<IN> reducer = 
(ReduceFunction<IN>) SerializationUtils.clone(function);
+                               function = new ReduceWindowFunction<>(reducer);
+                               windowBuffer = new 
PreAggregatingHeapWindowBuffer.Factory<>(reducer);
+                       } else {
+                               windowBuffer = new HeapWindowBuffer.Factory<>();
+                       }
+                       @SuppressWarnings("unchecked")
+                       KeyedWindowFunction<IN, OUT, KEY, TimeWindow> 
windowFunction = (KeyedWindowFunction<IN, OUT, KEY, TimeWindow>) function;
+
+                       return new WindowOperator<>(
+                                       assigner,
+                                       keySelector,
+                                       windowBuffer,
+                                       windowFunction,
+                                       WatermarkTrigger.create());
                }
-               
-               throw new UnsupportedOperationException("The windowing 
mechanism does not yet support " + window.toString(slide));
+
+               // -- case 3: arbitrary trigger, no eviction
+               if (slide == null) {
+                       Trigger<? super IN, GlobalWindow> trigger = 
policyToTrigger(window);
+                       // we need to make them purging triggers because the 
trigger/eviction policy model
+                       // expects that the window is purged when no slide is 
used
+                       Trigger<? super IN, GlobalWindow> purgingTrigger = 
PurgingTrigger.of(trigger);
+
+                       WindowBufferFactory<IN, ? extends WindowBuffer<IN>> 
windowBuffer;
+                       if (function instanceof ReduceFunction) {
+                               @SuppressWarnings("unchecked")
+                               ReduceFunction<IN> reducer = 
(ReduceFunction<IN>) SerializationUtils.clone(function);
+                               function = new ReduceWindowFunction<>(reducer);
+                               windowBuffer = new 
PreAggregatingHeapWindowBuffer.Factory<>(reducer);
+                       } else {
+                               windowBuffer = new HeapWindowBuffer.Factory<>();
+                       }
+
+                       if (!(function instanceof KeyedWindowFunction)) {
+                               throw new IllegalStateException("Windowing 
function is not of type EvaluateKeyedWindowFunction.");
+                       }
+                       @SuppressWarnings("unchecked")
+                       KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> 
windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
+
+                       return new WindowOperator<>(
+                                       GlobalWindows.<IN>create(),
+                                       keySelector,
+                                       windowBuffer,
+                                       windowFunction,
+                                       purgingTrigger);
+               }
+
+               // -- case 4: arbitrary trigger, arbitrary eviction
+               Trigger<? super IN, GlobalWindow> trigger = 
policyToTrigger(slide);
+               Evictor<? super IN, GlobalWindow> evictor = 
policyToEvictor(window);
+
+               WindowBufferFactory<IN, ? extends EvictingWindowBuffer<IN>> 
windowBuffer = new HeapWindowBuffer.Factory<>();
+               if (function instanceof ReduceFunction) {
+                       @SuppressWarnings("unchecked")
+                       ReduceFunction<IN> reducer = (ReduceFunction<IN>) 
SerializationUtils.clone(function);
+                       function = new ReduceWindowFunction<>(reducer);
+               }
+
+               if (!(function instanceof KeyedWindowFunction)) {
+                       throw new IllegalStateException("Windowing function is 
not of type EvaluateKeyedWindowFunction.");
+               }
+
+               @SuppressWarnings("unchecked")
+               KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction 
= (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
+
+               EvictingWindowOperator<KEY, IN, OUT, GlobalWindow> op = new 
EvictingWindowOperator<>(
+                               GlobalWindows.<IN>create(),
+                               keySelector,
+                               windowBuffer,
+                               windowFunction,
+                               trigger,
+                               evictor);
+
+               if (window instanceof ProcessingTime) {
+                       // special case, we need to instruct the window 
operator to store the processing time in
+                       // the elements so that the evictor can work on that
+                       op.enableSetProcessingTime(true);
+               }
+
+               return op;
+       }
+
+       /**
+        * Translates a window policy into the trigger that is used on a {@link GlobalWindow}.
+        *
+        * <p>{@link EventTime} maps to a {@link ContinuousWatermarkTrigger}, {@link ProcessingTime}
+        * to a {@link ContinuousProcessingTimeTrigger}, {@link Count} to a {@link CountTrigger}
+        * and {@link Delta} to a {@link DeltaTrigger}.
+        *
+        * @param policy the policy to translate
+        * @return the trigger that corresponds to the policy
+        * @throws UnsupportedOperationException if the policy is of none of the types above
+        */
+       private static <IN> Trigger<? super IN, GlobalWindow> policyToTrigger(WindowPolicy policy) {
+               if (policy instanceof EventTime) {
+                       EventTime eventTime = (EventTime) policy;
+                       return ContinuousWatermarkTrigger.of(eventTime.getSize());
+               } else if (policy instanceof ProcessingTime) {
+                       ProcessingTime processingTime = (ProcessingTime) policy;
+                       return ContinuousProcessingTimeTrigger.of(processingTime.getSize());
+               } else if (policy instanceof Count) {
+                       Count count = (Count) policy;
+                       return CountTrigger.of(count.getSize());
+               } else if (policy instanceof Delta) {
+                       // the element type of the policy is not known here, hence the raw cast;
+                       // each warning name must be its own array element to be recognized
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       Delta<IN> delta = (Delta) policy;
+                       return DeltaTrigger.of(delta.getThreshold(), delta.getDeltaFunction());
+
+               }
+
+               throw new UnsupportedOperationException("Unsupported policy " + policy);
+       }
+
+       /**
+        * Translates a window policy into the evictor that is used on a {@link GlobalWindow}.
+        *
+        * <p>Both {@link EventTime} and {@link ProcessingTime} map to a {@link TimeEvictor},
+        * {@link Count} maps to a {@link CountEvictor} and {@link Delta} to a {@link DeltaEvictor}.
+        *
+        * @param policy the policy to translate
+        * @return the evictor that corresponds to the policy
+        * @throws UnsupportedOperationException if the policy is of none of the types above
+        */
+       private static <IN> Evictor<? super IN, GlobalWindow> policyToEvictor(WindowPolicy policy) {
+               if (policy instanceof EventTime) {
+                       EventTime eventTime = (EventTime) policy;
+                       return TimeEvictor.of(eventTime.getSize());
+               } else if (policy instanceof ProcessingTime) {
+                       ProcessingTime processingTime = (ProcessingTime) policy;
+                       return TimeEvictor.of(processingTime.getSize());
+               } else if (policy instanceof Count) {
+                       Count count = (Count) policy;
+                       return CountEvictor.of(count.getSize());
+               } else if (policy instanceof Delta) {
+                       // the element type of the policy is not known here, hence the raw cast;
+                       // each warning name must be its own array element to be recognized
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       Delta<IN> delta = (Delta) policy;
+                       return DeltaEvictor.of(delta.getThreshold(), delta.getDeltaFunction());
+
+               }
+
+
+               throw new UnsupportedOperationException("Unsupported policy " + policy);
        }
        
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
new file mode 100644
index 0000000..cda4481
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -0,0 +1,320 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Stream operator that implements keyed windowing with pluggable window assignment,
+ * buffering and triggering.
+ *
+ * <p>For every incoming element the {@link WindowAssigner} determines the windows the element
+ * belongs to. A copy of the element is stored in one {@link WindowBuffer} per key and window,
+ * and a per-window duplicate of the {@link Trigger} template decides when the buffered
+ * contents are passed to the {@link KeyedWindowFunction} and whether the window is purged.
+ *
+ * @param <K> The type of key returned by the {@code KeySelector}.
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code KeyedWindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
+public class WindowOperator<K, IN, OUT, W extends Window>
+               extends AbstractUdfStreamOperator<OUT, KeyedWindowFunction<IN, OUT, K, W>>
+               implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
+
+
+       private final WindowAssigner<? super IN, W> windowAssigner;
+       private final KeySelector<IN, K> keySelector;
+
+       /** Template that is duplicated for every new (key, window) pair. */
+       private final Trigger<? super IN, ? super W> triggerTemplate;
+       private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
+
+       /** Live windows: key -> window -> (element buffer, trigger context). */
+       protected transient Map<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> windows;
+
+       /** Registered timers, keyed by the timestamp they were registered for. */
+       private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+       private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+
+       protected transient TimestampedCollector<OUT> timestampedCollector;
+
+       /** If set, element timestamps are overwritten with the wall-clock time on arrival. */
+       private boolean setProcessingTime = false;
+
+       /** Used to copy elements before they are buffered; set via {@link #setInputType}. */
+       private TypeSerializer<IN> inputSerializer;
+
+       /**
+        * Creates a new window operator.
+        *
+        * @param windowAssigner assigns incoming elements to windows
+        * @param keySelector extracts the key of an incoming element
+        * @param windowBufferFactory creates the buffer of every new window
+        * @param windowFunction user function that is evaluated when a window is emitted
+        * @param trigger template of the trigger; duplicated once per key and window
+        */
+       public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+                       KeySelector<IN, K> keySelector,
+                       WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
+                       KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+                       Trigger<? super IN, ? super W> trigger) {
+
+               super(windowFunction);
+
+               this.windowAssigner = windowAssigner;
+               this.keySelector = keySelector;
+
+               this.windowBufferFactory = windowBufferFactory;
+               this.triggerTemplate = trigger;
+
+               setChainingStrategy(ChainingStrategy.ALWAYS);
+//             forceInputCopy();
+       }
+
+       /**
+        * Creates the serializer that {@link #processElement} uses to copy incoming elements.
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+               inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
+       }
+
+       /**
+        * Initializes the transient window and timer state and opens the window buffer factory.
+        *
+        * @throws IllegalStateException if {@link #setInputType} was not called before
+        */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               windows = Maps.newHashMap();
+               watermarkTimers = Maps.newHashMap();
+               processingTimeTimers = Maps.newHashMap();
+               timestampedCollector = new TimestampedCollector<>(output);
+
+               if (inputSerializer == null) {
+                       throw new IllegalStateException("Input serializer was not set.");
+               }
+
+               windowBufferFactory.setRuntimeContext(getRuntimeContext());
+               windowBufferFactory.open(parameters);
+       }
+
+       /**
+        * Emits every window that is still buffered, then drops all window state and closes the
+        * window buffer factory.
+        */
+       @Override
+       public void close() throws Exception {
+               super.close();
+               // emit the elements that we still keep
+               for (Map.Entry<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> entry: windows.entrySet()) {
+                       K key = entry.getKey();
+                       Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = entry.getValue();
+                       for (W window: keyWindows.keySet()) {
+                               // purge == false: the maps must not be modified while we iterate over them
+                               emitWindow(key, window, false);
+                       }
+               }
+               windows.clear();
+               windowBufferFactory.close();
+       }
+
+       /**
+        * Adds a copy of the element to every window it is assigned to and lets the trigger of
+        * each of these windows decide whether the window is emitted.
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               if (setProcessingTime) {
+                       element.replace(element.getValue(), System.currentTimeMillis());
+               }
+               Collection<W> elementWindows =
+                               windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
+
+               K key = keySelector.getKey(element.getValue());
+
+               Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+               if (keyWindows == null) {
+                       keyWindows = Maps.newHashMap();
+                       windows.put(key, keyWindows);
+               }
+
+               for (W window: elementWindows) {
+                       Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = keyWindows.get(window);
+                       if (bufferAndTrigger == null) {
+                               // first element of this window: create its buffer and its own trigger
+                               bufferAndTrigger = new Tuple2<>();
+                               bufferAndTrigger.f0 = windowBufferFactory.create();
+                               bufferAndTrigger.f1 = new TriggerContext(key, window, triggerTemplate.duplicate());
+                               keyWindows.put(window, bufferAndTrigger);
+                       }
+                       // every window gets its own copy of the element
+                       StreamRecord<IN> elementCopy = new StreamRecord<>(
+                                       inputSerializer.copy(element.getValue()),
+                                       element.getTimestamp());
+                       bufferAndTrigger.f0.storeElement(elementCopy);
+                       Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(
+                                       elementCopy.getValue(),
+                                       elementCopy.getTimestamp(),
+                                       window,
+                                       bufferAndTrigger.f1);
+                       processTriggerResult(triggerResult, key, window);
+               }
+       }
+
+       /**
+        * Evaluates the user function on the contents of the given window. The emitted elements
+        * get the end of the window as their timestamp.
+        *
+        * @param key the key the window belongs to
+        * @param window the window to emit
+        * @param purge if {@code true} the window is removed from the operator state before it
+        *              is evaluated, otherwise it stays registered
+        * @throws Exception forwarded from the user function
+        */
+       protected void emitWindow(K key, W window, boolean purge) throws Exception {
+               timestampedCollector.setTimestamp(window.getEnd());
+
+               Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+
+               if (keyWindows == null) {
+                       LOG.debug("Window {} for key {} already gone.", window, key);
+                       return;
+               }
+
+               Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
+               if (purge) {
+                       bufferAndTrigger = keyWindows.remove(window);
+               } else {
+                       bufferAndTrigger = keyWindows.get(window);
+               }
+
+               if (bufferAndTrigger == null) {
+                       LOG.debug("Window {} for key {} already gone.", window, key);
+                       return;
+               }
+
+
+               userFunction.evaluate(key,
+                               window,
+                               bufferAndTrigger.f0.getUnpackedElements(),
+                               timestampedCollector);
+
+               // drop the per-key map once its last window has been purged
+               if (keyWindows.isEmpty()) {
+                       windows.remove(key);
+               }
+       }
+
+       /** Emits (and possibly purges) the window according to the decision of its trigger. */
+       private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
+               switch (triggerResult) {
+                       case FIRE:
+                               emitWindow(key, window, false);
+                               break;
+
+                       case FIRE_AND_PURGE:
+                               emitWindow(key, window, true);
+                               break;
+
+                       case CONTINUE:
+                               // ignore
+               }
+       }
+
+       /**
+        * Fires all watermark timers whose timestamp is less than or equal to the watermark and
+        * then forwards the watermark.
+        */
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               long timestamp = mark.getTimestamp();
+               fireTimers(removeDueTimers(watermarkTimers, timestamp, true), timestamp);
+               output.emitWatermark(mark);
+       }
+
+       /**
+        * Fires all processing-time timers whose timestamp is smaller than the given time.
+        */
+       @Override
+       public void trigger(long time) throws Exception {
+               // NOTE(review): the comparison is strict here while processWatermark() also fires
+               // timers that are equal to the current time -- confirm that this is intended.
+               fireTimers(removeDueTimers(processingTimeTimers, time, false), time);
+       }
+
+       /**
+        * Removes the timers that are due at the given time from {@code timers} and returns them.
+        *
+        * <p>The due timers are detached from the map before any trigger is invoked because
+        * {@code Trigger.onTime()} receives the {@link TriggerContext} and may register new timers
+        * through it. Registering while iterating over the timer map would risk a
+        * {@link java.util.ConcurrentModificationException}, and a timer re-registered for an
+        * already due timestamp would be dropped again by the subsequent cleanup.
+        *
+        * @param timers the timer map to take the due timers from
+        * @param time the current time
+        * @param inclusive whether timers with a timestamp equal to {@code time} are due as well
+        * @return the due timers, keyed by the timestamp they were registered for
+        */
+       private Map<Long, Set<TriggerContext>> removeDueTimers(
+                       Map<Long, Set<TriggerContext>> timers,
+                       long time,
+                       boolean inclusive) {
+
+               Map<Long, Set<TriggerContext>> dueTimers = Maps.newHashMap();
+               for (Map.Entry<Long, Set<TriggerContext>> timer: timers.entrySet()) {
+                       long timestamp = timer.getKey();
+                       if (timestamp < time || (inclusive && timestamp == time)) {
+                               dueTimers.put(timer.getKey(), timer.getValue());
+                       }
+               }
+               timers.keySet().removeAll(dueTimers.keySet());
+               return dueTimers;
+       }
+
+       /** Invokes {@code onTime()} on every given trigger and processes the result. */
+       private void fireTimers(Map<Long, Set<TriggerContext>> dueTimers, long time) throws Exception {
+               for (Set<TriggerContext> contexts: dueTimers.values()) {
+                       for (TriggerContext context: contexts) {
+                               Trigger.TriggerResult triggerResult = context.trigger.onTime(time, context);
+                               processTriggerResult(triggerResult, context.key, context.window);
+                       }
+               }
+       }
+
+       /**
+        * Binds a trigger to the key and window it is responsible for and lets that trigger
+        * register timers with the enclosing operator.
+        */
+       protected class TriggerContext implements Trigger.TriggerContext {
+               Trigger<? super IN, ? super W> trigger;
+               K key;
+               W window;
+
+               public TriggerContext(K key, W window, Trigger<? super IN, ? super W> trigger) {
+                       this.key = key;
+                       this.window = window;
+                       this.trigger = trigger;
+               }
+
+               @Override
+               public void registerProcessingTimeTimer(long time) {
+                       Set<TriggerContext> triggers = processingTimeTimers.get(time);
+                       if (triggers == null) {
+                               // one runtime timer per timestamp is enough, all contexts share it
+                               getRuntimeContext().registerTimer(time, WindowOperator.this);
+                               triggers = Sets.newHashSet();
+                               processingTimeTimers.put(time, triggers);
+                       }
+                       triggers.add(this);
+               }
+
+               @Override
+               public void registerWatermarkTimer(long time) {
+                       Set<TriggerContext> triggers = watermarkTimers.get(time);
+                       if (triggers == null) {
+                               triggers = Sets.newHashSet();
+                               watermarkTimers.put(time, triggers);
+                       }
+                       triggers.add(this);
+               }
+       }
+
+       /**
+        * When this flag is enabled the current processing time is set as the timestamp of elements
+        * upon arrival. This must be used, for example, when using the
+        * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
+        * time semantics.
+        */
+       public WindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
+               this.setProcessingTime = setProcessingTime;
+               return this;
+       }
+
+       // ------------------------------------------------------------------------
+       // Getters for testing
+       // ------------------------------------------------------------------------
+
+       @VisibleForTesting
+       public Trigger<? super IN, ? super W> getTriggerTemplate() {
+               return triggerTemplate;
+       }
+
+       @VisibleForTesting
+       public KeySelector<IN, K> getKeySelector() {
+               return keySelector;
+       }
+
+       @VisibleForTesting
+       public WindowAssigner<? super IN, W> getWindowAssigner() {
+               return windowAssigner;
+       }
+
+       @VisibleForTesting
+       public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
+               return windowBufferFactory;
+       }
+
+       @VisibleForTesting
+       public boolean isSetProcessingTime() {
+               return setProcessingTime;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
new file mode 100644
index 0000000..50e392b
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+/**
+ * A {@link WindowBuffer} that additionally supports removing (evicting) elements.
+ *
+ * @param <T> type of the values carried by the buffered records
+ */
+public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
+
+       /**
+        * Removes {@code count} elements from the buffer.
+        *
+        * <p>NOTE(review): which elements are removed and what the return value means is not
+        * specified by this interface. {@code HeapWindowBuffer} removes from the front of its
+        * deque and always returns {@code false} - confirm the intended contract.
+        *
+        * @param count number of elements to remove
+        * @return implementation defined, see the note above
+        */
+       boolean removeElements(int count);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
new file mode 100644
index 0000000..092718a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayDeque;
+
+public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
+       private static final long serialVersionUID = 1L;
+
+       private ArrayDeque<StreamRecord<T>> elements;
+
+       protected HeapWindowBuffer() {
+               this.elements = new ArrayDeque<>();
+       }
+
+       @Override
+       public void storeElement(StreamRecord<T> element) {
+               elements.add(element);
+       }
+
+       @Override
+       public boolean removeElements(int count) {
+               // TODO determine if this can be done in a better way
+               for (int i = 0; i < count; i++) {
+                       elements.removeFirst();
+               }
+               return false;
+       }
+
+       @Override
+       public Iterable<StreamRecord<T>> getElements() {
+               return elements;
+       }
+
+       @Override
+       public Iterable<T> getUnpackedElements() {
+               return FluentIterable.from(elements).transform(new 
Function<StreamRecord<T>, T>() {
+                       @Override
+                       public T apply(StreamRecord<T> record) {
+                               return record.getValue();
+                       }
+               });
+       }
+
+       @Override
+       public int size() {
+               return elements.size();
+       }
+
+       public static class Factory<T> implements WindowBufferFactory<T, 
HeapWindowBuffer<T>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void setRuntimeContext(RuntimeContext ctx) {}
+
+               @Override
+               public void open(Configuration config) {}
+
+               @Override
+               public void close() {}
+
+               @Override
+               public HeapWindowBuffer<T> create() {
+                       return new HeapWindowBuffer<>();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
new file mode 100644
index 0000000..85f90b0
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+
+public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
+       private static final long serialVersionUID = 1L;
+
+       private final ReduceFunction<T> reduceFunction;
+       private transient StreamRecord<T> data;
+
+       protected PreAggregatingHeapWindowBuffer(ReduceFunction<T> 
reduceFunction) {
+               this.reduceFunction = reduceFunction;
+       }
+
+       @Override
+       public void storeElement(StreamRecord<T> element) throws Exception {
+               if (data == null) {
+                       data = new StreamRecord<>(element.getValue(), 
element.getTimestamp());
+               } else {
+                       data.replace(reduceFunction.reduce(data.getValue(), 
element.getValue()));
+               }
+       }
+
+       @Override
+       public Iterable<StreamRecord<T>> getElements() {
+               return Collections.singleton(data);
+       }
+
+       @Override
+       public Iterable<T> getUnpackedElements() {
+               return Collections.singleton(data.getValue());
+       }
+
+       @Override
+       public int size() {
+               return 1;
+       }
+
+       public static class Factory<T> implements WindowBufferFactory<T, 
PreAggregatingHeapWindowBuffer<T>> {
+               private static final long serialVersionUID = 1L;
+
+               private final ReduceFunction<T> reduceFunction;
+
+               public Factory(ReduceFunction<T> reduceFunction) {
+                       this.reduceFunction = reduceFunction;
+               }
+
+               @Override
+               public void setRuntimeContext(RuntimeContext ctx) {
+                       FunctionUtils.setFunctionRuntimeContext(reduceFunction, 
ctx);
+               }
+
+               @Override
+               public void open(Configuration config) throws Exception {
+                       FunctionUtils.openFunction(reduceFunction, config);
+               }
+
+               @Override
+               public void close() throws Exception {
+                       FunctionUtils.closeFunction(reduceFunction);
+               }
+
+               @Override
+               public PreAggregatingHeapWindowBuffer<T> create() {
+                       return new 
PreAggregatingHeapWindowBuffer<T>(reduceFunction);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
new file mode 100644
index 0000000..8c891d5
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+
+/**
+ * A serializable container for the records of a window.
+ *
+ * @param <T> type of the values carried by the buffered records
+ */
+public interface WindowBuffer<T> extends Serializable {
+
+       /**
+        * Adds the record to the buffer.
+        *
+        * @param element the record to store
+        * @throws Exception if the implementation fails while storing the record (for example
+        *                   when it applies a user function to it)
+        */
+       void storeElement(StreamRecord<T> element) throws Exception;
+
+       /** Returns the buffer contents as {@link StreamRecord}s. */
+       Iterable<StreamRecord<T>> getElements();
+
+       /** Returns the buffer contents as plain values, without the {@link StreamRecord} wrapper. */
+       Iterable<T> getUnpackedElements();
+
+       /** Returns the number of elements as reported by the implementation. */
+       int size();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
new file mode 100644
index 0000000..4a7f6df
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+
+public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends 
Serializable { // serializable factory for window buffers of type B
+       void setRuntimeContext(RuntimeContext ctx); // PreAggregatingHeapWindowBuffer.Factory forwards ctx to its reduce function; HeapWindowBuffer.Factory ignores it
+       void open(Configuration config) throws Exception; // lifecycle hook; the pre-aggregating factory opens its reduce function here
+       void close() throws Exception; // lifecycle hook; the pre-aggregating factory closes its reduce function here
+       B create(); // returns a buffer; both factories in this package return a new instance per call
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
new file mode 100644
index 0000000..3d9605e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class EvictingWindowOperatorTest {
+
+       // The test hands an AtomicInteger to SumReducer to count how often close() is called on it.
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testCountTrigger() throws Exception {
+               AtomicInteger closeCalled = new AtomicInteger(0); // incremented by SumReducer.close(); asserted at the end
+
+               final int WINDOW_SIZE = 4;
+               final int WINDOW_SLIDE = 2;
+
+               EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+                               GlobalWindows.create(),
+                               new TupleKeySelector(),
+                               new HeapWindowBuffer.Factory<Tuple2<String, 
Integer>>(),
+                               new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer(closeCalled)),
+                               CountTrigger.of(WINDOW_SLIDE), // NOTE(review): presumably fires after every WINDOW_SLIDE elements per key; the expected output below is consistent with that
+                               CountEvictor.of(WINDOW_SIZE)); // NOTE(review): presumably keeps at most WINDOW_SIZE elements per key; the expected sums never exceed 4
+
+               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                               new 
OneInputStreamOperatorTestHarness<>(operator);
+
+               long initialTime = 0L;
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               // The global window actually ignores these timestamps...
+
+               // add elements out-of-order
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 20));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1998));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+
+
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE)); // key2 after its 2nd element
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE)); // key2 after its 4th element
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE)); // key1 after its 2nd element (the 3rd has not triggered yet)
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 10999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), 
Long.MAX_VALUE)); // key1 after its 4th element
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE)); // key2 after its 6th element: the sum stays at 4
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
+
+               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get()); // the reducer must have been closed exactly once
+
+
+       }
+
+       // 
------------------------------------------------------------------------
+       //  UDFs
+       // 
------------------------------------------------------------------------
+
+       public static class SumReducer extends 
RichReduceFunction<Tuple2<String, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               private boolean openCalled = false;
+
+               private  AtomicInteger closeCalled;
+
+               public SumReducer(AtomicInteger closeCalled) {
+                       this.closeCalled = closeCalled;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       openCalled = true;
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       closeCalled.incrementAndGet();
+               }
+
+               @Override
+               public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
+                               Tuple2<String, Integer> value2) throws 
Exception {
+                       if (!openCalled) {
+                               Assert.fail("Open was not called");
+                       }
+                       return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+               }
+       }
+
+       /**
+        * Orders test output records by timestamp, then by key ({@code f0}), then by value
+        * ({@code f1}), so that expected and actual output can be compared after sorting.
+        * Any comparison that involves a {@link Watermark} yields 0.
+        */
+       @SuppressWarnings("unchecked")
+       private static class ResultSortComparator implements Comparator<Object> {
+               @Override
+               public int compare(Object o1, Object o2) {
+                       if (o1 instanceof Watermark || o2 instanceof Watermark) {
+                               return 0;
+                       }
+
+                       StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
+                       StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
+
+                       // Long.compare rather than casting the difference to int: the narrowing cast can
+                       // flip the sign (or yield 0) for timestamps that are far apart, e.g. Long.MAX_VALUE.
+                       int byTimestamp = Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
+                       if (byTimestamp != 0) {
+                               return byTimestamp;
+                       }
+
+                       int byKey = sr0.getValue().f0.compareTo(sr1.getValue().f0);
+                       if (byKey != 0) {
+                               return byKey;
+                       }
+
+                       // Integer.compare rather than subtraction, which can overflow for large values.
+                       return Integer.compare(sr0.getValue().f1, sr1.getValue().f1);
+               }
+       }
+
+       private static class TupleKeySelector implements 
KeySelector<Tuple2<String, Integer>, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String getKey(Tuple2<String, Integer> value) throws 
Exception {
+                       return value.f0; // the String field f0 of the tuple is the key
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java
new file mode 100644
index 0000000..6f42514
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Count;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Delta;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Time;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * These tests verify that the api calls on
+ * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} 
instantiate
+ * the correct window operator.
+ */
+public class PolicyWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
+
+       /**
+        * These tests ensure that the fast aligned time windows operator is 
used if the
+        * conditions are right.
+        */
+       @Test
+       public void testFastTimeWindows() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(Time.of(1000, TimeUnit.MILLISECONDS), 
Time.of(100, TimeUnit.MILLISECONDS))
+                               .reduceWindow(reducer); // case 1: two time policies combined with a reduce function
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof 
AggregatingProcessingTimeWindowOperator); // case 1 must translate to the aggregating operator
+
+               DataStream<Tuple2<String, Integer>> window2 = source
+                               .keyBy(0)
+                               .window(Time.of(1000, TimeUnit.MILLISECONDS))
+                               .mapWindow(new 
KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, 
Window>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void evaluate(Tuple tuple,
+                                                       Window window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+
+                                       }
+                               }); // case 2: a single time policy combined with a (no-op) window function
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
+               Assert.assertTrue(operator2 instanceof 
AccumulatingProcessingTimeWindowOperator); // case 2 must translate to the accumulating operator
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testNonEvicting() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(Count.of(200))
+                               .reduceWindow(reducer); // case 1: a single count policy combined with a reduce function
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof WindowOperator);
+               WindowOperator winOperator1 = (WindowOperator) operator1; // raw type on purpose, see @SuppressWarnings above
+               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
PurgingTrigger);
+               
Assert.assertTrue(((PurgingTrigger)winOperator1.getTriggerTemplate()).getNestedTrigger()
 instanceof CountTrigger); // the purging trigger must wrap a count trigger
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
GlobalWindows);
+               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory); // a reduce function is expected to use the pre-aggregating buffer
+
+               DataStream<Tuple2<String, Integer>> window2 = source
+                               .keyBy(0)
+                               .window(Delta.of(15.0, new 
DeltaFunction<Object>() {
+                                       @Override
+                                       public double getDelta(Object 
oldDataPoint, Object newDataPoint) {
+                                               return 0;
+                                       }
+                               }))
+                               .mapWindow(new 
KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, 
Window>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void evaluate(Tuple tuple,
+                                                       Window window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+
+                                       }
+                               }); // case 2: a single delta policy combined with a (no-op) window function
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
+               Assert.assertTrue(operator2 instanceof WindowOperator);
+               WindowOperator winOperator2 = (WindowOperator) operator2;
+               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
PurgingTrigger);
+               
Assert.assertTrue(((PurgingTrigger)winOperator2.getTriggerTemplate()).getNestedTrigger()
 instanceof DeltaTrigger); // the purging trigger must wrap a delta trigger
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
GlobalWindows);
+               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory); // a window function is expected to use the plain heap buffer
+       }
+
+       @Test // two-policy window(...) calls are expected to translate to an EvictingWindowOperator
+       @SuppressWarnings("rawtypes") // the operators are cast to raw types below
+       public void testEvicting() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(Time.of(1000, TimeUnit.MICROSECONDS), 
Count.of(100))
+                               .reduceWindow(reducer);
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
+               EvictingWindowOperator winOperator1 = (EvictingWindowOperator) 
operator1;
+               // ensure that the operator sets the current processing time as 
timestamp
+               Assert.assertTrue(winOperator1.isSetProcessingTime());
+               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
CountTrigger); // the second window(...) argument (Count) is expected as the trigger
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
GlobalWindows);
+               Assert.assertTrue(winOperator1.getEvictor() instanceof 
TimeEvictor); // the first window(...) argument (Time) is expected as the evictor
+               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory); // plain heap buffer is expected even though reduceWindow is used
+
+               DataStream<Tuple2<String, Integer>> window2 = source
+                               .keyBy(0)
+                               .window(Count.of(1000), Delta.of(1.0, new 
DeltaFunction<Object>() {
+                                       @Override
+                                       public double getDelta(Object 
oldDataPoint, Object newDataPoint) {
+                                               return 0; // constant delta; this test only inspects the generated operator
+                                       }
+                               }))
+                               .mapWindow(new 
KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, 
Window>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void evaluate(Tuple tuple,
+                                                       Window window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+
+                                       } // no-op body; only the operator built from this function is checked
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
+               Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
+               EvictingWindowOperator winOperator2 = (EvictingWindowOperator) 
operator2;
+               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
DeltaTrigger); // the second window(...) argument (Delta) is expected as the trigger
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
GlobalWindows);
+               Assert.assertTrue(winOperator2.getEvictor() instanceof 
CountEvictor); // the first window(...) argument (Count) is expected as the evictor
+               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  UDFs
+       // 
------------------------------------------------------------------------
+
+       public static class DummyReducer extends 
RichReduceFunction<Tuple2<String, Integer>> { // reducer stub passed to reduceWindow(...) by the tests in this class
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1, Tuple2<String, Integer> value2) throws Exception {
+                       return value1; // always keeps the first value; value2 is ignored
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java
new file mode 100644
index 0000000..5078c8c
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * These tests verify that the API calls on
+ * {@link 
org.apache.flink.streaming.api.datastream.KeyedTriggerWindowDataStream} 
instantiate
+ * the correct window operator.
+ */
+public class TriggerWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
+
+       /**
+        * These tests ensure that the fast aligned time windows operator is 
used if the
+        * conditions are right.
+        *
+        * For a sliding processing-time window with no explicit trigger or evictor, the
+        * assertions expect AggregatingProcessingTimeWindowOperator after reduceWindow and
+        * AccumulatingProcessingTimeWindowOperator after mapWindow.
+        */
+       @Test
+       public void testFastTimeWindows() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingProcessingTimeWindows.of(1000, 
100))
+                               .reduceWindow(reducer);
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof 
AggregatingProcessingTimeWindowOperator); // reduceWindow case
+
+               DataStream<Tuple2<String, Integer>> window2 = source
+                               .keyBy(0)
+                               .window(SlidingProcessingTimeWindows.of(1000, 
100))
+                               .mapWindow(new 
KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void evaluate(Tuple tuple,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+
+                                       } // no-op body; only the operator built from this function is checked
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
+               Assert.assertTrue(operator2 instanceof 
AccumulatingProcessingTimeWindowOperator); // mapWindow case
+       }
+
+       @Test // explicit trigger but no evictor: the assertions expect a plain WindowOperator
+       @SuppressWarnings("rawtypes") // the operators are cast to raw types below
+       public void testNonEvicting() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingProcessingTimeWindows.of(1000, 
100))
+                               .trigger(CountTrigger.of(100))
+                               .reduceWindow(reducer);
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof WindowOperator);
+               WindowOperator winOperator1 = (WindowOperator) operator1;
+               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
CountTrigger); // the trigger passed to trigger(...) above
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
+               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory); // reduceWindow: pre-aggregating buffer expected
+
+               DataStream<Tuple2<String, Integer>> window2 = source
+                               .keyBy(0)
+                               .window(TumblingProcessingTimeWindows.of(1000))
+                               .trigger(CountTrigger.of(100))
+                               .mapWindow(new 
KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void evaluate(Tuple tuple,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+
+                                       } // no-op body; only the operator built from this function is checked
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
+               Assert.assertTrue(operator2 instanceof WindowOperator);
+               WindowOperator winOperator2 = (WindowOperator) operator2;
+               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory); // mapWindow: plain heap buffer expected
+       }
+
+       @Test // an evictor(...) call is expected to translate to an EvictingWindowOperator
+       @SuppressWarnings("rawtypes") // the operators are cast to raw types below
+       public void testEvicting() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingProcessingTimeWindows.of(1000, 
100))
+                               .evictor(CountEvictor.of(100))
+                               .reduceWindow(reducer);
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
+               EvictingWindowOperator winOperator1 = (EvictingWindowOperator) 
operator1;
+               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
ProcessingTimeTrigger); // no trigger(...) call above, yet a ProcessingTimeTrigger is expected
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
+               Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
+               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory); // plain heap buffer is expected even though reduceWindow is used
+
+               DataStream<Tuple2<String, Integer>> window2 = source
+                               .keyBy(0)
+                               .window(TumblingProcessingTimeWindows.of(1000))
+                               .trigger(CountTrigger.of(100))
+                               .evictor(TimeEvictor.of(100))
+                               .mapWindow(new 
KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void evaluate(Tuple tuple,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+
+                                       } // no-op body; only the operator built from this function is checked
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
+               Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
+               EvictingWindowOperator winOperator2 = (EvictingWindowOperator) 
operator2;
+               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
CountTrigger); // the trigger passed to trigger(...) above
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator2.getEvictor() instanceof 
TimeEvictor); // the evictor passed to evictor(...) above
+               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  UDFs
+       // 
------------------------------------------------------------------------
+
+       public static class DummyReducer extends 
RichReduceFunction<Tuple2<String, Integer>> { // reducer stub passed to reduceWindow(...) by the tests in this class
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1, Tuple2<String, Integer> value2) throws Exception {
+                       return value1; // always keeps the first value; value2 is ignored
+               }
+       }
+}

Reply via email to