http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 628d663..9b8af58 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -19,10 +19,10 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.commons.math3.util.ArithmeticUtils;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.AppendingState;
-import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
@@ -42,6 +42,9 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalAppendingState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
 import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -107,22 +110,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
        protected final WindowAssigner<? super IN, W> windowAssigner;
 
-       protected final KeySelector<IN, K> keySelector;
-
-       protected final Trigger<? super IN, ? super W> trigger;
+       private final KeySelector<IN, K> keySelector;
 
-       protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> 
windowStateDescriptor;
+       private final Trigger<? super IN, ? super W> trigger;
 
-       protected final ListStateDescriptor<Tuple2<W, W>> 
mergingWindowsDescriptor;
+       private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> 
windowStateDescriptor;
 
-       /**
-        * For serializing the key in checkpoints.
-        */
+       /** For serializing the key in checkpoints. */
        protected final TypeSerializer<K> keySerializer;
 
-       /**
-        * For serializing the window in checkpoints.
-        */
+       /** For serializing the window in checkpoints. */
        protected final TypeSerializer<W> windowSerializer;
 
        /**
@@ -133,15 +130,23 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         *         {@code window.maxTimestamp + allowedLateness} landmark.
         * </ul>
         */
-       protected final long allowedLateness;
+       private final long allowedLateness;
 
        // 
------------------------------------------------------------------------
        // State that is not checkpointed
        // 
------------------------------------------------------------------------
 
-       /**
-        * This is given to the {@code InternalWindowFunction} for emitting 
elements with a given timestamp.
-        */
+       /** The state in which the window contents are stored. Each window is a 
namespace. */
+       private transient InternalAppendingState<W, IN, ACC> windowState;
+
+       /** The {@link #windowState}, typed to merging state for merging 
windows.
+        * Null if the window state is not mergeable */
+       private transient InternalMergingState<W, IN, ACC> windowMergingState;
+
+       /** The state that holds the merging window metadata (the sets that 
describe what is merged) */
+       private transient InternalListState<VoidNamespace, Tuple2<W, W>> 
mergingSetsState;
+
+       /** This is given to the {@code InternalWindowFunction} for emitting 
elements with a given timestamp. */
        protected transient TimestampedCollector<OUT> timestampedCollector;
 
        protected transient Context context = new Context(null, null);
@@ -234,14 +239,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                this.allowedLateness = allowedLateness;
                this.legacyWindowOperatorType = legacyWindowOperatorType;
 
-               if (windowAssigner instanceof MergingWindowAssigner) {
-                       @SuppressWarnings({"unchecked", "rawtypes"})
-                       TupleSerializer<Tuple2<W, W>> tupleSerializer = new 
TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, 
windowSerializer} );
-                       mergingWindowsDescriptor = new 
ListStateDescriptor<>("merging-window-set", tupleSerializer);
-               } else {
-                       mergingWindowsDescriptor = null;
-               }
-
                setChainingStrategy(ChainingStrategy.ALWAYS);
        }
 
@@ -263,6 +260,43 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        }
                };
 
+               // create (or restore) the state that holds the actual window 
contents
+               // NOTE - the state may be null in the case of the overriding 
evicting window operator
+               if (windowStateDescriptor != null) {
+                       windowState = (InternalAppendingState<W, IN, ACC>) 
getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
+               }
+
+               // create the typed and helper states for merging windows
+               if (windowAssigner instanceof MergingWindowAssigner) {
+
+                       // store a typed reference for the state of merging 
windows - sanity check
+                       if (windowState instanceof InternalMergingState) {
+                               windowMergingState = (InternalMergingState<W, 
IN, ACC>) windowState;
+                       }
+                       // TODO this sanity check should be here, but is 
prevented by an incorrect test (pending validation)
+                       // TODO see 
WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
+                       // TODO activate the sanity check once resolved
+//                     else if (windowState != null) {
+//                             throw new IllegalStateException(
+//                                             "The window uses a merging 
assigner, but the window state is not mergeable.");
+//                     }
+
+                       @SuppressWarnings("unchecked")
+                       final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, 
W>>) (Class<?>) Tuple2.class;
+
+                       final TupleSerializer<Tuple2<W, W>> tupleSerializer = 
new TupleSerializer<>(
+                                       typedTuple,
+                                       new TypeSerializer[] {windowSerializer, 
windowSerializer} );
+
+                       final ListStateDescriptor<Tuple2<W, W>> 
mergingSetsStateDescriptor =
+                                       new 
ListStateDescriptor<>("merging-window-set", tupleSerializer);
+
+                       // get the state that stores the merging sets
+                       mergingSetsState = (InternalListState<VoidNamespace, 
Tuple2<W, W>>) 
+                                       
getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, 
mergingSetsStateDescriptor);
+                       
mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
+               }
+
                registerRestoredLegacyStateState();
        }
 
