Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator"
This reverts commit 9cfae899358e0694c3ecedae1fad20e428a1d359. The fixes around FLINK-5808 introduced follow-up issues. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0182141d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0182141d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0182141d Branch: refs/heads/master Commit: 0182141d41be52ae0cf4a3563d4b8c6f3daca02e Parents: a13750c Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Tue Apr 4 14:04:40 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue Apr 18 17:42:10 2017 +0200 ---------------------------------------------------------------------- .../Flip6LocalStreamEnvironment.java | 4 - .../api/environment/LocalStreamEnvironment.java | 26 +- .../environment/RemoteStreamEnvironment.java | 5 - .../environment/StreamContextEnvironment.java | 13 +- .../environment/StreamExecutionEnvironment.java | 65 ++-- .../api/environment/StreamPlanEnvironment.java | 15 +- .../flink/streaming/api/graph/StreamGraph.java | 6 +- .../api/graph/StreamGraphGenerator.java | 12 +- .../api/graph/StreamingJobGraphGenerator.java | 14 +- .../api/StreamExecutionEnvironmentTest.java | 289 +++++++++++++++++ .../StreamExecutionEnvironmentTest.java | 317 ------------------- .../graph/StreamingJobGraphGeneratorTest.java | 10 +- .../FoldApplyProcessWindowFunctionTest.java | 8 +- .../operators/FoldApplyWindowFunctionTest.java | 6 +- .../api/scala/StreamExecutionEnvironment.scala | 28 +- .../streaming/api/scala/DataStreamTest.scala | 11 +- .../streaming/util/TestStreamEnvironment.java | 1 - .../accumulators/AccumulatorLiveITCase.java | 4 - 18 files changed, 397 insertions(+), 437 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java index 244660a..61ca55f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java @@ -46,9 +46,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class); - /** The default parallelism used when creating a local environment */ - private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); - /** The configuration to use for the mini cluster */ private final Configuration conf; @@ -65,7 +62,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { * @param config The configuration used to configure the local executor. */ public Flip6LocalStreamEnvironment(Configuration config) { - super(defaultLocalParallelism); if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) { throw new InvalidProgramException( "The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " + http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index 117f6d8..566beba 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -46,9 +46,6 @@ import org.slf4j.LoggerFactory; @Public public class LocalStreamEnvironment extends StreamExecutionEnvironment { - /** The default parallelism used when creating a local environment */ - private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); - private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class); /** The configuration to use for the local cluster */ @@ -58,43 +55,24 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { * Creates a new local stream environment that uses the default configuration. */ public LocalStreamEnvironment() { - this(defaultLocalParallelism); + this(null); } /** - * Creates a new local stream environment that uses the default configuration. - */ - public LocalStreamEnvironment(int parallelism) { - this(null, parallelism); - } - - - /** * Creates a new local stream environment that configures its local executor with the given configuration. * * @param config The configuration used to configure the local executor. */ public LocalStreamEnvironment(Configuration config) { - this(config, defaultLocalParallelism); - } - - /** - * Creates a new local stream environment that configures its local executor with the given configuration. - * - * @param config The configuration used to configure the local executor. - */ - public LocalStreamEnvironment(Configuration config, int parallelism) { - super(parallelism); if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) { throw new InvalidProgramException( "The LocalStreamEnvironment cannot be used when submitting a program through a client, " + "or running in a TestEnvironment context."); } - + this.conf = config == null ? new Configuration() : config; } - /** * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user * specified name. http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 5684e28..333f9c0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -37,7 +37,6 @@ import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.streaming.api.graph.StreamGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,10 +129,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { * The protocol must be supported by the {@link java.net.URLClassLoader}. */ public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) { - super(GlobalConfiguration.loadConfiguration().getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, - ConfigConstants.DEFAULT_PARALLELISM)); - if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) { throw new InvalidProgramException( "The RemoteEnvironment cannot be used when submitting a program through a client, " + http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 51078f2..49c5347 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -38,13 +38,14 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { private final ContextEnvironment ctx; protected StreamContextEnvironment(ContextEnvironment ctx) { - // if the batch ContextEnvironment has a parallelism this must have come from - // the CLI Client. We should set that as our default parallelism - super(ctx.getParallelism() > 0 ? ctx.getParallelism() : - GlobalConfiguration.loadConfiguration().getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, - ConfigConstants.DEFAULT_PARALLELISM)); this.ctx = ctx; + if (ctx.getParallelism() > 0) { + setParallelism(ctx.getParallelism()); + } else { + setParallelism(GlobalConfiguration.loadConfiguration().getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM)); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index b6540dc..e836616 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -112,6 +112,9 @@ public abstract class StreamExecutionEnvironment { /** The environment of the context (local by default, cluster if invoked through command line) */ private static StreamExecutionEnvironmentFactory contextEnvironmentFactory; + /** The default parallelism used when creating a local environment */ + private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); + // ------------------------------------------------------------------------ /** The execution configuration for this environment */ @@ -132,23 +135,11 @@ public abstract class StreamExecutionEnvironment { /** The time characteristic used by the data streams */ private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; - /** The parallelism to use when no parallelism is set on an operation. */ - private final int defaultParallelism; - // -------------------------------------------------------------------------------------------- // Constructor and Properties // -------------------------------------------------------------------------------------------- - - public StreamExecutionEnvironment() { - this(ConfigConstants.DEFAULT_PARALLELISM); - } - - public StreamExecutionEnvironment(int defaultParallelism) { - this.defaultParallelism = defaultParallelism; - } - /** * Gets the config object. */ @@ -1527,7 +1518,7 @@ public abstract class StreamExecutionEnvironment { if (transformations.size() <= 0) { throw new IllegalStateException("No operators defined in streaming topology. Cannot execute."); } - return StreamGraphGenerator.generate(this, transformations, defaultParallelism); + return StreamGraphGenerator.generate(this, transformations); } /** @@ -1615,7 +1606,7 @@ public abstract class StreamExecutionEnvironment { * @return A local execution environment. */ public static LocalStreamEnvironment createLocalEnvironment() { - return new LocalStreamEnvironment(); + return createLocalEnvironment(defaultLocalParallelism); } /** @@ -1624,12 +1615,14 @@ public abstract class StreamExecutionEnvironment { * environment was created in. It will use the parallelism specified in the * parameter. * - * @param defaultParallelism The default parallelism for the local environment. - * + * @param parallelism + * The parallelism for the local environment. * @return A local execution environment with the specified parallelism. */ - public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism) { - return new LocalStreamEnvironment(defaultParallelism); + public static LocalStreamEnvironment createLocalEnvironment(int parallelism) { + LocalStreamEnvironment env = new LocalStreamEnvironment(); + env.setParallelism(parallelism); + return env; } /** @@ -1638,13 +1631,16 @@ public abstract class StreamExecutionEnvironment { * environment was created in. It will use the parallelism specified in the * parameter. * - * @param defaultParallelism The parallelism for the local environment. - * @param configuration Pass a custom configuration into the cluster - * + * @param parallelism + * The parallelism for the local environment. + * @param configuration + * Pass a custom configuration into the cluster * @return A local execution environment with the specified parallelism. */ - public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism, Configuration configuration) { - return new LocalStreamEnvironment(configuration, defaultParallelism); + public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) { + LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration); + currentEnvironment.setParallelism(parallelism); + return currentEnvironment; } /** @@ -1669,6 +1665,7 @@ public abstract class StreamExecutionEnvironment { conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf); + localEnv.setParallelism(defaultLocalParallelism); return localEnv; } @@ -1754,6 +1751,28 @@ public abstract class StreamExecutionEnvironment { return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles); } + /** + * Gets the default parallelism that will be used for the local execution environment created by + * {@link #createLocalEnvironment()}. + * + * @return The default local parallelism + */ + @PublicEvolving + public static int getDefaultLocalParallelism() { + return defaultLocalParallelism; + } + + /** + * Sets the default parallelism that will be used for the local execution + * environment created by {@link #createLocalEnvironment()}. + * + * @param parallelism The parallelism to use as the default local parallelism. + */ + @PublicEvolving + public static void setDefaultLocalParallelism(int parallelism) { + defaultLocalParallelism = parallelism; + } + // -------------------------------------------------------------------------------------------- // Methods to control the context and local environments for execution from packaged programs // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java index 9c676c4..b1521f5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java @@ -32,11 +32,18 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment { private ExecutionEnvironment env; protected StreamPlanEnvironment(ExecutionEnvironment env) { - super(GlobalConfiguration.loadConfiguration().getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, - ConfigConstants.DEFAULT_PARALLELISM)); - + super(); this.env = env; + + int parallelism = env.getParallelism(); + if (parallelism > 0) { + setParallelism(parallelism); + } else { + // determine parallelism + setParallelism(GlobalConfiguration.loadConfiguration().getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM)); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index a99efb1..cc3ca9e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -94,14 +94,12 @@ public class StreamGraph extends StreamingPlan { private AbstractStateBackend stateBackend; private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs; - private final int defaultParallelism; - public StreamGraph(StreamExecutionEnvironment environment, int defaultParallelism) { + public StreamGraph(StreamExecutionEnvironment environment) { this.environment = environment; this.executionConfig = environment.getConfig(); this.checkpointConfig = environment.getCheckpointConfig(); - this.defaultParallelism = defaultParallelism; // create an empty new stream graph. clear(); } @@ -657,7 +655,7 @@ public class StreamGraph extends StreamingPlan { + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)"); } - StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this, defaultParallelism); + StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this); return jobgraphGenerator.createJobGraph(); } http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 9fc8dd8..f9eec4f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -98,11 +98,12 @@ public class StreamGraphGenerator { // we have loops, i.e. feedback edges. private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed; + /** * Private constructor. The generator should only be invoked using {@link #generate}. */ - private StreamGraphGenerator(StreamExecutionEnvironment env, int defaultParallelism) { - this.streamGraph = new StreamGraph(env, defaultParallelism); + private StreamGraphGenerator(StreamExecutionEnvironment env) { + this.streamGraph = new StreamGraph(env); this.streamGraph.setChaining(env.isChainingEnabled()); this.streamGraph.setStateBackend(env.getStateBackend()); this.env = env; @@ -119,11 +120,8 @@ public class StreamGraphGenerator { * * @return The generated {@code StreamGraph} */ - public static StreamGraph generate( - StreamExecutionEnvironment env, - List<StreamTransformation<?>> transformations, - int defaultParallelism) { - return new StreamGraphGenerator(env, defaultParallelism).generateInternal(transformations); + public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) { + return new StreamGraphGenerator(env).generateInternal(transformations); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 0896eb7..794de5a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.graph; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -93,13 +92,10 @@ public class StreamingJobGraphGenerator { private final StreamGraphHasher defaultStreamGraphHasher; private final List<StreamGraphHasher> legacyStreamGraphHashers; - private final int defaultParallelism; - - public StreamingJobGraphGenerator(StreamGraph streamGraph, int defaultParallelism) { + public StreamingJobGraphGenerator(StreamGraph streamGraph) { this.streamGraph = streamGraph; this.defaultStreamGraphHasher = new StreamGraphHasherV2(); this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher()); - this.defaultParallelism = defaultParallelism; } private void init() { @@ -342,12 +338,12 @@ public class StreamingJobGraphGenerator { int parallelism = streamNode.getParallelism(); - if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) { - parallelism = defaultParallelism; + if (parallelism > 0) { + jobVertex.setParallelism(parallelism); + } else { + parallelism = jobVertex.getParallelism(); } - jobVertex.setParallelism(parallelism); - jobVertex.setMaxParallelism(streamNode.getMaxParallelism()); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java new file mode 100644 index 0000000..3fc1344 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.FromElementsFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.SplittableIterator; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class StreamExecutionEnvironmentTest { + + @Test + public void fromElementsWithBaseTypeTest1() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); + } + + @Test(expected = IllegalArgumentException.class) + public void fromElementsWithBaseTypeTest2() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); + } + + @Test + @SuppressWarnings("unchecked") + public void testFromCollectionParallelism() { + try { + TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo); + + try { + dataStream1.setParallelism(4); + fail("should throw an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + dataStream1.addSink(new DiscardingSink<Integer>()); + + DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(), + typeInfo).setParallelism(4); + + dataStream2.addSink(new DiscardingSink<Integer>()); + + env.getExecutionPlan(); + + assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism()); + assertEquals("Parallelism of parallel collection source must be 4.", + 4, + env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSources() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SourceFunction<Integer> srcFun = new SourceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + } + + @Override + public void cancel() { + } + }; + DataStreamSource<Integer> src1 = env.addSource(srcFun); + src1.addSink(new DiscardingSink<Integer>()); + assertEquals(srcFun, getFunctionFromDataSource(src1)); + + List<Long> list = Arrays.asList(0L, 1L, 2L); + + DataStreamSource<Long> src2 = env.generateSequence(0, 2); + assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource); + + DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L); + assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction); + + DataStreamSource<Long> src4 = env.fromCollection(list); + assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction); + } + + @Test + public void testParallelismBounds() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SourceFunction<Integer> srcFun = new SourceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + } + + @Override + public void cancel() { + } + }; + + + SingleOutputStreamOperator<Object> operator = + env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Integer value, Collector<Object> out) throws Exception { + + } + }); + + // default value for max parallelism + Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); + + // bounds for parallelism 1 + try { + operator.setParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for parallelism 2 + operator.setParallelism(1); + Assert.assertEquals(1, operator.getParallelism()); + + // bounds for parallelism 3 + operator.setParallelism(1 << 15); + Assert.assertEquals(1 << 15, operator.getParallelism()); + + // default value after generating + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); + + // configured value after generating + env.setMaxParallelism(42); + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(42, operator.getTransformation().getMaxParallelism()); + + // bounds configured parallelism 1 + try { + env.setMaxParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds configured parallelism 2 + try { + env.setMaxParallelism(1 + (1 << 15)); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 1 + try { + operator.setMaxParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 2 + try { + operator.setMaxParallelism(1 + (1 << 15)); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 3 + operator.setMaxParallelism(1); + Assert.assertEquals(1, operator.getTransformation().getMaxParallelism()); + + // bounds for max parallelism 4 + operator.setMaxParallelism(1 << 15); + Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism()); + + // override config + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism()); + } + + ///////////////////////////////////////////////////////////// + // Utilities + ///////////////////////////////////////////////////////////// + + + private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) { + StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); + StreamGraph streamGraph = env.getStreamGraph(); + return streamGraph.getStreamNode(dataStream.getId()).getOperator(); + } + + @SuppressWarnings("unchecked") + private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) { + dataStreamSource.addSink(new DiscardingSink<T>()); + AbstractUdfStreamOperator<?, ?> operator = + (AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource); + return (SourceFunction<T>) operator.getUserFunction(); + } + + public static class DummySplittableIterator<T> extends SplittableIterator<T> { + private static final long serialVersionUID = 1312752876092210499L; + + @SuppressWarnings("unchecked") + @Override + public Iterator<T>[] split(int numPartitions) { + return (Iterator<T>[]) new Iterator<?>[0]; + } + + @Override + public int getMaximumNumberOfSplits() { + return 0; + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public T next() { + throw new NoSuchElementException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + public static class ParentClass { + int num; + String string; + public ParentClass(int num, String string) { + this.num = num; + this.string = string; + } + } + + public static class SubClass extends ParentClass{ + public SubClass(int num, String string) { + super(num, string); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java deleted file mode 100644 index d29c833..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.environment; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.ContextEnvironment; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.api.functions.source.FromElementsFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.util.Collector; -import org.apache.flink.util.SplittableIterator; -import org.junit.Assert; -import org.junit.Test; - -import java.net.URL; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -public class StreamExecutionEnvironmentTest { - - @Test - public void fromElementsWithBaseTypeTest1() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); - } - - @Test(expected = IllegalArgumentException.class) - public void fromElementsWithBaseTypeTest2() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); - } - - @Test - @SuppressWarnings("unchecked") - public void testFromCollectionParallelism() { - try { - TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo); - - try { - dataStream1.setParallelism(4); - fail("should throw an exception"); - } - catch (IllegalArgumentException e) { - // expected - } - - dataStream1.addSink(new DiscardingSink<Integer>()); - - DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(), - typeInfo).setParallelism(4); - - dataStream2.addSink(new DiscardingSink<Integer>()); - - env.getExecutionPlan(); - - assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism()); - assertEquals("Parallelism of parallel collection source must be 4.", - 4, - env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSources() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SourceFunction<Integer> srcFun = new SourceFunction<Integer>() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - } - - @Override - public void cancel() { - } - }; - DataStreamSource<Integer> src1 = env.addSource(srcFun); - src1.addSink(new DiscardingSink<Integer>()); - assertEquals(srcFun, getFunctionFromDataSource(src1)); - - List<Long> list = Arrays.asList(0L, 1L, 2L); - - DataStreamSource<Long> src2 = env.generateSequence(0, 2); - assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource); - - DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L); - assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction); - - DataStreamSource<Long> src4 = env.fromCollection(list); - assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction); - } - - @Test - public void testDefaultParallelismIsDefault() { - assertEquals( - ExecutionConfig.PARALLELISM_DEFAULT, - StreamExecutionEnvironment.createLocalEnvironment().getParallelism()); - - assertEquals( - ExecutionConfig.PARALLELISM_DEFAULT, - StreamExecutionEnvironment.createRemoteEnvironment("dummy", 1234).getParallelism()); - - StreamExecutionEnvironment contextEnv = new StreamContextEnvironment( - new ContextEnvironment( - mock(ClusterClient.class), - Collections.<URL>emptyList(), - Collections.<URL>emptyList(), - this.getClass().getClassLoader(), - null)); - - assertEquals( - ExecutionConfig.PARALLELISM_DEFAULT, - contextEnv.getParallelism()); - } - - @Test - public void testParallelismBounds() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SourceFunction<Integer> srcFun = new SourceFunction<Integer>() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - } - - @Override - public void cancel() { - } - }; - - - SingleOutputStreamOperator<Object> operator = - env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap(Integer value, Collector<Object> out) throws Exception { - - } - }); - - // default value for max parallelism - Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); - - // bounds for parallelism 1 - try { - operator.setParallelism(0); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for parallelism 2 - operator.setParallelism(1); - Assert.assertEquals(1, operator.getParallelism()); - - // bounds for parallelism 3 - operator.setParallelism(1 << 15); - Assert.assertEquals(1 << 15, operator.getParallelism()); - - // default value after generating - env.getStreamGraph().getJobGraph(); - Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); - - // configured value after generating - env.setMaxParallelism(42); - env.getStreamGraph().getJobGraph(); - Assert.assertEquals(42, operator.getTransformation().getMaxParallelism()); - - // bounds configured parallelism 1 - try { - env.setMaxParallelism(0); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds configured parallelism 2 - try { - env.setMaxParallelism(1 + (1 << 15)); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for max parallelism 1 - try { - operator.setMaxParallelism(0); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for max parallelism 2 - try { - operator.setMaxParallelism(1 + (1 << 15)); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for max parallelism 3 - operator.setMaxParallelism(1); - Assert.assertEquals(1, operator.getTransformation().getMaxParallelism()); - - // bounds for max parallelism 4 - operator.setMaxParallelism(1 << 15); - Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism()); - - // override config - env.getStreamGraph().getJobGraph(); - Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism()); - } - - ///////////////////////////////////////////////////////////// - // Utilities - ///////////////////////////////////////////////////////////// - - - private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) { - StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); - StreamGraph streamGraph = env.getStreamGraph(); - return streamGraph.getStreamNode(dataStream.getId()).getOperator(); - } - - @SuppressWarnings("unchecked") - private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) { - dataStreamSource.addSink(new DiscardingSink<T>()); - AbstractUdfStreamOperator<?, ?> operator = - (AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource); - return (SourceFunction<T>) operator.getUserFunction(); - } - - public static class DummySplittableIterator<T> extends SplittableIterator<T> { - private static final long serialVersionUID = 1312752876092210499L; - - @SuppressWarnings("unchecked") - @Override - public Iterator<T>[] split(int numPartitions) { - return (Iterator<T>[]) new Iterator<?>[0]; - } - - @Override - public int getMaximumNumberOfSplits() { - return 0; - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public T next() { - throw new NoSuchElementException(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - public static class ParentClass { - int num; - String string; - public ParentClass(int num, String string) { - this.num = num; - this.string = string; - } - } - - public static class SubClass extends ParentClass{ - public SubClass(int num, String string) { - super(num, string); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index abf51ab..6d2fcaa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -112,10 +112,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { @Test public void testDisabledCheckpointing() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamGraph streamGraph = new StreamGraph(env, 1 /* default parallelism */); + StreamGraph streamGraph = new StreamGraph(env); assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled()); - StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */); + StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph); JobGraph jobGraph = jobGraphGenerator.createJobGraph(); JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings(); @@ -137,7 +137,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { } }) .print(); - JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph(); + JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); JobVertex sourceVertex = verticesSorted.get(0); @@ -224,7 +224,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { }); sinkMethod.invoke(sink, resource5); - JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph(); + JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); JobVertex sourceMapFilterVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0); JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1); @@ -291,7 +291,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { }).disableChaining().name("test_sink"); sinkMethod.invoke(sink, resource5); - JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph(); + JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); for (JobVertex jobVertex : jobGraph.getVertices()) { if (jobVertex.getName().contains("test_source")) { http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java index c4bed37..7cac1e1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java @@ -131,7 +131,7 @@ public class FoldApplyProcessWindowFunctionTest { transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); - StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */); + StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); List<Integer> result = new ArrayList<>(); List<Integer> input = new ArrayList<>(); @@ -240,7 +240,7 @@ public class FoldApplyProcessWindowFunctionTest { transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); - StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */); + StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); List<Integer> result = new ArrayList<>(); List<Integer> input = new ArrayList<>(); @@ -310,10 +310,6 @@ public class FoldApplyProcessWindowFunctionTest { public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { - public DummyStreamExecutionEnvironment() { - super(1); - } - @Override public JobExecutionResult execute(String jobName) throws Exception { return null; http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java index 6ddca34..fecd440 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -117,7 +117,7 @@ public class FoldApplyWindowFunctionTest { transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); - StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */); + StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); List<Integer> result = new ArrayList<>(); List<Integer> input = new ArrayList<>(); @@ -140,10 +140,6 @@ public class FoldApplyWindowFunctionTest { public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { - public DummyStreamExecutionEnvironment() { - super(1); - } - @Override public JobExecutionResult execute(String jobName) throws Exception { return null; http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 927c94e..0b2587e 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -672,6 +672,23 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { object StreamExecutionEnvironment { + /** + * Sets the default parallelism that will be used for the local execution + * environment created by [[createLocalEnvironment()]]. + * + * @param parallelism The default parallelism to use for local execution. + */ + @PublicEvolving + def setDefaultLocalParallelism(parallelism: Int) : Unit = + JavaEnv.setDefaultLocalParallelism(parallelism) + + /** + * Gets the default parallelism that will be used for the local execution environment created by + * [[createLocalEnvironment()]]. + */ + @PublicEvolving + def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism + // -------------------------------------------------------------------------- // context environment // -------------------------------------------------------------------------- @@ -693,14 +710,13 @@ object StreamExecutionEnvironment { /** * Creates a local execution environment. The local execution environment will run the * program in a multi-threaded fashion in the same JVM as the environment was created in. + * + * This method sets the environment's default parallelism to given parameter, which + * defaults to the value set via [[setDefaultLocalParallelism(Int)]]. */ - def createLocalEnvironment(parallelism: Int = -1): + def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism): StreamExecutionEnvironment = { - if (parallelism == -1) { - new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment()) - } else { - new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism)) - } + new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism)) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index 08153be..60c609d 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -255,10 +255,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { val sink = map.addSink(x => {}) assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism) - // default parallelism is only actualized when transforming to JobGraph - assert(-1 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) + assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism) - assert(-1 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) + assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) try { src.setParallelism(3) @@ -273,11 +272,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { // the parallelism does not change since some windowing code takes the parallelism from // input operations and that cannot change dynamically assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism) - // setting a parallelism on the env/in the ExecutionConfig means that operators - // pick it up when being instantiated - assert(7 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) + assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism) - assert(7 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) + assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) val parallelSource = env.generateSequence(0, 0) parallelSource.print() http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 90d8790..64c68dc 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -36,7 +36,6 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) { - super(parallelism); this.executor = Preconditions.checkNotNull(executor); setParallelism(parallelism); } http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 0a9f686..49ff744 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -382,10 +382,6 @@ public class AccumulatorLiveITCase extends TestLogger { */ private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { - public DummyStreamExecutionEnvironment() { - super(1 /* default parallelism */); - } - @Override public JobExecutionResult execute() throws Exception { return execute("default");