[FLINK-4924] Simplify Operator Test Harness Constructors

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe1654c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe1654c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe1654c6

Branch: refs/heads/master
Commit: fe1654c680cad692e19ce262c402fd9756e8602a
Parents: de03e0c
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Oct 26 12:19:25 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 26 23:26:28 2016 +0200

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileMonitoringTest.java |  9 ++---
 .../fs/bucketing/BucketingSinkTest.java         |  2 +-
 .../flink/streaming/api/graph/StreamConfig.java | 28 ---------------
 .../api/graph/StreamingJobGraphGenerator.java   |  5 ---
 .../api/operators/AbstractStreamOperator.java   |  2 +-
 .../StreamOperatorSnapshotRestoreTest.java      | 38 ++++++++++++--------
 ...stampsAndPeriodicWatermarksOperatorTest.java | 14 ++++----
 ...AlignedProcessingTimeWindowOperatorTest.java | 10 +++---
 ...AlignedProcessingTimeWindowOperatorTest.java | 10 +++---
 .../operators/windowing/WindowOperatorTest.java | 10 ++----
 .../windowing/WindowingTestHarnessTest.java     |  4 ---
 .../tasks/OneInputStreamTaskTestHarness.java    |  5 +--
 .../runtime/tasks/StreamMockEnvironment.java    |  7 +++-
 .../util/AbstractStreamOperatorTestHarness.java | 16 ++++-----
 .../KeyedOneInputStreamOperatorTestHarness.java | 23 +++++-------
 .../KeyedTwoInputStreamOperatorTestHarness.java | 27 +++++---------
 .../util/OneInputStreamOperatorTestHarness.java |  7 ++--
 .../util/TwoInputStreamOperatorTestHarness.java |  9 ++---
 .../streaming/util/WindowingTestHarness.java    | 11 +++---
 19 files changed, 95 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 56d8efc..198a621 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -120,14 +120,15 @@ public class ContinuousFileMonitoringTest {
                TypeInformation<String> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
 
                final long watermarkInterval = 10;
-               ExecutionConfig executionConfig = new ExecutionConfig();
-               executionConfig.setAutoWatermarkInterval(watermarkInterval);
 
                ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
-               reader.setOutputType(typeInfo, executionConfig);
+               reader.setOutputType(typeInfo, new ExecutionConfig());
 
                final OneInputStreamOperatorTestHarness<FileInputSplit, String> 
tester =
-                       new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig);
+                       new OneInputStreamOperatorTestHarness<>(reader);
+
+               
tester.getExecutionConfig().setAutoWatermarkInterval(watermarkInterval);
+
 
                tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index f4b3cd7..e4b0460 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -90,7 +90,7 @@ public class BucketingSinkTest {
 
        private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
                        BucketingSink<T> sink) throws Exception {
-               return new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink), new ExecutionConfig());
+               return new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink));
        }
 
        @BeforeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index ffe8456..2d38fb9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -76,8 +76,6 @@ public class StreamConfig implements Serializable {
        private static final String STATE_BACKEND = "statebackend";
        private static final String STATE_PARTITIONER = "statePartitioner";
 
-       private static final String NUMBER_OF_KEY_GROUPS = "numberOfKeyGroups";
-
        private static final String STATE_KEY_SERIALIZER = "statekeyser";
        
        private static final String TIME_CHARACTERISTIC = "timechar";
@@ -450,32 +448,6 @@ public class StreamConfig implements Serializable {
                }
        }
 