@@ -283,12 +317,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        }
 
        @Override
-       @SuppressWarnings("unchecked")
        public void processElement(StreamRecord<IN> element) throws Exception {
-               Collection<W> elementWindows = windowAssigner.assignWindows(
+               final Collection<W> elementWindows = 
windowAssigner.assignWindows(
                        element.getValue(), element.getTimestamp(), 
windowAssignerContext);
-
-               final K key = (K) getKeyedStateBackend().getCurrentKey();
+               
+               final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
                if (windowAssigner instanceof MergingWindowAssigner) {
                        MergingWindowSet<W> mergingWindows = 
getMergingWindowSet();
@@ -315,11 +348,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                                }
 
                                                // merge the merged state 
windows into the newly resulting state window
-                                               
getKeyedStateBackend().mergePartitionedStates(
-                                                       stateWindowResult,
-                                                       mergedStateWindows,
-                                                       windowSerializer,
-                                                       (StateDescriptor<? 
extends MergingState<?,?>, ?>) windowStateDescriptor);
+                                               
windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                                        }
                                });
 
@@ -334,8 +363,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                        throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
                                }
 
-                               AppendingState<IN, ACC> windowState =
-                                               
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+                               windowState.setCurrentNamespace(stateWindow);
                                windowState.add(element.getValue());
 
                                context.key = key;
@@ -368,8 +396,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                        continue;
                                }
 
-                               AppendingState<IN, ACC> windowState =
-                                               getPartitionedState(window, 
windowSerializer, windowStateDescriptor);
+                               windowState.setCurrentNamespace(window);
                                windowState.add(element.getValue());
 
                                context.key = key;
@@ -399,8 +426,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                context.key = timer.getKey();
                context.window = timer.getNamespace();
 
-               AppendingState<IN, ACC> windowState;
-               MergingWindowSet<W> mergingWindows = null;
+               MergingWindowSet<W> mergingWindows;
 
                if (windowAssigner instanceof MergingWindowAssigner) {
                        mergingWindows = getMergingWindowSet();
@@ -411,12 +437,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                // so it is safe to just ignore
                                return;
                        }
-                       windowState = getPartitionedState(stateWindow, 
windowSerializer, windowStateDescriptor);
+                       
+                       windowState.setCurrentNamespace(stateWindow);
                } else {
-                       windowState = getPartitionedState(
-                                       context.window,
-                                       windowSerializer,
-                                       windowStateDescriptor);
+                       windowState.setCurrentNamespace(context.window);
+                       mergingWindows = null;
                }
 
                ACC contents = windowState.get();
@@ -440,8 +465,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                context.key = timer.getKey();
                context.window = timer.getNamespace();
 
-               AppendingState<IN, ACC> windowState;
-               MergingWindowSet<W> mergingWindows = null;
+               MergingWindowSet<W> mergingWindows;
 
                if (windowAssigner instanceof MergingWindowAssigner) {
                        mergingWindows = getMergingWindowSet();
@@ -452,9 +476,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                // so it is safe to just ignore
                                return;
                        }
-                       windowState = getPartitionedState(stateWindow, 
windowSerializer, windowStateDescriptor);
+                       windowState.setCurrentNamespace(stateWindow);
                } else {
-                       windowState = getPartitionedState(context.window, 
windowSerializer, windowStateDescriptor);
+                       windowState.setCurrentNamespace(context.window);
+                       mergingWindows = null;
                }
 
                ACC contents = windowState.get();
