[FLINK-4924] Simplify Operator Test Harness Constructors
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe1654c6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe1654c6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe1654c6 Branch: refs/heads/master Commit: fe1654c680cad692e19ce262c402fd9756e8602a Parents: de03e0c Author: Aljoscha Krettek <[email protected]> Authored: Wed Oct 26 12:19:25 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 26 23:26:28 2016 +0200 ---------------------------------------------------------------------- .../hdfstests/ContinuousFileMonitoringTest.java | 9 ++--- .../fs/bucketing/BucketingSinkTest.java | 2 +- .../flink/streaming/api/graph/StreamConfig.java | 28 --------------- .../api/graph/StreamingJobGraphGenerator.java | 5 --- .../api/operators/AbstractStreamOperator.java | 2 +- .../StreamOperatorSnapshotRestoreTest.java | 38 ++++++++++++-------- ...stampsAndPeriodicWatermarksOperatorTest.java | 14 ++++---- ...AlignedProcessingTimeWindowOperatorTest.java | 10 +++--- ...AlignedProcessingTimeWindowOperatorTest.java | 10 +++--- .../operators/windowing/WindowOperatorTest.java | 10 ++---- .../windowing/WindowingTestHarnessTest.java | 4 --- .../tasks/OneInputStreamTaskTestHarness.java | 5 +-- .../runtime/tasks/StreamMockEnvironment.java | 7 +++- .../util/AbstractStreamOperatorTestHarness.java | 16 ++++----- .../KeyedOneInputStreamOperatorTestHarness.java | 23 +++++------- .../KeyedTwoInputStreamOperatorTestHarness.java | 27 +++++--------- .../util/OneInputStreamOperatorTestHarness.java | 7 ++-- .../util/TwoInputStreamOperatorTestHarness.java | 9 ++--- .../streaming/util/WindowingTestHarness.java | 11 +++--- 19 files changed, 95 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java index 56d8efc..198a621 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java @@ -120,14 +120,15 @@ public class ContinuousFileMonitoringTest { TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format); final long watermarkInterval = 10; - ExecutionConfig executionConfig = new ExecutionConfig(); - executionConfig.setAutoWatermarkInterval(watermarkInterval); ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format); - reader.setOutputType(typeInfo, executionConfig); + reader.setOutputType(typeInfo, new ExecutionConfig()); final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester = - new OneInputStreamOperatorTestHarness<>(reader, executionConfig); + new OneInputStreamOperatorTestHarness<>(reader); + + tester.getExecutionConfig().setAutoWatermarkInterval(watermarkInterval); + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime); http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index f4b3cd7..e4b0460 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -90,7 +90,7 @@ public class BucketingSinkTest { private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink( BucketingSink<T> sink) throws Exception { - return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig()); + return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink)); } @BeforeClass http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index ffe8456..2d38fb9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -76,8 +76,6 @@ public class StreamConfig implements Serializable { private static final String STATE_BACKEND = "statebackend"; private static final String STATE_PARTITIONER = "statePartitioner"; - private static final String NUMBER_OF_KEY_GROUPS = "numberOfKeyGroups"; - private static final String STATE_KEY_SERIALIZER = "statekeyser"; private static final String TIME_CHARACTERISTIC = "timechar"; @@ -450,32 +448,6 @@ public class StreamConfig implements Serializable { } } - /** - * Sets the number of key-groups to be used for the current {@link StreamOperator}. - * - * @param numberOfKeyGroups Number of key-groups to be used - */ - public void setNumberOfKeyGroups(int numberOfKeyGroups) { - try { - InstantiationUtil.writeObjectToConfig(numberOfKeyGroups, this.config, NUMBER_OF_KEY_GROUPS); - } catch (Exception e) { - throw new StreamTaskException("Could not serialize virtual state partitioner.", e); - } - } - - /** - * Gets the number of key-groups for the {@link StreamOperator}. - * - * @return the number of key-groups - */ - public Integer getNumberOfKeyGroups(ClassLoader cl) { - try { - return InstantiationUtil.readObjectFromConfig(this.config, NUMBER_OF_KEY_GROUPS, cl); - } catch (Exception e) { - throw new StreamTaskException("Could not instantiate virtual state partitioner.", e); - } - } - public void setStateKeySerializer(TypeSerializer<?> serializer) { try { InstantiationUtil.writeObjectToConfig(serializer, this.config, STATE_KEY_SERIALIZER); http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 1d99cf3..87fd7eb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -355,11 +355,6 @@ public class StreamingJobGraphGenerator { config.setStatePartitioner(1, vertex.getStatePartitioner2()); config.setStateKeySerializer(vertex.getStateKeySerializer()); - // only set the max parallelism if the vertex uses partitioned state (= KeyedStream). - if (vertex.getStatePartitioner1() != null) { - config.setNumberOfKeyGroups(vertex.getMaxParallelism()); - } - Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass(); if (vertexClass.equals(StreamIterationHead.class) http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 5b66466..aa2f584 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -265,7 +265,7 @@ public abstract class AbstractStreamOperator<OUT> this.keyedStateBackend = container.createKeyedStateBackend( keySerializer, - container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()), + container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(), subTaskKeyGroupRange); this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig()); http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index ada0b86..cc29172 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -49,6 +49,8 @@ import java.util.Collections; public class StreamOperatorSnapshotRestoreTest { + private static final int MAX_PARALLELISM = 10; + @Test public void testOperatorStatesSnapshotRestore() throws Exception { @@ -57,12 +59,16 @@ public class StreamOperatorSnapshotRestoreTest { TestOneInputStreamOperator op = new TestOneInputStreamOperator(false); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(op, new KeySelector<Integer, Integer>() { - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }, TypeInformation.of(Integer.class)); + new KeyedOneInputStreamOperatorTestHarness<>( + op, + new KeySelector<Integer, Integer>() { + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }, + TypeInformation.of(Integer.class), + MAX_PARALLELISM); testHarness.open(); @@ -87,12 +93,16 @@ public class StreamOperatorSnapshotRestoreTest { //-------------------------------------------------------------------------- restore op = new TestOneInputStreamOperator(true); - testHarness = new KeyedOneInputStreamOperatorTestHarness<>(op, new KeySelector<Integer, Integer>() { - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }, TypeInformation.of(Integer.class)); + testHarness = new KeyedOneInputStreamOperatorTestHarness<>( + op, + new KeySelector<Integer, Integer>() { + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }, + TypeInformation.of(Integer.class), + MAX_PARALLELISM); testHarness.initializeState(new OperatorStateHandles( 0, @@ -159,7 +169,7 @@ public class StreamOperatorSnapshotRestoreTest { ++count; } - Assert.assertEquals(KeyedOneInputStreamOperatorTestHarness.MAX_PARALLELISM, count); + Assert.assertEquals(MAX_PARALLELISM, count); // write raw operator state that goes into snapshot OperatorStateCheckpointOutputStream outOp = context.getRawOperatorStateOutput(); @@ -188,7 +198,7 @@ public class StreamOperatorSnapshotRestoreTest { ++count; } } - Assert.assertEquals(KeyedOneInputStreamOperatorTestHarness.MAX_PARALLELISM, count); + Assert.assertEquals(MAX_PARALLELISM, count); // check restored managed operator state BitSet check = new BitSet(10); http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java index febfcde..f84836b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java @@ -40,11 +40,10 @@ public class TimestampsAndPeriodicWatermarksOperatorTest { final TimestampsAndPeriodicWatermarksOperator<Long> operator = new TimestampsAndPeriodicWatermarksOperator<Long>(new LongExtractor()); - final ExecutionConfig config = new ExecutionConfig(); - config.setAutoWatermarkInterval(50); - OneInputStreamOperatorTestHarness<Long, Long> testHarness = - new OneInputStreamOperatorTestHarness<Long, Long>(operator, config); + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.getExecutionConfig().setAutoWatermarkInterval(50); long currentTime = 0; @@ -126,11 +125,10 @@ public class TimestampsAndPeriodicWatermarksOperatorTest { final TimestampsAndPeriodicWatermarksOperator<Long> operator = new TimestampsAndPeriodicWatermarksOperator<Long>(assigner); - final ExecutionConfig config = new ExecutionConfig(); - config.setAutoWatermarkInterval(50); - OneInputStreamOperatorTestHarness<Long, Long> testHarness = - new OneInputStreamOperatorTestHarness<Long, Long>(operator, config); + new OneInputStreamOperatorTestHarness<Long, Long>(operator); + + testHarness.getExecutionConfig().setAutoWatermarkInterval(50); testHarness.open(); http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 720258e..34f69f8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -431,7 +431,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { windowSize, windowSize); OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = - new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); + new OneInputStreamOperatorTestHarness<>(op); testHarness.setup(); testHarness.open(); @@ -467,7 +467,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSize); - testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); + testHarness = new OneInputStreamOperatorTestHarness<>(op); testHarness.setup(); testHarness.restore(state); @@ -514,7 +514,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { windowSize, windowSlide); OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = - new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); + new OneInputStreamOperatorTestHarness<>(op); testHarness.setProcessingTime(0); @@ -551,7 +551,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSlide); - testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); + testHarness = new OneInputStreamOperatorTestHarness<>(op); testHarness.setup(); testHarness.restore(state); @@ -604,7 +604,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50); OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), identitySelector, BasicTypeInfo.INT_TYPE_INFO); + new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO); testHarness.open(); http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 7ca5753..1875bbb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -496,7 +496,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { windowSize, windowSize); OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); + new OneInputStreamOperatorTestHarness<>(op); testHarness.setProcessingTime(0); @@ -536,7 +536,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize); - testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); + testHarness = new OneInputStreamOperatorTestHarness<>(op); testHarness.setup(); testHarness.restore(state); @@ -587,7 +587,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); + new OneInputStreamOperatorTestHarness<>(op); testHarness.setProcessingTime(0); @@ -627,7 +627,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide); - testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); + testHarness = new OneInputStreamOperatorTestHarness<>(op); testHarness.setup(); testHarness.restore(state); @@ -683,7 +683,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>( op, - new ExecutionConfig(), fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO); @@ -734,7 +733,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>( op, - new ExecutionConfig(), fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO); http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 9e50aaa..e5a5e21 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -932,7 +932,7 @@ public class WindowOperatorTest extends TestLogger { ProcessingTimeTrigger.create(), 0); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -989,7 +989,7 @@ public class WindowOperatorTest extends TestLogger { ProcessingTimeTrigger.create(), 0); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -1059,7 +1059,7 @@ public class WindowOperatorTest extends TestLogger { ProcessingTimeTrigger.create(), 0); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -2471,7 +2471,6 @@ public class WindowOperatorTest extends TestLogger { TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(OFFSET)); WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), windowAssigner, BasicTypeInfo.STRING_TYPE_INFO, inputType, @@ -2520,7 +2519,6 @@ public class WindowOperatorTest extends TestLogger { SlidingEventTimeWindows windowAssigner = SlidingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(SLIDE),Time.milliseconds(OFFSET)); WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), windowAssigner, BasicTypeInfo.STRING_TYPE_INFO, inputType, @@ -2552,7 +2550,6 @@ public class WindowOperatorTest extends TestLogger { Time.milliseconds(OFFSET)); WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), windowAssigner, BasicTypeInfo.STRING_TYPE_INFO, inputType, @@ -2607,7 +2604,6 @@ public class WindowOperatorTest extends TestLogger { Time.milliseconds(SLIDING),Time.milliseconds(OFFSET)); WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), windowAssigner, BasicTypeInfo.STRING_TYPE_INFO, inputType, http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java index 58a7897..8e33c92 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; @@ -45,7 +44,6 @@ public class WindowingTestHarnessTest { TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)); WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), windowAssigner, BasicTypeInfo.STRING_TYPE_INFO, inputType, @@ -92,7 +90,6 @@ public class WindowingTestHarnessTest { TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE)); WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), windowAssigner, BasicTypeInfo.STRING_TYPE_INFO, inputType, @@ -145,7 +142,6 @@ public class WindowingTestHarnessTest { TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)); WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), windowAssigner, BasicTypeInfo.STRING_TYPE_INFO, inputType, http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index 5573a53..3cf055e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java @@ -101,11 +101,12 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes streamConfig.setTypeSerializerIn1(inputSerializer); } - public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) { + public <K> void configureForKeyedStream( + KeySelector<IN, K> keySelector, + TypeInformation<K> keyType) { ClosureCleaner.clean(keySelector, false); streamConfig.setStatePartitioner(0, keySelector); streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig)); - streamConfig.setNumberOfKeyGroups(10); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 36ecf59..2376a60 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -103,7 +103,12 @@ public class StreamMockEnvironment implements Environment { public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { - this.taskInfo = new TaskInfo("", 1, 0, 1, 0); + this.taskInfo = new TaskInfo( + "", /* task name */ + 1, /* num key groups / max parallelism */ + 0, /* index of this subtask */ + 1, /* num subtasks */ + 0 /* attempt number */); this.jobConfiguration = jobConfig; this.taskConfiguration = taskConfig; this.inputs = new LinkedList<InputGate>(); http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index a61d995..1124fa9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -63,7 +63,7 @@ import static org.mockito.Mockito.*; */ public class AbstractStreamOperatorTestHarness<OUT> { - public static final int MAX_PARALLELISM = 10; + protected final static int DEFAULT_MAX_PARALLELISM = 1; final protected StreamOperator<OUT> operator; @@ -92,19 +92,15 @@ public class AbstractStreamOperatorTestHarness<OUT> { private volatile boolean wasFailedExternally = false; - public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator) throws Exception { - this(operator, new ExecutionConfig()); - } - public AbstractStreamOperatorTestHarness( StreamOperator<OUT> operator, - ExecutionConfig executionConfig) throws Exception { + int maxParallelism) throws Exception { this.operator = operator; this.outputList = new ConcurrentLinkedQueue<>(); Configuration underlyingConfig = new Configuration(); this.config = new StreamConfig(underlyingConfig); this.config.setCheckpointingEnabled(true); - this.executionConfig = executionConfig; + this.executionConfig = new ExecutionConfig(); this.closableRegistry = new ClosableRegistry(); this.checkpointLock = new Object(); @@ -115,7 +111,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { 1024, underlyingConfig, executionConfig, - MAX_PARALLELISM, + maxParallelism, 1, 0); mockTask = mock(StreamTask.class); @@ -192,6 +188,10 @@ public class AbstractStreamOperatorTestHarness<OUT> { return this.mockTask.getEnvironment(); } + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + /** * Get all the output from the task. This contains StreamRecords and Events interleaved. */ http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 99527e7..25563a3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -17,7 +17,6 @@ */ package org.apache.flink.streaming.util; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -65,29 +64,23 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> public KeyedOneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, final KeySelector<IN, K> keySelector, - TypeInformation<K> keyType) throws Exception { - super(operator); + TypeInformation<K> keyType, + int maxParallelism) throws Exception { + super(operator, maxParallelism); ClosureCleaner.clean(keySelector, false); config.setStatePartitioner(0, keySelector); config.setStateKeySerializer(keyType.createSerializer(executionConfig)); - config.setNumberOfKeyGroups(MAX_PARALLELISM); setupMockTaskCreateKeyedBackend(); } - public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, - ExecutionConfig executionConfig, - KeySelector<IN, K> keySelector, - TypeInformation<K> keyType) throws Exception { - super(operator, executionConfig); - - ClosureCleaner.clean(keySelector, false); - config.setStatePartitioner(0, keySelector); - config.setStateKeySerializer(keyType.createSerializer(executionConfig)); - config.setNumberOfKeyGroups(MAX_PARALLELISM); - setupMockTaskCreateKeyedBackend(); + public KeyedOneInputStreamOperatorTestHarness( + OneInputStreamOperator<IN, OUT> operator, + final KeySelector<IN, K> keySelector, + TypeInformation<K> keyType) throws Exception { + this(operator, keySelector, keyType, DEFAULT_MAX_PARALLELISM); } private void setupMockTaskCreateKeyedBackend() { http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java index 2e9885c..1a01ea3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -17,7 +17,6 @@ */ package org.apache.flink.streaming.util; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -55,37 +54,27 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> public KeyedTwoInputStreamOperatorTestHarness( TwoInputStreamOperator<IN1, IN2, OUT> operator, - final KeySelector<IN1, K> keySelector1, - final KeySelector<IN2, K> keySelector2, - TypeInformation<K> keyType) throws Exception { - super(operator); + KeySelector<IN1, K> keySelector1, + KeySelector<IN2, K> keySelector2, + TypeInformation<K> keyType, + int maxParallelism) throws Exception { + super(operator, maxParallelism); ClosureCleaner.clean(keySelector1, false); ClosureCleaner.clean(keySelector2, false); config.setStatePartitioner(0, keySelector1); config.setStatePartitioner(1, keySelector2); config.setStateKeySerializer(keyType.createSerializer(executionConfig)); - config.setNumberOfKeyGroups(MAX_PARALLELISM); setupMockTaskCreateKeyedBackend(); } public KeyedTwoInputStreamOperatorTestHarness( TwoInputStreamOperator<IN1, IN2, OUT> operator, - ExecutionConfig executionConfig, - KeySelector<IN1, K> keySelector1, - KeySelector<IN2, K> keySelector2, + final KeySelector<IN1, K> keySelector1, + final KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception { - super(operator, executionConfig); - - ClosureCleaner.clean(keySelector1, false); - ClosureCleaner.clean(keySelector2, false); - config.setStatePartitioner(0, keySelector1); - config.setStatePartitioner(1, keySelector2); - config.setStateKeySerializer(keyType.createSerializer(executionConfig)); - config.setNumberOfKeyGroups(MAX_PARALLELISM); - - setupMockTaskCreateKeyedBackend(); + this(operator, keySelector1, keySelector2, keyType, DEFAULT_MAX_PARALLELISM); } private void setupMockTaskCreateKeyedBackend() { http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index a3e095a..8be9c63 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -17,7 +17,6 @@ */ package org.apache.flink.streaming.util; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -37,13 +36,13 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> private final OneInputStreamOperator<IN, OUT> oneInputOperator; public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception { - this(operator, new ExecutionConfig()); + this(operator, DEFAULT_MAX_PARALLELISM); } public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, - ExecutionConfig executionConfig) throws Exception { - super(operator, executionConfig); + int maxParallelism) throws Exception { + super(operator, maxParallelism); this.oneInputOperator = operator; } http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index 95eea98..c6f6918 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.util; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -36,11 +35,13 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStr private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator; public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception { - this(operator, new ExecutionConfig()); + this(operator, DEFAULT_MAX_PARALLELISM); } - public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, ExecutionConfig executionConfig) throws Exception { - super(operator, executionConfig); + public TwoInputStreamOperatorTestHarness( + TwoInputStreamOperator<IN1, IN2, OUT> operator, + int maxParallelism) throws Exception { + super(operator, maxParallelism); this.twoInputOperator = operator; } http://git-wip-us.apache.org/repos/asf/flink/blob/fe1654c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java index 9a1b512..db3a89c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java @@ -55,8 +55,7 @@ public class WindowingTestHarness<K, IN, W extends Window> { private volatile boolean isOpen = false; - public WindowingTestHarness(ExecutionConfig executionConfig, - WindowAssigner<? super IN, W> windowAssigner, + public WindowingTestHarness(WindowAssigner<? super IN, W> windowAssigner, TypeInformation<K> keyType, TypeInformation<IN> inputType, KeySelector<IN, K> keySelector, @@ -64,20 +63,20 @@ public class WindowingTestHarness<K, IN, W extends Window> { long allowedLateness) throws Exception { ListStateDescriptor<IN> windowStateDesc = - new ListStateDescriptor<>("window-contents", inputType.createSerializer(executionConfig)); + new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig())); WindowOperator<K, IN, Iterable<IN>, IN, W> operator = new WindowOperator<>( windowAssigner, - windowAssigner.getWindowSerializer(executionConfig), + windowAssigner.getWindowSerializer(new ExecutionConfig()), keySelector, - keyType.createSerializer(executionConfig), + keyType.createSerializer(new ExecutionConfig()), windowStateDesc, new InternalIterableWindowFunction<>(new PassThroughFunction()), trigger, allowedLateness); - testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, keySelector, keyType); + testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType, 1); } /**