-       /**
-        * Sets the number of key-groups to be used for the current {@link 
StreamOperator}.
-        *
-        * @param numberOfKeyGroups Number of key-groups to be used
-        */
-       public void setNumberOfKeyGroups(int numberOfKeyGroups) {
-               try {
-                       
InstantiationUtil.writeObjectToConfig(numberOfKeyGroups, this.config, 
NUMBER_OF_KEY_GROUPS);
-               } catch (Exception e) {
-                       throw new StreamTaskException("Could not serialize 
virtual state partitioner.", e);
-               }
-       }
-
-       /**
-        * Gets the number of key-groups for the {@link StreamOperator}.
-        *
-        * @return the number of key-groups
-        */
-       public Integer getNumberOfKeyGroups(ClassLoader cl) {
-               try {
-                       return 
InstantiationUtil.readObjectFromConfig(this.config, NUMBER_OF_KEY_GROUPS, cl);
-               } catch (Exception e) {
-                       throw new StreamTaskException("Could not instantiate 
virtual state partitioner.", e);
-               }
-       }
-       
        public void setStateKeySerializer(TypeSerializer<?> serializer) {
                try {
                        InstantiationUtil.writeObjectToConfig(serializer, 
this.config, STATE_KEY_SERIALIZER);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1d99cf3..87fd7eb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -355,11 +355,6 @@ public class StreamingJobGraphGenerator {
                config.setStatePartitioner(1, vertex.getStatePartitioner2());
                config.setStateKeySerializer(vertex.getStateKeySerializer());
 
-               // only set the max parallelism if the vertex uses partitioned 
state (= KeyedStream).
-               if (vertex.getStatePartitioner1() != null) {
-                       config.setNumberOfKeyGroups(vertex.getMaxParallelism());
-               }
-
                Class<? extends AbstractInvokable> vertexClass = 
vertex.getJobVertexClass();
 
                if (vertexClass.equals(StreamIterationHead.class)

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 5b66466..aa2f584 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -265,7 +265,7 @@ public abstract class AbstractStreamOperator<OUT>
 
                                this.keyedStateBackend = 
container.createKeyedStateBackend(
                                                keySerializer,
-                                               
container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()),
+                                               
container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
                                                subTaskKeyGroupRange);
 
                                this.keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index ada0b86..cc29172 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -49,6 +49,8 @@ import java.util.Collections;
 
 public class StreamOperatorSnapshotRestoreTest {
 
+       private static final int MAX_PARALLELISM = 10;
+
        @Test
        public void testOperatorStatesSnapshotRestore() throws Exception {
 
@@ -57,12 +59,16 @@ public class StreamOperatorSnapshotRestoreTest {
                TestOneInputStreamOperator op = new 
TestOneInputStreamOperator(false);
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
Integer> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(op, new KeySelector<Integer, 
Integer>() {
-                                       @Override
-                                       public Integer getKey(Integer value) 
throws Exception {
-                                               return value;
-                                       }
-                               }, TypeInformation.of(Integer.class));
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               op,
+                                               new KeySelector<Integer, 
Integer>() {
+                                                       @Override
+                                                       public Integer 
getKey(Integer value) throws Exception {
+                                                               return value;
+                                                       }
+                                               },
+                                               
TypeInformation.of(Integer.class),
+                                               MAX_PARALLELISM);
 
                testHarness.open();
 
@@ -87,12 +93,16 @@ public class StreamOperatorSnapshotRestoreTest {
                
//-------------------------------------------------------------------------- 
restore
 
                op = new TestOneInputStreamOperator(true);
-               testHarness = new KeyedOneInputStreamOperatorTestHarness<>(op, 
new KeySelector<Integer, Integer>() {
-                       @Override
-                       public Integer getKey(Integer value) throws Exception {
-                               return value;
-                       }
-               }, TypeInformation.of(Integer.class));
+               testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
+                               op,
+                               new KeySelector<Integer, Integer>() {
+                                       @Override
+                                       public Integer getKey(Integer value) 
throws Exception {
+                                               return value;
+                                       }
+                               },
+                               TypeInformation.of(Integer.class),
+                               MAX_PARALLELISM);
 
                testHarness.initializeState(new OperatorStateHandles(
                                0,
@@ -159,7 +169,7 @@ public class StreamOperatorSnapshotRestoreTest {
                                ++count;
                        }
 
-                       
Assert.assertEquals(KeyedOneInputStreamOperatorTestHarness.MAX_PARALLELISM, 
count);
+                       Assert.assertEquals(MAX_PARALLELISM, count);
 
                        // write raw operator state that goes into snapshot
                        OperatorStateCheckpointOutputStream outOp = 
context.getRawOperatorStateOutput();
@@ -188,7 +198,7 @@ public class StreamOperatorSnapshotRestoreTest {
                                                ++count;
                                        }
                                }
-                               
Assert.assertEquals(KeyedOneInputStreamOperatorTestHarness.MAX_PARALLELISM, 
count);
+                               Assert.assertEquals(MAX_PARALLELISM, count);
 
                                // check restored managed operator state
                                BitSet check = new BitSet(10);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index febfcde..f84836b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -40,11 +40,10 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
                final TimestampsAndPeriodicWatermarksOperator<Long> operator = 
                                new 
TimestampsAndPeriodicWatermarksOperator<Long>(new LongExtractor());
 
-               final ExecutionConfig config = new ExecutionConfig();
-               config.setAutoWatermarkInterval(50);
-
                OneInputStreamOperatorTestHarness<Long, Long> testHarness =
-                               new OneInputStreamOperatorTestHarness<Long, 
Long>(operator, config);
+                               new 
OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
 
                long currentTime = 0;
 
@@ -126,11 +125,10 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
                final TimestampsAndPeriodicWatermarksOperator<Long> operator =
                                new 
TimestampsAndPeriodicWatermarksOperator<Long>(assigner);
 
-               final ExecutionConfig config = new ExecutionConfig();
-               config.setAutoWatermarkInterval(50);
-
                OneInputStreamOperatorTestHarness<Long, Long> testHarness =
-                               new OneInputStreamOperatorTestHarness<Long, 
Long>(operator, config);
+                               new OneInputStreamOperatorTestHarness<Long, 
Long>(operator);
+
+               testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
 
                testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 720258e..34f69f8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -431,7 +431,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        windowSize, windowSize);
 
                        OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
-                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+                                       new 
OneInputStreamOperatorTestHarness<>(op);
 
                        testHarness.setup();
                        testHarness.open();
@@ -467,7 +467,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                                        windowSize, windowSize);
 
-                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op);
 
                        testHarness.setup();
                        testHarness.restore(state);
@@ -514,7 +514,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        windowSize, 
windowSlide);
 
                        OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
-                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+                                       new 
OneInputStreamOperatorTestHarness<>(op);
 
                        testHarness.setProcessingTime(0);
 
@@ -551,7 +551,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                        windowSize, windowSlide);
 
-                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op);
 
                        testHarness.setup();
                        testHarness.restore(state);
@@ -604,7 +604,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, 50, 50);
 
                        OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
-                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), 
identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, 
BasicTypeInfo.INT_TYPE_INFO);
 
                        testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 7ca5753..1875bbb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -496,7 +496,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                                        windowSize, windowSize);
 
                        OneInputStreamOperatorTestHarness<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testHarness =