@@ -507,13 +532,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         * {@link MergingWindowSet#persist()}.
         */
        protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
-               ListState<Tuple2<W, W>> mergeState =
-                               getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, mergingWindowsDescriptor);
-
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               MergingWindowSet<W> mergingWindows = new 
MergingWindowSet<>((MergingWindowAssigner) windowAssigner, mergeState);
-
-               return mergingWindows;
+               @SuppressWarnings("unchecked")
+               MergingWindowAssigner<? super IN, W> mergingAssigner = 
(MergingWindowAssigner<? super IN, W>) windowAssigner;
+               return new MergingWindowSet<>(mergingAssigner, 
mergingSetsState);
        }
 
        /**
@@ -655,11 +676,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                public <S extends MergingState<?, ?>> void 
mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
                        if (mergedWindows != null && mergedWindows.size() > 0) {
                                try {
-                                       
WindowOperator.this.getKeyedStateBackend().mergePartitionedStates(window,
-                                                       mergedWindows,
-                                                       windowSerializer,
-                                                       stateDescriptor);
-                               } catch (Exception e) {
+                                       S rawState = 
getKeyedStateBackend().getOrCreateKeyedState(windowSerializer, stateDescriptor);
+
+                                       if (rawState instanceof 
InternalMergingState) {
+                                               @SuppressWarnings("unchecked")
+                                               InternalMergingState<W, ?, ?> 
mergingState = (InternalMergingState<W, ?, ?>) rawState;
+                                               
mergingState.mergeNamespaces(window, mergedWindows);
+                                       }
+                                       else {
+                                               throw new 
IllegalArgumentException(
+                                                               "The given 
state descriptor does not refer to a mergeable state (MergingState)");
+                                       }
+                               }
+                               catch (Exception e) {
                                        throw new RuntimeException("Error while 
merging state.", e);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 0e2d1e8..2faa506 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -72,6 +72,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -83,6 +84,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("serial")
 public class WindowOperatorTest extends TestLogger {
 
        // For counting if close() is called the correct number of times on the 
SumReducer
@@ -758,7 +760,7 @@ public class WindowOperatorTest extends TestLogger {
                                0);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO); ;
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -846,7 +848,7 @@ public class WindowOperatorTest extends TestLogger {
                                0);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO); ;
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -1124,7 +1126,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
                
                testHarness.open();
                
@@ -1184,7 +1186,7 @@ public class WindowOperatorTest extends TestLogger {
                                        LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
                
                testHarness.open();
 
@@ -1250,7 +1252,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
 
@@ -1310,7 +1312,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
                
@@ -1385,7 +1387,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
                
@@ -1475,7 +1477,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
                
@@ -1559,7 +1561,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
 
@@ -1643,7 +1645,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
                
@@ -1736,7 +1738,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
                
@@ -1821,7 +1823,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
 
@@ -1902,11 +1904,11 @@ public class WindowOperatorTest extends TestLogger {
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowStateDesc,
                                new InternalIterableWindowFunction<>(new 
PassThroughFunction2()),
-                               new EventTimeTriggerAccumGC(LATENESS),
+                                       new EventTimeTriggerAccumGC(LATENESS),
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
String> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
 
@@ -1929,7 +1931,7 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.close();
        }
 
-       private class PassThroughFunction2 implements 
WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
+       private static class PassThroughFunction2 implements 
WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -1960,7 +1962,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
 
@@ -2006,7 +2008,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
 
@@ -2063,7 +2065,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
 
@@ -2108,7 +2110,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
 
@@ -2152,7 +2154,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
 
@@ -2172,6 +2174,7 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.close();
        }
 
+       // TODO this test seems invalid, as it uses the unsupported combination 
of merging windows and folding window state
        @Test
        public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() 
throws Exception {
                final int GAP_SIZE = 3;
@@ -2206,7 +2209,7 @@ public class WindowOperatorTest extends TestLogger {
                                LATENESS);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.open();
 
@@ -2230,7 +2233,7 @@ public class WindowOperatorTest extends TestLogger {
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       private class PassThroughFunction implements 
WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow> {
+       private static class PassThroughFunction implements 
WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -2289,7 +2292,7 @@ public class WindowOperatorTest extends TestLogger {
        }
 
        @SuppressWarnings("unchecked")
-       private static class Tuple2ResultSortComparator implements 
Comparator<Object> {
+       private static class Tuple2ResultSortComparator implements 
Comparator<Object>, Serializable {
                @Override
                public int compare(Object o1, Object o2) {
                        if (o1 instanceof Watermark || o2 instanceof Watermark) 
{
@@ -2311,7 +2314,7 @@ public class WindowOperatorTest extends TestLogger {
        }
 
        @SuppressWarnings("unchecked")
-       private static class Tuple3ResultSortComparator implements 
Comparator<Object> {
+       private static class Tuple3ResultSortComparator implements 
Comparator<Object>, Serializable {
                @Override
                public int compare(Object o1, Object o2) {
                        if (o1 instanceof Watermark || o2 instanceof Watermark) 
{
@@ -2403,15 +2406,11 @@ public class WindowOperatorTest extends TestLogger {
         * purge the state of the fired window. This is to test the state
         * garbage collection mechanism.
         */
