Repository: flink
Updated Branches:
  refs/heads/master 884d3e2a4 -> 12bf7c1a0


[FLINK-4207] WindowOperator becomes very slow with allowed lateness


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12bf7c1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12bf7c1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12bf7c1a

Branch: refs/heads/master
Commit: 12bf7c1a0b81d199085fe874c64763c51a93b3bf
Parents: 884d3e2
Author: kl0u <kklou...@gmail.com>
Authored: Mon Jul 18 11:37:06 2016 +0200
Committer: kl0u <kklou...@gmail.com>
Committed: Tue Jul 26 21:12:05 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBFoldingState.java    |   2 +-
 .../streaming/state/RocksDBListState.java       |   3 +-
 .../flink/api/common/state/AppendingState.java  |  10 +-
 .../flink/runtime/state/GenericListState.java   |   7 +-
 .../state/filesystem/FsFoldingState.java        |   7 +-
 .../runtime/state/filesystem/FsListState.java   |  13 +-
 .../runtime/state/memory/MemFoldingState.java   |   7 +-
 .../runtime/state/memory/MemListState.java      |  13 +-
 .../runtime/state/StateBackendTestBase.java     |   8 +-
 .../api/windowing/triggers/PurgingTrigger.java  |   2 +-
 .../windowing/EvictingWindowOperator.java       |  75 +++-
 .../operators/windowing/MergingWindowSet.java   |  14 +-
 .../operators/windowing/WindowOperator.java     |  74 +++-
 .../windowing/MergingWindowSetTest.java         |   7 +-
 .../operators/windowing/WindowOperatorTest.java | 438 +++++++++++++++++++
 .../sessionwindows/SessionWindowITCase.java     |   2 -
 16 files changed, 580 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 8ffe3a6..218fa2a 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -93,7 +93,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
                        byte[] key = baos.toByteArray();
                        byte[] valueBytes = backend.db.get(columnFamily, key);
                        if (valueBytes == null) {
-                               return stateDesc.getDefaultValue();
+                               return null;
                        }
                        return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
                } catch (IOException|RocksDBException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index b13c5ae..ce3a48e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -32,7 +32,6 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static java.util.Objects.requireNonNull;
@@ -94,7 +93,7 @@ public class RocksDBListState<K, N, V>
                        byte[] valueBytes = backend.db.get(columnFamily, key);
 
                        if (valueBytes == null) {
-                               return Collections.emptyList();
+                               return null;
                        }
 
                        ByteArrayInputStream bais = new 
ByteArrayInputStream(valueBytes);

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
index 04dc784..8ea8364 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
@@ -46,8 +46,14 @@ public interface AppendingState<IN, OUT> extends State {
         * operator instance. If state partitioning is applied, the value 
returned
         * depends on the current operator input, as the operator maintains an
         * independent state for each partition.
-        * 
-        * @return The operator state value corresponding to the current input.
+        *
+        * <p>
+        *     <b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this 
method
+        *     should return {@code null}.
+        * </p>
+        *
+        * @return The operator state value corresponding to the current input 
or {@code null}
+        * if the state is empty.
         * 
         * @throws Exception Thrown if the system cannot access the state.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
index 3414855..2e40898 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 
 /**
  * Generic implementation of {@link ListState} based on a wrapped {@link 
ValueState}.
@@ -82,11 +81,7 @@ public class GenericListState<K, N, T, Backend extends 
AbstractStateBackend, W e
 
        @Override
        public Iterable<T> get() throws Exception {
-               ArrayList<T> result = wrappedState.value();
-               if (result == null) {
-                       return Collections.emptyList();
-               }
-               return result;
+               return wrappedState.value();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
index bba6df5..90baf36 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
@@ -88,11 +88,8 @@ public class FsFoldingState<K, N, T, ACC>
                if (currentNSState == null) {
                        currentNSState = state.get(currentNamespace);
                }
-               if (currentNSState != null) {
-                       ACC value = currentNSState.get(currentKey);
-                       return value != null ? value : 
stateDesc.getDefaultValue();
-               }
-               return stateDesc.getDefaultValue();
+               return currentNSState != null ?
+                       currentNSState.get(currentKey) : null;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
index 1d5b5f8..46c9830 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
@@ -27,9 +27,7 @@ import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -85,15 +83,8 @@ public class FsListState<K, N, V>
                if (currentNSState == null) {
                        currentNSState = state.get(currentNamespace);
                }
-               if (currentNSState != null) {
-                       List<V> result = currentNSState.get(currentKey);
-                       if (result == null) {
-                               return Collections.emptyList();
-                       } else {
-                               return result;
-                       }
-               }
-               return Collections.emptyList();
+               return currentNSState != null ?
+                       currentNSState.get(currentKey) : null;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
index 07b677b..9953a64 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
@@ -64,11 +64,8 @@ public class MemFoldingState<K, N, T, ACC>
                if (currentNSState == null) {
                        currentNSState = state.get(currentNamespace);
                }
-               if (currentNSState != null) {
-                       ACC value = currentNSState.get(currentKey);
-                       return value != null ? value : 
stateDesc.getDefaultValue();
-               }
-               return stateDesc.getDefaultValue();
+               return currentNSState != null ?
+                       currentNSState.get(currentKey) : null;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
index d5e4dfd..97461d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
@@ -26,9 +26,7 @@ import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -56,15 +54,8 @@ public class MemListState<K, N, V>
                if (currentNSState == null) {
                        currentNSState = state.get(currentNamespace);
                }
-               if (currentNSState != null) {
-                       List<V> result = currentNSState.get(currentKey);
-                       if (result == null) {
-                               return Collections.emptyList();
-                       } else {
-                               return result;
-                       }
-               }
-               return Collections.emptyList();
+               return currentNSState != null ?
+                       currentNSState.get(currentKey) : null;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 12cf112..80f1de3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -243,10 +243,10 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        Joiner joiner = Joiner.on(",");
                        // some modifications to the state
                        backend.setCurrentKey(1);
-                       assertEquals("", joiner.join(state.get()));
+                       assertEquals(null, state.get());
                        state.add("1");
                        backend.setCurrentKey(2);
-                       assertEquals("", joiner.join(state.get()));
+                       assertEquals(null, state.get());
                        state.add("2");
                        backend.setCurrentKey(1);
                        assertEquals("1", joiner.join(state.get()));
@@ -438,10 +438,10 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        // some modifications to the state
                        backend.setCurrentKey(1);
-                       assertEquals("Fold-Initial:", state.get());
+                       assertEquals(null, state.get());
                        state.add(1);
                        backend.setCurrentKey(2);
-                       assertEquals("Fold-Initial:", state.get());
+                       assertEquals(null, state.get());
                        state.add(2);
                        backend.setCurrentKey(1);
                        assertEquals("Fold-Initial:,1", state.get());

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 85d0b52..8b30130 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -72,7 +72,7 @@ public class PurgingTrigger<T, W extends Window> extends 
Trigger<T, W> {
        @Override
        public TriggerResult onMerge(W window, OnMergeContext ctx) throws 
Exception {
                TriggerResult triggerResult = nestedTrigger.onMerge(window, 
ctx);
-               return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : 
triggerResult;
+               return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : 
triggerResult;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 3f2c6a3..15f716c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -132,12 +132,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
                                // check if the window is already inactive
                                if (isLate(actualWindow)) {
-                                       LOG.info("Dropped element " + element + 
" for window " + actualWindow + " due to lateness.");
                                        
mergingWindows.retireWindow(actualWindow);
                                        continue;
                                }
 
                                W stateWindow = 
mergingWindows.getStateWindow(actualWindow);
+                               if (stateWindow == null) {
+                                       throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
+                               }
                                ListState<StreamRecord<IN>> windowState = 
getPartitionedState(
                                        stateWindow, windowSerializer, 
windowStateDescriptor);
                                windowState.add(element);
@@ -149,7 +151,15 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                // on the (possibly merged) window
                                TriggerResult triggerResult = 
context.onElement(element);
                                TriggerResult combinedTriggerResult = 
TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
-                               fireOrContinue(combinedTriggerResult, 
actualWindow, windowState);
+
+                               if (combinedTriggerResult.isFire()) {
+                                       Iterable<StreamRecord<IN>> contents = 
windowState.get();
+                                       if (contents == null) {
+                                               // if we have no state, there 
is nothing to do
+                                               continue;
+                                       }
+                                       fire(actualWindow, contents);
+                               }
 
                                if (combinedTriggerResult.isPurge()) {
                                        cleanup(actualWindow, windowState, 
mergingWindows);
@@ -163,7 +173,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
                                // check if the window is already inactive
                                if (isLate(window)) {
-                                       LOG.info("Dropped element " + element + 
" for window " + window + " due to lateness.");
                                        continue;
                                }
 
@@ -175,7 +184,15 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                context.window = window;
 
                                TriggerResult triggerResult = 
context.onElement(element);
-                               fireOrContinue(triggerResult, window, 
windowState);
+
+                               if (triggerResult.isFire()) {
+                                       Iterable<StreamRecord<IN>> contents = 
windowState.get();
+                                       if (contents == null) {
+                                               // if we have no state, there 
is nothing to do
+                                               continue;
+                                       }
+                                       fire(window, contents);
+                               }
 
                                if (triggerResult.isPurge()) {
                                        cleanup(window, windowState, null);
@@ -207,16 +224,30 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                if (windowAssigner instanceof 
MergingWindowAssigner) {
                                        mergingWindows = getMergingWindowSet();
                                        W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                                       if (stateWindow == null) {
+                                               // then the window is already 
purged and this is a cleanup
+                                               // timer set due to allowed 
lateness that has nothing to clean,
+                                               // so it is safe to just ignore
+                                               continue;
+                                       }
                                        windowState = 
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
                                } else {
                                        windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
                                }
 
+                               Iterable<StreamRecord<IN>> contents = 
windowState.get();
+                               if (contents == null) {
+                                       // if we have no state, there is 
nothing to do
+                                       continue;
+                               }
+
                                TriggerResult triggerResult = 
context.onEventTime(timer.timestamp);
-                               fireOrContinue(triggerResult, context.window, 
windowState);
+                               if (triggerResult.isFire()) {
+                                       fire(context.window, contents);
+                               }
 
-                               if (triggerResult.isPurge() || 
(windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) 
{
-                                       cleanup(timer.window, windowState, 
mergingWindows);
+                               if (triggerResult.isPurge() || 
(windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) {
+                                       cleanup(context.window, windowState, 
mergingWindows);
                                }
 
                        } else {
@@ -255,16 +286,30 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                if (windowAssigner instanceof 
MergingWindowAssigner) {
                                        mergingWindows = getMergingWindowSet();
                                        W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                                       if (stateWindow == null) {
+                                               // then the window is already 
purged and this is a cleanup
+                                               // timer set due to allowed 
lateness that has nothing to clean,
+                                               // so it is safe to just ignore
+                                               continue;
+                                       }
                                        windowState = 
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
                                } else {
                                        windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
                                }
 
+                               Iterable<StreamRecord<IN>> contents = 
windowState.get();
+                               if (contents == null) {
+                                       // if we have no state, there is 
nothing to do
+                                       continue;
+                               }
+
                                TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
-                               fireOrContinue(triggerResult, context.window, 
windowState);
+                               if (triggerResult.isFire()) {
+                                       fire(context.window, contents);
+                               }
 
-                               if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(timer.window, 
timer.timestamp))) {
-                                       cleanup(timer.window, windowState, 
mergingWindows);
+                               if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) {
+                                       cleanup(context.window, windowState, 
mergingWindows);
                                }
 
                        } else {
@@ -273,15 +318,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                } while (fire);
        }
 
-       private void fireOrContinue(TriggerResult triggerResult,
-                                                               W window,
-                                                               
ListState<StreamRecord<IN>> windowState) throws Exception {
-               if (!triggerResult.isFire()) {
-                       return;
-               }
-
+       private void fire(W window, Iterable<StreamRecord<IN>> contents) throws 
Exception {
                
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
-               Iterable<StreamRecord<IN>> contents = windowState.get();
 
                // Work around type system restrictions...
                int toEvict = evictor.evict((Iterable) contents, 
Iterables.size(contents), context.window);
@@ -307,7 +345,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                        mergingWindows.retireWindow(window);
                }
                context.clear();
-               deleteCleanupTimer(window);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index d02a348..c806d2d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -80,8 +80,11 @@ public class MergingWindowSet<W extends Window> {
                this.windowAssigner = windowAssigner;
                windows = new HashMap<>();
 
-               for (Tuple2<W, W> window: state.get()) {
-                       windows.put(window.f0, window.f1);
+               Iterable<Tuple2<W, W>> windowState = state.get();
+               if (windowState != null) {
+                       for (Tuple2<W, W> window: windowState) {
+                               windows.put(window.f0, window.f1);
+                       }
                }
        }
 
@@ -100,12 +103,7 @@ public class MergingWindowSet<W extends Window> {
         * @param window The window for which to get the state window.
         */
        public W getStateWindow(W window) {
-               W result = windows.get(window);
-               if (result == null) {
-                       throw new IllegalStateException("Window " + window + " 
is not in in-flight window set.");
-               }
-
-               return result;
+               return windows.get(window);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index bb05d2b..2434843 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -334,12 +334,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
                                // drop if the window is already late
                                if (isLate(actualWindow)) {
-                                       LOG.info("Dropped element " + element+ 
" for window " + actualWindow + " due to lateness.");
                                        
mergingWindows.retireWindow(actualWindow);
                                        continue;
                                }
 
                                W stateWindow = 
mergingWindows.getStateWindow(actualWindow);
+                               if (stateWindow == null) {
+                                       throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
+                               }
+
                                AppendingState<IN, ACC> windowState = 
getPartitionedState(
                                        stateWindow, windowSerializer, 
windowStateDescriptor);
                                windowState.add(element.getValue());
@@ -351,7 +354,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                // on the (possibly merged) window
                                TriggerResult triggerResult = 
context.onElement(element);
                                TriggerResult combinedTriggerResult = 
TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
-                               fireOrContinue(combinedTriggerResult, 
actualWindow, windowState);
+
+                               if (combinedTriggerResult.isFire()) {
+                                       ACC contents = windowState.get();
+                                       if (contents == null) {
+                                               continue;
+                                       }
+                                       fire(actualWindow, contents);
+                               }
 
                                if (combinedTriggerResult.isPurge()) {
                                        cleanup(actualWindow, windowState, 
mergingWindows);
@@ -364,7 +374,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
                                // drop if the window is already late
                                if (isLate(window)) {
-                                       LOG.info("Dropped element " + element + 
" for window " + window + " due to lateness.");
                                        continue;
                                }
 
@@ -376,7 +385,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                context.window = window;
 
                                TriggerResult triggerResult = 
context.onElement(element);
-                               fireOrContinue(triggerResult, window, 
windowState);
+
+                               if (triggerResult.isFire()) {
+                                       ACC contents = windowState.get();
+                                       if (contents == null) {
+                                               continue;
+                                       }
+                                       fire(window, contents);
+                               }
 
                                if (triggerResult.isPurge()) {
                                        cleanup(window, windowState, null);
@@ -408,16 +424,30 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                if (windowAssigner instanceof 
MergingWindowAssigner) {
                                        mergingWindows = getMergingWindowSet();
                                        W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                                       if (stateWindow == null) {
+                                               // then the window is already 
purged and this is a cleanup
+                                               // timer set due to allowed 
lateness that has nothing to clean,
+                                               // so it is safe to just ignore
+                                               continue;
+                                       }
                                        windowState = 
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
                                } else {
                                        windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
                                }
 
+                               ACC contents = windowState.get();
+                               if (contents == null) {
+                                       // if we have no state, there is 
nothing to do
+                                       continue;
+                               }
+
                                TriggerResult triggerResult = 
context.onEventTime(timer.timestamp);
-                               fireOrContinue(triggerResult, context.window, 
windowState);
+                               if (triggerResult.isFire()) {
+                                       fire(context.window, contents);
+                               }
 
-                               if (triggerResult.isPurge() || 
(windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) 
{
-                                       cleanup(timer.window, windowState, 
mergingWindows);
+                               if (triggerResult.isPurge() || 
(windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) {
+                                       cleanup(context.window, windowState, 
mergingWindows);
                                }
 
                        } else {
@@ -456,16 +486,30 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                if (windowAssigner instanceof 
MergingWindowAssigner) {
                                        mergingWindows = getMergingWindowSet();
                                        W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                                       if (stateWindow == null) {
+                                               // then the window is already 
purged and this is a cleanup
+                                               // timer set due to allowed 
lateness that has nothing to clean,
+                                               // so it is safe to just ignore
+                                               continue;
+                                       }
                                        windowState = 
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
                                } else {
                                        windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
                                }
 
+                               ACC contents = windowState.get();
+                               if (contents == null) {
+                                       // if we have no state, there is 
nothing to do
+                                       continue;
+                               }
+
                                TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
-                               fireOrContinue(triggerResult, context.window, 
windowState);
+                               if (triggerResult.isFire()) {
+                                       fire(context.window, contents);
+                               }
 
-                               if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(timer.window, 
timer.timestamp))) {
-                                       cleanup(timer.window, windowState, 
mergingWindows);
+                               if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) {
+                                       cleanup(context.window, windowState, 
mergingWindows);
                                }
 
                        } else {
@@ -487,7 +531,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        mergingWindows.retireWindow(window);
                }
                context.clear();
-               deleteCleanupTimer(window);
        }
 
        /**
@@ -495,15 +538,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         * The caller must ensure that the correct key is set in the state 
backend and the context object.
         */
        @SuppressWarnings("unchecked")
-       private void fireOrContinue(TriggerResult triggerResult,
-                                                               W window,
-                                                               
AppendingState<IN, ACC> windowState) throws Exception {
-               if (!triggerResult.isFire()) {
-                       return;
-               }
-
+       private void fire(W window, ACC contents) throws Exception {
                
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
-               ACC contents = windowState.get();
                userFunction.apply(context.key, context.window, contents, 
timestampedCollector);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index cf90f8a..939f13f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -133,12 +133,7 @@ public class MergingWindowSetTest {
                // retire the first batch of windows
                windowSet.retireWindow(new TimeWindow(0, 6));
 
-               try {
-                       windowSet.getStateWindow(new TimeWindow(0, 6));
-                       fail("Expected exception");
-               } catch (IllegalStateException e) {
-                       //ignore
-               }
+               assertTrue(windowSet.getStateWindow(new TimeWindow(0, 6)) == 
null);
 
                assertTrue(windowSet.getStateWindow(new TimeWindow(10, 
15)).equals(new TimeWindow(11, 14)));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 90bd3f2..62266c4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -17,9 +17,12 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -51,6 +54,8 @@ import 
org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -1952,10 +1957,379 @@ public class WindowOperatorTest {
                testHarness.close();
        }
 
+       @Test
+       public void testCleanupTimerWithEmptyListStateForTumblingWindows2() 
throws Exception {
+               final int WINDOW_SIZE = 2;
+               final long LATENESS = 100;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+                       new ListStateDescriptor<>("window-contents", 
inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, String, TimeWindow> operator =
+                       new WindowOperator<>(
+                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               windowStateDesc,
+                               new InternalIterableWindowFunction<>(new 
PassThroughFunction2()),
+                               new EventTimeTriggerAccumGC(LATENESS),
+                               LATENESS);
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
String> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               operator.setInputType(inputType, new ExecutionConfig());
+               testHarness.open();
+
+               ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
+
+               // normal element
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
+               testHarness.processWatermark(new Watermark(1599));
+               testHarness.processWatermark(new Watermark(1999));
+               testHarness.processWatermark(new Watermark(2100));
+               testHarness.processWatermark(new Watermark(5000));
+
+               expected.add(new Watermark(1599));
+               expected.add(new StreamRecord<>("GOT: (key2,1)", 1999));
+               expected.add(new Watermark(1999)); // here it fires and purges
+               expected.add(new Watermark(2100)); // here is the cleanup timer
+               expected.add(new Watermark(5000));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+               testHarness.close();
+       }
+
+       private class PassThroughFunction2 implements 
WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void apply(String k, TimeWindow window, 
Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws 
Exception {
+                       out.collect("GOT: " + Joiner.on(",").join(input));
+               }
+       }
+
+       @Test
+       public void testCleanupTimerWithEmptyListStateForTumblingWindows() 
throws Exception {
+               final int WINDOW_SIZE = 2;
+               final long LATENESS = 1;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+                       new ListStateDescriptor<>("window-contents", 
inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator =
+                       new WindowOperator<>(
+                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               windowStateDesc,
+                               new InternalIterableWindowFunction<>(new 
PassThroughFunction()),
+                               EventTimeTrigger.create(),
+                               LATENESS);
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               operator.setInputType(inputType, new ExecutionConfig());
+               testHarness.open();
+
+               ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
+
+               // normal element
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
+               testHarness.processWatermark(new Watermark(1599));
+               testHarness.processWatermark(new Watermark(1999));
+               testHarness.processWatermark(new Watermark(2000));
+               testHarness.processWatermark(new Watermark(5000));
+
+               expected.add(new Watermark(1599));
+               expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+               expected.add(new Watermark(1999)); // here it fires and purges
+               expected.add(new Watermark(2000)); // here is the cleanup timer
+               expected.add(new Watermark(5000));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+               testHarness.close();
+       }
+
+       @Test
+       public void testCleanupTimerWithEmptyReduceStateForTumblingWindows() 
throws Exception {
+               final int WINDOW_SIZE = 2;
+               final long LATENESS = 1;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
+                       new SumReducer(),
+                       inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
+                       new WindowOperator<>(
+                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               stateDesc,
+                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+                               EventTimeTrigger.create(),
+                               LATENESS);
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               operator.setInputType(inputType, new ExecutionConfig());
+               testHarness.open();
+
+               ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
+
+               // normal element
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
+               testHarness.processWatermark(new Watermark(1599));
+               testHarness.processWatermark(new Watermark(1999));
+               testHarness.processWatermark(new Watermark(2000));
+               testHarness.processWatermark(new Watermark(5000));
+
+               expected.add(new Watermark(1599));
+               expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+               expected.add(new Watermark(1999)); // here it fires and purges
+               expected.add(new Watermark(2000)); // here is the cleanup timer
+               expected.add(new Watermark(5000));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+               testHarness.close();
+       }
+
+       @Test
+       public void testCleanupTimerWithEmptyFoldingStateForTumblingWindows() 
throws Exception {
+               final int WINDOW_SIZE = 2;
+               final long LATENESS = 1;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               FoldingStateDescriptor<Tuple2<String, Integer>, Tuple2<String, 
Integer>> windowStateDesc =
+                       new FoldingStateDescriptor<>(
+                               "window-contents",
+                               new Tuple2<>((String) null, 0),
+                               new FoldFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>>() {
+                                       @Override
+                                       public Tuple2<String, Integer> 
fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws 
Exception {
+                                               return new Tuple2<>(value.f0, 
accumulator.f1 + value.f1);
+                                       }
+                               },
+                               inputType);
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
+                       new WindowOperator<>(
+                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               windowStateDesc,
+                               new InternalSingleValueWindowFunction<>(new 
PassThroughFunction()),
+                               EventTimeTrigger.create(),
+                               LATENESS);
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               operator.setInputType(inputType, new ExecutionConfig());
+               testHarness.open();
+
+               ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
+
+               // normal element
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
+               testHarness.processWatermark(new Watermark(1599));
+               testHarness.processWatermark(new Watermark(1999));
+               testHarness.processWatermark(new Watermark(2000));
+               testHarness.processWatermark(new Watermark(5000));
+
+               expected.add(new Watermark(1599));
+               expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+               expected.add(new Watermark(1999)); // here it fires and purges
+               expected.add(new Watermark(2000)); // here is the cleanup timer
+               expected.add(new Watermark(5000));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+               testHarness.close();
+       }
+
+       @Test
+       public void testCleanupTimerWithEmptyListStateForSessionWindows() 
throws Exception {
+               final int GAP_SIZE = 3;
+               final long LATENESS = 10;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+                       new ListStateDescriptor<>("window-contents", 
inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator =
+                       new WindowOperator<>(
+                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               windowStateDesc,
+                               new InternalIterableWindowFunction<>(new 
PassThroughFunction()),
+                               EventTimeTrigger.create(),
+                               LATENESS);
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               operator.setInputType(inputType, new ExecutionConfig());
+               testHarness.open();
+
+               ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
+               testHarness.processWatermark(new Watermark(4998));
+
+               expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+               expected.add(new Watermark(4998));
+
+               testHarness.processWatermark(new Watermark(14600));
+               expected.add(new Watermark(14600));
+
+               ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, actual, new Tuple2ResultSortComparator());
+               testHarness.close();
+       }
+
+       @Test
+       public void testCleanupTimerWithEmptyReduceStateForSessionWindows() 
throws Exception {
+
+               final int GAP_SIZE = 3;
+               final long LATENESS = 10;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
+                       new SumReducer(),
+                       inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
+                       new WindowOperator<>(
+                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               stateDesc,
+                               new InternalSingleValueWindowFunction<>(new 
ReducedSessionWindowFunction()),
+                               EventTimeTrigger.create(),
+                               LATENESS);
+
+               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.open();
+
+               ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
+               testHarness.processWatermark(new Watermark(4998));
+
+               expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 
4000L), 3999));
+               expected.add(new Watermark(4998));
+
+               testHarness.processWatermark(new Watermark(14600));
+               expected.add(new Watermark(14600));
+
+               ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, actual, new Tuple2ResultSortComparator());
+               testHarness.close();
+       }
+
+       @Test
+       public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() 
throws Exception {
+               final int GAP_SIZE = 3;
+               final long LATENESS = 10;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               FoldingStateDescriptor<Tuple2<String, Integer>, Tuple2<String, 
Integer>> windowStateDesc =
+                       new FoldingStateDescriptor<>(
+                               "window-contents",
+                               new Tuple2<>((String) null, 0),
+                               new FoldFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>>() {
+                                       @Override
+                                       public Tuple2<String, Integer> 
fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws 
Exception {
+                                               return new Tuple2<>(value.f0, 
accumulator.f1 + value.f1);
+                                       }
+                               },
+                               inputType);
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
+                       new WindowOperator<>(
+                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               windowStateDesc,
+                               new InternalSingleValueWindowFunction<>(new 
PassThroughFunction()),
+                               EventTimeTrigger.create(),
+                               LATENESS);
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               operator.setInputType(inputType, new ExecutionConfig());
+               testHarness.open();
+
+               ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
+               testHarness.processWatermark(new Watermark(4998));
+
+               expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+               expected.add(new Watermark(4998));
+
+               testHarness.processWatermark(new Watermark(14600));
+               expected.add(new Watermark(14600));
+
+               ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, actual, new Tuple2ResultSortComparator());
+               testHarness.close();
+       }
+
        // 
------------------------------------------------------------------------
        //  UDFs
        // 
------------------------------------------------------------------------
 
+       private class PassThroughFunction implements 
WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void apply(String k, TimeWindow window, 
Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> 
out) throws Exception {
+                       for (Tuple2<String, Integer> in: input) {
+                               out.collect(in);
+                       }
+               }
+       }
+
        public static class SumReducer implements ReduceFunction<Tuple2<String, 
Integer>> {
                private static final long serialVersionUID = 1L;
                @Override
@@ -2112,4 +2486,68 @@ public class WindowOperatorTest {
                        return Collections.singletonList(new 
TimeWindow(timestamp, timestamp + sessionTimeout));
                }
        }
+
+       /**
+        * A trigger that fires at the end of the window but does not
+        * purge the state of the fired window. This is to test the state
+        * garbage collection mechanism.
+        */
+       public class EventTimeTriggerAccumGC extends Trigger<Object, 
TimeWindow> {
+               private static final long serialVersionUID = 1L;
+
+               private long cleanupTime;
+
+               private EventTimeTriggerAccumGC() {
+                       cleanupTime = 0L;
+               }
+
+               public EventTimeTriggerAccumGC(long cleanupTime) {
+                       this.cleanupTime = cleanupTime;
+               }
+
+               @Override
+               public TriggerResult onElement(Object element, long timestamp, 
TimeWindow window, TriggerContext ctx) throws Exception {
+                       if (window.maxTimestamp() <= ctx.getCurrentWatermark()) 
{
+                               // if the watermark is already past the window 
fire immediately
+                               return TriggerResult.FIRE;
+                       } else {
+                               
ctx.registerEventTimeTimer(window.maxTimestamp());
+                               return TriggerResult.CONTINUE;
+                       }
+               }
+
+               @Override
+               public TriggerResult onEventTime(long time, TimeWindow window, 
TriggerContext ctx) {
+                       return time == window.maxTimestamp() || time == 
window.maxTimestamp() + cleanupTime ?
+                               TriggerResult.FIRE_AND_PURGE :
+                               TriggerResult.CONTINUE;
+               }
+
+               @Override
+               public TriggerResult onProcessingTime(long time, TimeWindow 
window, TriggerContext ctx) throws Exception {
+                       return TriggerResult.CONTINUE;
+               }
+
+               @Override
+               public void clear(TimeWindow window, TriggerContext ctx) throws 
Exception {
+                       ctx.deleteEventTimeTimer(window.maxTimestamp());
+               }
+
+               @Override
+               public boolean canMerge() {
+                       return true;
+               }
+
+               @Override
+               public TriggerResult onMerge(TimeWindow window,
+                                                                        
OnMergeContext ctx) {
+                       ctx.registerEventTimeTimer(window.maxTimestamp());
+                       return TriggerResult.CONTINUE;
+               }
+
+               @Override
+               public String toString() {
+                       return "EventTimeTrigger()";
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
index eb137aa..9b4855f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
@@ -22,9 +22,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;

Reply via email to