-                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+                                       new 
OneInputStreamOperatorTestHarness<>(op);
 
                        testHarness.setProcessingTime(0);
 
@@ -536,7 +536,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, tupleSerializer,
                                        windowSize, windowSize);
 
-                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op);
 
                        testHarness.setup();
                        testHarness.restore(state);
@@ -587,7 +587,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 
                        OneInputStreamOperatorTestHarness<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testHarness =
-                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+                                       new 
OneInputStreamOperatorTestHarness<>(op);
 
                        testHarness.setProcessingTime(0);
 
@@ -627,7 +627,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, tupleSerializer,
                                        windowSize, windowSlide);
 
-                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op);
 
                        testHarness.setup();
                        testHarness.restore(state);
@@ -683,7 +683,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
                        KeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(
                                        op,
-                                       new ExecutionConfig(),
                                        fieldOneSelector,
                                        BasicTypeInfo.INT_TYPE_INFO);
 
@@ -734,7 +733,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
                        KeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(
                                        op,
-                                       new ExecutionConfig(),
                                        fieldOneSelector,
                                        BasicTypeInfo.INT_TYPE_INFO);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 9e50aaa..e5a5e21 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -932,7 +932,7 @@ public class WindowOperatorTest extends TestLogger {
                                ProcessingTimeTrigger.create(), 0);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new 
TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -989,7 +989,7 @@ public class WindowOperatorTest extends TestLogger {
                                ProcessingTimeTrigger.create(), 0);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new 
TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -1059,7 +1059,7 @@ public class WindowOperatorTest extends TestLogger {
                                ProcessingTimeTrigger.create(), 0);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new 
TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -2471,7 +2471,6 @@ public class WindowOperatorTest extends TestLogger {
                TumblingEventTimeWindows windowAssigner = 
TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(OFFSET));
 
                WindowingTestHarness<String, Tuple2<String, Integer>, 
TimeWindow> testHarness = new WindowingTestHarness<>(
-                       new ExecutionConfig(),
                        windowAssigner,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        inputType,
@@ -2520,7 +2519,6 @@ public class WindowOperatorTest extends TestLogger {
                SlidingEventTimeWindows windowAssigner = 
SlidingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(SLIDE),Time.milliseconds(OFFSET));
 
                WindowingTestHarness<String, Tuple2<String, Integer>, 
TimeWindow> testHarness = new WindowingTestHarness<>(
-                       new ExecutionConfig(),
                        windowAssigner,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        inputType,
@@ -2552,7 +2550,6 @@ public class WindowOperatorTest extends TestLogger {
                        Time.milliseconds(OFFSET));
 
                WindowingTestHarness<String, Tuple2<String, Integer>, 
TimeWindow> testHarness = new WindowingTestHarness<>(
-                       new ExecutionConfig(),
                        windowAssigner,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        inputType,
@@ -2607,7 +2604,6 @@ public class WindowOperatorTest extends TestLogger {
                        Time.milliseconds(SLIDING),Time.milliseconds(OFFSET));
 
                WindowingTestHarness<String, Tuple2<String, Integer>, 
TimeWindow> testHarness = new WindowingTestHarness<>(
-                       new ExecutionConfig(),
                        windowAssigner,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        inputType,

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
index 58a7897..8e33c92 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -45,7 +44,6 @@ public class WindowingTestHarnessTest {
                TumblingEventTimeWindows windowAssigner = 
TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
 
                WindowingTestHarness<String, Tuple2<String, Integer>, 
TimeWindow> testHarness = new WindowingTestHarness<>(
-                       new ExecutionConfig(),
                        windowAssigner,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        inputType,
@@ -92,7 +90,6 @@ public class WindowingTestHarnessTest {
                TumblingProcessingTimeWindows windowAssigner = 
TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
 
                WindowingTestHarness<String, Tuple2<String, Integer>, 
TimeWindow> testHarness = new WindowingTestHarness<>(
-                       new ExecutionConfig(),
                        windowAssigner,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        inputType,
@@ -145,7 +142,6 @@ public class WindowingTestHarnessTest {
                TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
 
                WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-                       new ExecutionConfig(),
                        windowAssigner,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        inputType,

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 5573a53..3cf055e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -101,11 +101,12 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
                streamConfig.setTypeSerializerIn1(inputSerializer);
        }
 
-       public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
+       public <K> void configureForKeyedStream(
+                       KeySelector<IN, K> keySelector,
+                       TypeInformation<K> keyType) {
                ClosureCleaner.clean(keySelector, false);
                streamConfig.setStatePartitioner(0, keySelector);
                streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig));
-               streamConfig.setNumberOfKeyGroups(10);
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 36ecf59..2376a60 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -103,7 +103,12 @@ public class StreamMockEnvironment implements Environment {
 
        public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig,
                                                                        long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
-               this.taskInfo = new TaskInfo("", 1, 0, 1, 0);
+               this.taskInfo = new TaskInfo(
+                               "", /* task name */
+                               1, /* num key groups / max parallelism */
+                               0, /* index of this subtask */
+                               1, /* num subtasks */
+                               0 /* attempt number */);
                this.jobConfiguration = jobConfig;
                this.taskConfiguration = taskConfig;
                this.inputs = new LinkedList<InputGate>();

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index a61d995..1124fa9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -63,7 +63,7 @@ import static org.mockito.Mockito.*;
  */
 public class AbstractStreamOperatorTestHarness<OUT> {
 
-       public static final int MAX_PARALLELISM = 10;
+       protected final static int DEFAULT_MAX_PARALLELISM = 1;
 
        final protected StreamOperator<OUT> operator;
 
@@ -92,19 +92,15 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
        private volatile boolean wasFailedExternally = false;
 
-       public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator) throws Exception {
-               this(operator, new ExecutionConfig());
-       }
-
        public AbstractStreamOperatorTestHarness(
                        StreamOperator<OUT> operator,
-                       ExecutionConfig executionConfig) throws Exception {
+                       int maxParallelism) throws Exception {
                this.operator = operator;
                this.outputList = new ConcurrentLinkedQueue<>();
                Configuration underlyingConfig = new Configuration();
                this.config = new StreamConfig(underlyingConfig);
                this.config.setCheckpointingEnabled(true);
-               this.executionConfig = executionConfig;
+               this.executionConfig = new ExecutionConfig();
                this.closableRegistry = new ClosableRegistry();
                this.checkpointLock = new Object();
 
@@ -115,7 +111,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                                1024,
                                underlyingConfig,
                                executionConfig,
-                               MAX_PARALLELISM,
+                               maxParallelism,
                                1, 0);
 
                mockTask = mock(StreamTask.class);
@@ -192,6 +188,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                return this.mockTask.getEnvironment();
        }
 
+       public ExecutionConfig getExecutionConfig() {
+               return executionConfig;
+       }
+
        /**
         * Get all the output from the task. This contains StreamRecords and Events interleaved.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 99527e7..25563a3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -65,29 +64,23 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
        public KeyedOneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
                        final KeySelector<IN, K> keySelector,
-                       TypeInformation<K> keyType) throws Exception {
-               super(operator);
+                       TypeInformation<K> keyType,
+                       int maxParallelism) throws Exception {
+               super(operator, maxParallelism);
 
                ClosureCleaner.clean(keySelector, false);
                config.setStatePartitioner(0, keySelector);
                config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-               config.setNumberOfKeyGroups(MAX_PARALLELISM);
 
                setupMockTaskCreateKeyedBackend();
        }
 
-       public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
-                       ExecutionConfig executionConfig,
-                       KeySelector<IN, K> keySelector,
-                       TypeInformation<K> keyType) throws Exception {
-               super(operator, executionConfig);
-
-               ClosureCleaner.clean(keySelector, false);
-               config.setStatePartitioner(0, keySelector);
-               config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-               config.setNumberOfKeyGroups(MAX_PARALLELISM);
 
-               setupMockTaskCreateKeyedBackend();
+       public KeyedOneInputStreamOperatorTestHarness(
+                       OneInputStreamOperator<IN, OUT> operator,
+                       final KeySelector<IN, K> keySelector,
+                       TypeInformation<K> keyType) throws Exception {
+               this(operator, keySelector, keyType, DEFAULT_MAX_PARALLELISM);
        }
 
        private void setupMockTaskCreateKeyedBackend() {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 2e9885c..1a01ea3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -55,37 +54,27 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
 
        public KeyedTwoInputStreamOperatorTestHarness(
                        TwoInputStreamOperator<IN1, IN2, OUT> operator,
-                       final KeySelector<IN1, K> keySelector1,
-                       final KeySelector<IN2, K> keySelector2,
-                       TypeInformation<K> keyType) throws Exception {
-               super(operator);
+                       KeySelector<IN1, K> keySelector1,
+                       KeySelector<IN2, K> keySelector2,
+                       TypeInformation<K> keyType,
+                       int maxParallelism) throws Exception {
+               super(operator, maxParallelism);
 
                ClosureCleaner.clean(keySelector1, false);
                ClosureCleaner.clean(keySelector2, false);
                config.setStatePartitioner(0, keySelector1);
                config.setStatePartitioner(1, keySelector2);
                config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-               config.setNumberOfKeyGroups(MAX_PARALLELISM);
 
                setupMockTaskCreateKeyedBackend();
        }
 
        public KeyedTwoInputStreamOperatorTestHarness(
                        TwoInputStreamOperator<IN1, IN2, OUT> operator,
-                       ExecutionConfig executionConfig,
-                       KeySelector<IN1, K> keySelector1,
-                       KeySelector<IN2, K> keySelector2,
+                       final KeySelector<IN1, K> keySelector1,
+                       final KeySelector<IN2, K> keySelector2,
                        TypeInformation<K> keyType) throws Exception {
-               super(operator, executionConfig);
-
-               ClosureCleaner.clean(keySelector1, false);
-               ClosureCleaner.clean(keySelector2, false);
-               config.setStatePartitioner(0, keySelector1);
-               config.setStatePartitioner(1, keySelector2);
-               config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-               config.setNumberOfKeyGroups(MAX_PARALLELISM);
-
-               setupMockTaskCreateKeyedBackend();
+               this(operator, keySelector1, keySelector2, keyType, DEFAULT_MAX_PARALLELISM);
        }
 
        private void setupMockTaskCreateKeyedBackend() {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index a3e095a..8be9c63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -37,13 +36,13 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
        private final OneInputStreamOperator<IN, OUT> oneInputOperator;
 
        public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
-               this(operator, new ExecutionConfig());
+               this(operator, DEFAULT_MAX_PARALLELISM);
        }
 
        public OneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
-                       ExecutionConfig executionConfig) throws Exception {
-               super(operator, executionConfig);
+                       int maxParallelism) throws Exception {
+               super(operator, maxParallelism);
 
                this.oneInputOperator = operator;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 95eea98..c6f6918 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -36,11 +35,13 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStr
        private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
 
        public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception {
-               this(operator, new ExecutionConfig());
+               this(operator, DEFAULT_MAX_PARALLELISM);
        }
                
-       public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, ExecutionConfig executionConfig) throws Exception {
-               super(operator, executionConfig);
+       public TwoInputStreamOperatorTestHarness(
+                       TwoInputStreamOperator<IN1, IN2, OUT> operator,
+                       int maxParallelism) throws Exception {
+               super(operator, maxParallelism);
 
                this.twoInputOperator = operator;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index 9a1b512..db3a89c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -55,8 +55,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 
        private volatile boolean isOpen = false;
 
-       public WindowingTestHarness(ExecutionConfig executionConfig,
-                                                               WindowAssigner<? super IN, W> windowAssigner,
+       public WindowingTestHarness(WindowAssigner<? super IN, W> windowAssigner,
                                                                TypeInformation<K> keyType,
                                                                TypeInformation<IN> inputType,
                                                                KeySelector<IN, K> keySelector,
@@ -64,20 +63,20 @@ public class WindowingTestHarness<K, IN, W extends Window> {
                                                                long allowedLateness) throws Exception {
 
                ListStateDescriptor<IN> windowStateDesc =
-                               new ListStateDescriptor<>("window-contents", inputType.createSerializer(executionConfig));
+                               new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig()));
 
                WindowOperator<K, IN, Iterable<IN>, IN, W> operator =
                        new WindowOperator<>(
                                windowAssigner,
-                               windowAssigner.getWindowSerializer(executionConfig),
+                               windowAssigner.getWindowSerializer(new ExecutionConfig()),
                                keySelector,
-                               keyType.createSerializer(executionConfig),
+                               keyType.createSerializer(new ExecutionConfig()),
                                windowStateDesc,
                                new InternalIterableWindowFunction<>(new PassThroughFunction()),
                                trigger,
                                allowedLateness);
 
-               testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, keySelector, keyType);
+               testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType, 1);
        }
 
        /**

Reply via email to