-       public class EventTimeTriggerAccumGC extends Trigger<Object, 
TimeWindow> {
+       public static class EventTimeTriggerAccumGC extends Trigger<Object, 
TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                private long cleanupTime;
 
-               private EventTimeTriggerAccumGC() {
-                       cleanupTime = 0L;
-               }
-
                public EventTimeTriggerAccumGC(long cleanupTime) {
                        this.cleanupTime = cleanupTime;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
index 0e1aca0..0562443 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
@@ -18,9 +18,7 @@
 
 package org.apache.flink.test.query;
 
-
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -30,10 +28,14 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
@@ -59,14 +61,18 @@ public final class KVStateRequestSerializerRocksDBTest {
         */
        final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
 
-               RocksDBKeyedStateBackend2(final JobID jobId,
-                       final String operatorIdentifier,
-                       final ClassLoader userCodeClassLoader,
-                       final File instanceBasePath, final DBOptions dbOptions,
-                       final ColumnFamilyOptions columnFamilyOptions,
-                       final TaskKvStateRegistry kvStateRegistry,
-                       final TypeSerializer<K> keySerializer, final int numberOfKeyGroups,
-                       final KeyGroupRange keyGroupRange) throws Exception {
+               RocksDBKeyedStateBackend2(
+                               final JobID jobId,
+                               final String operatorIdentifier,
+                               final ClassLoader userCodeClassLoader,
+                               final File instanceBasePath,
+                               final DBOptions dbOptions,
+                               final ColumnFamilyOptions columnFamilyOptions,
+                               final TaskKvStateRegistry kvStateRegistry,
+                               final TypeSerializer<K> keySerializer,
+                               final int numberOfKeyGroups,
+                               final KeyGroupRange keyGroupRange) throws Exception {
+
                        super(jobId, operatorIdentifier, userCodeClassLoader,
                                instanceBasePath,
                                dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
@@ -74,9 +80,10 @@ public final class KVStateRequestSerializerRocksDBTest {
                }
 
                @Override
-               public <N, T> ListState<T> createListState(
+               public <N, T> InternalListState<N, T> createListState(
                        final TypeSerializer<N> namespaceSerializer,
                        final ListStateDescriptor<T> stateDesc) throws Exception {
+
                        return super.createListState(namespaceSerializer, stateDesc);
                }
        }
@@ -90,8 +97,7 @@ public final class KVStateRequestSerializerRocksDBTest {
         */
        @Test
        public void testListSerialization() throws Exception {
-               final long key = 0l;
-               TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
+               final long key = 0L;
 
                // objects for RocksDB state list serialisation
                DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
@@ -110,9 +116,10 @@ public final class KVStateRequestSerializerRocksDBTest {
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 
-               final ListState<Long> listState = longHeapKeyedStateBackend
+               final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
                        .createListState(VoidNamespaceSerializer.INSTANCE,
                                new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+
                KvStateRequestSerializerTest.testListSerialization(key, listState);
        }
 }

Reply via email to