Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator"

This reverts commit 9cfae899358e0694c3ecedae1fad20e428a1d359.

The fixes around FLINK-5808 introduced follow-up issues.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0182141d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0182141d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0182141d

Branch: refs/heads/master
Commit: 0182141d41be52ae0cf4a3563d4b8c6f3daca02e
Parents: a13750c
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Tue Apr 4 14:04:40 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Apr 18 17:42:10 2017 +0200

----------------------------------------------------------------------
 .../Flip6LocalStreamEnvironment.java            |   4 -
 .../api/environment/LocalStreamEnvironment.java |  26 +-
 .../environment/RemoteStreamEnvironment.java    |   5 -
 .../environment/StreamContextEnvironment.java   |  13 +-
 .../environment/StreamExecutionEnvironment.java |  65 ++--
 .../api/environment/StreamPlanEnvironment.java  |  15 +-
 .../flink/streaming/api/graph/StreamGraph.java  |   6 +-
 .../api/graph/StreamGraphGenerator.java         |  12 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  14 +-
 .../api/StreamExecutionEnvironmentTest.java     | 289 +++++++++++++++++
 .../StreamExecutionEnvironmentTest.java         | 317 -------------------
 .../graph/StreamingJobGraphGeneratorTest.java   |  10 +-
 .../FoldApplyProcessWindowFunctionTest.java     |   8 +-
 .../operators/FoldApplyWindowFunctionTest.java  |   6 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  28 +-
 .../streaming/api/scala/DataStreamTest.scala    |  11 +-
 .../streaming/util/TestStreamEnvironment.java   |   1 -
 .../accumulators/AccumulatorLiveITCase.java     |   4 -
 18 files changed, 397 insertions(+), 437 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 244660a..61ca55f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -46,9 +46,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
        private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);
 
-       /** The default parallelism used when creating a local environment */
-       private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
-
        /** The configuration to use for the mini cluster */
        private final Configuration conf;
 
@@ -65,7 +62,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
         * @param config The configuration used to configure the local executor.
         */
        public Flip6LocalStreamEnvironment(Configuration config) {
-               super(defaultLocalParallelism);
                if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
                        throw new InvalidProgramException(
                                        "The Flip6LocalStreamEnvironment cannot 
be used when submitting a program through a client, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 117f6d8..566beba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -46,9 +46,6 @@ import org.slf4j.LoggerFactory;
 @Public
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
-       /** The default parallelism used when creating a local environment */
-       private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
-
        private static final Logger LOG = 
LoggerFactory.getLogger(LocalStreamEnvironment.class);
        
        /** The configuration to use for the local cluster */
@@ -58,43 +55,24 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
         * Creates a new local stream environment that uses the default 
configuration.
         */
        public LocalStreamEnvironment() {
-               this(defaultLocalParallelism);
+               this(null);
        }
 
        /**
-        * Creates a new local stream environment that uses the default 
configuration.
-        */
-       public LocalStreamEnvironment(int parallelism) {
-               this(null, parallelism);
-       }
-
-
-       /**
         * Creates a new local stream environment that configures its local 
executor with the given configuration.
         *
         * @param config The configuration used to configure the local executor.
         */
        public LocalStreamEnvironment(Configuration config) {
-               this(config, defaultLocalParallelism);
-       }
-
-       /**
-        * Creates a new local stream environment that configures its local 
executor with the given configuration.
-        *
-        * @param config The configuration used to configure the local executor.
-        */
-       public LocalStreamEnvironment(Configuration config, int parallelism) {
-               super(parallelism);
                if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
                        throw new InvalidProgramException(
                                        "The LocalStreamEnvironment cannot be 
used when submitting a program through a client, " +
                                                        "or running in a 
TestEnvironment context.");
                }
-
+               
                this.conf = config == null ? new Configuration() : config;
        }
 
-
        /**
         * Executes the JobGraph of the on a mini cluster of CLusterUtil with a 
user
         * specified name.

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 5684e28..333f9c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -37,7 +37,6 @@ import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,10 +129,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
         *            The protocol must be supported by the {@link 
java.net.URLClassLoader}.
         */
        public RemoteStreamEnvironment(String host, int port, Configuration 
clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
-               super(GlobalConfiguration.loadConfiguration().getInteger(
-                               ConfigConstants.DEFAULT_PARALLELISM_KEY,
-                               ConfigConstants.DEFAULT_PARALLELISM));
-               
                if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
                        throw new InvalidProgramException(
                                        "The RemoteEnvironment cannot be used 
when submitting a program through a client, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 51078f2..49c5347 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -38,13 +38,14 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
        private final ContextEnvironment ctx;
 
        protected StreamContextEnvironment(ContextEnvironment ctx) {
-               // if the batch ContextEnvironment has a parallelism this must have come from
-               // the CLI Client. We should set that as our default parallelism
-               super(ctx.getParallelism() > 0 ? ctx.getParallelism() :
-                               GlobalConfiguration.loadConfiguration().getInteger(
-                                               ConfigConstants.DEFAULT_PARALLELISM_KEY,
-                                               ConfigConstants.DEFAULT_PARALLELISM));
                this.ctx = ctx;
+               if (ctx.getParallelism() > 0) {
+                       setParallelism(ctx.getParallelism());
+               } else {
+                       setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
+                                       ConfigConstants.DEFAULT_PARALLELISM_KEY,
+                                       ConfigConstants.DEFAULT_PARALLELISM));
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b6540dc..e836616 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -112,6 +112,9 @@ public abstract class StreamExecutionEnvironment {
        /** The environment of the context (local by default, cluster if 
invoked through command line) */
        private static StreamExecutionEnvironmentFactory 
contextEnvironmentFactory;
 
+       /** The default parallelism used when creating a local environment */
+       private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
+
        // 
------------------------------------------------------------------------
 
        /** The execution configuration for this environment */
@@ -132,23 +135,11 @@ public abstract class StreamExecutionEnvironment {
        /** The time characteristic used by the data streams */
        private TimeCharacteristic timeCharacteristic = 
DEFAULT_TIME_CHARACTERISTIC;
 
-       /** The parallelism to use when no parallelism is set on an operation. 
*/
-       private final int defaultParallelism;
-
 
        // 
--------------------------------------------------------------------------------------------
        // Constructor and Properties
        // 
--------------------------------------------------------------------------------------------
 
-
-       public StreamExecutionEnvironment() {
-               this(ConfigConstants.DEFAULT_PARALLELISM);
-       }
-
-       public StreamExecutionEnvironment(int defaultParallelism) {
-               this.defaultParallelism = defaultParallelism;
-       }
-
        /**
         * Gets the config object.
         */
@@ -1527,7 +1518,7 @@ public abstract class StreamExecutionEnvironment {
                if (transformations.size() <= 0) {
                        throw new IllegalStateException("No operators defined 
in streaming topology. Cannot execute.");
                }
-               return StreamGraphGenerator.generate(this, transformations, 
defaultParallelism);
+               return StreamGraphGenerator.generate(this, transformations);
        }
 
        /**
@@ -1615,7 +1606,7 @@ public abstract class StreamExecutionEnvironment {
         * @return A local execution environment.
         */
        public static LocalStreamEnvironment createLocalEnvironment() {
-               return new LocalStreamEnvironment();
+               return createLocalEnvironment(defaultLocalParallelism);
        }
 
        /**
@@ -1624,12 +1615,14 @@ public abstract class StreamExecutionEnvironment {
         * environment was created in. It will use the parallelism specified in 
the
         * parameter.
         *
-        * @param defaultParallelism The default parallelism for the local 
environment.
-        * 
+        * @param parallelism
+        *              The parallelism for the local environment.
         * @return A local execution environment with the specified parallelism.
         */
-       public static LocalStreamEnvironment createLocalEnvironment(int 
defaultParallelism) {
-               return new LocalStreamEnvironment(defaultParallelism);
+       public static LocalStreamEnvironment createLocalEnvironment(int 
parallelism) {
+               LocalStreamEnvironment env = new LocalStreamEnvironment();
+               env.setParallelism(parallelism);
+               return env;
        }
 
        /**
@@ -1638,13 +1631,16 @@ public abstract class StreamExecutionEnvironment {
         * environment was created in. It will use the parallelism specified in 
the
         * parameter.
         *
-        * @param defaultParallelism The parallelism for the local environment.
-        * @param configuration Pass a custom configuration into the cluster
-        *
+        * @param parallelism
+        *              The parallelism for the local environment.
+        *      @param configuration
+        *              Pass a custom configuration into the cluster
         * @return A local execution environment with the specified parallelism.
         */
-       public static LocalStreamEnvironment createLocalEnvironment(int 
defaultParallelism, Configuration configuration) {
-               return new LocalStreamEnvironment(configuration, 
defaultParallelism);
+       public static LocalStreamEnvironment createLocalEnvironment(int 
parallelism, Configuration configuration) {
+               LocalStreamEnvironment currentEnvironment = new 
LocalStreamEnvironment(configuration);
+               currentEnvironment.setParallelism(parallelism);
+               return currentEnvironment;
        }
 
        /**
@@ -1669,6 +1665,7 @@ public abstract class StreamExecutionEnvironment {
                conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
                LocalStreamEnvironment localEnv = new 
LocalStreamEnvironment(conf);
+               localEnv.setParallelism(defaultLocalParallelism);
 
                return localEnv;
        }
@@ -1754,6 +1751,28 @@ public abstract class StreamExecutionEnvironment {
                return new RemoteStreamEnvironment(host, port, clientConfig, 
jarFiles);
        }
 
+       /**
+        * Gets the default parallelism that will be used for the local 
execution environment created by
+        * {@link #createLocalEnvironment()}.
+        *
+        * @return The default local parallelism
+        */
+       @PublicEvolving
+       public static int getDefaultLocalParallelism() {
+               return defaultLocalParallelism;
+       }
+
+       /**
+        * Sets the default parallelism that will be used for the local 
execution
+        * environment created by {@link #createLocalEnvironment()}.
+        *
+        * @param parallelism The parallelism to use as the default local 
parallelism.
+        */
+       @PublicEvolving
+       public static void setDefaultLocalParallelism(int parallelism) {
+               defaultLocalParallelism = parallelism;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Methods to control the context and local environments for execution 
from packaged programs
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 9c676c4..b1521f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -32,11 +32,18 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
        private ExecutionEnvironment env;
 
        protected StreamPlanEnvironment(ExecutionEnvironment env) {
-               super(GlobalConfiguration.loadConfiguration().getInteger(
-                               ConfigConstants.DEFAULT_PARALLELISM_KEY,
-                               ConfigConstants.DEFAULT_PARALLELISM));
-
+               super();
                this.env = env;
+
+               int parallelism = env.getParallelism();
+               if (parallelism > 0) {
+                       setParallelism(parallelism);
+               } else {
+                       // determine parallelism
+                       setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
+                                       ConfigConstants.DEFAULT_PARALLELISM_KEY,
+                                       ConfigConstants.DEFAULT_PARALLELISM));
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index a99efb1..cc3ca9e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -94,14 +94,12 @@ public class StreamGraph extends StreamingPlan {
        private AbstractStateBackend stateBackend;
        private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
-       private final int defaultParallelism;
 
-       public StreamGraph(StreamExecutionEnvironment environment, int defaultParallelism) {
+       public StreamGraph(StreamExecutionEnvironment environment) {
                this.environment = environment;
                this.executionConfig = environment.getConfig();
                this.checkpointConfig = environment.getCheckpointConfig();
 
-               this.defaultParallelism = defaultParallelism;
                // create an empty new stream graph.
                clear();
        }
@@ -657,7 +655,7 @@ public class StreamGraph extends StreamingPlan {
                                                        + "\nThe user can force 
enable state checkpoints with the reduced guarantees by calling: 
env.enableCheckpointing(interval,true)");
                }
 
-               StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this, defaultParallelism);
+               StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
 
                return jobgraphGenerator.createJobGraph();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 9fc8dd8..f9eec4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -98,11 +98,12 @@ public class StreamGraphGenerator {
        // we have loops, i.e. feedback edges.
        private Map<StreamTransformation<?>, Collection<Integer>> 
alreadyTransformed;
 
+
        /**
         * Private constructor. The generator should only be invoked using 
{@link #generate}.
         */
-       private StreamGraphGenerator(StreamExecutionEnvironment env, int defaultParallelism) {
-               this.streamGraph = new StreamGraph(env, defaultParallelism);
+       private StreamGraphGenerator(StreamExecutionEnvironment env) {
+               this.streamGraph = new StreamGraph(env);
                this.streamGraph.setChaining(env.isChainingEnabled());
                this.streamGraph.setStateBackend(env.getStateBackend());
                this.env = env;
@@ -119,11 +120,8 @@ public class StreamGraphGenerator {
         *
         * @return The generated {@code StreamGraph}
         */
-       public static StreamGraph generate(
-                       StreamExecutionEnvironment env,
-                       List<StreamTransformation<?>> transformations,
-                       int defaultParallelism) {
-               return new StreamGraphGenerator(env, defaultParallelism).generateInternal(transformations);
+       public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
+               return new StreamGraphGenerator(env).generateInternal(transformations);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0896eb7..794de5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -93,13 +92,10 @@ public class StreamingJobGraphGenerator {
        private final StreamGraphHasher defaultStreamGraphHasher;
        private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
-       private final int defaultParallelism;
-
-       public StreamingJobGraphGenerator(StreamGraph streamGraph, int defaultParallelism) {
+       public StreamingJobGraphGenerator(StreamGraph streamGraph) {
                this.streamGraph = streamGraph;
                this.defaultStreamGraphHasher = new StreamGraphHasherV2();
                this.legacyStreamGraphHashers = Arrays.asList(new 
StreamGraphHasherV1(), new StreamGraphUserHashHasher());
-               this.defaultParallelism = defaultParallelism;
        }
 
        private void init() {
@@ -342,12 +338,12 @@ public class StreamingJobGraphGenerator {
 
                int parallelism = streamNode.getParallelism();
 
-               if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-                       parallelism = defaultParallelism;
+               if (parallelism > 0) {
+                       jobVertex.setParallelism(parallelism);
+               } else {
+                       parallelism = jobVertex.getParallelism();
                }
 
-               jobVertex.setParallelism(parallelism);
-
                jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
 
                if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
new file mode 100644
index 0000000..3fc1344
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.SplittableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class StreamExecutionEnvironmentTest {
+
+       @Test
+       public void fromElementsWithBaseTypeTest1() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(ParentClass.class, new SubClass(1, "Java"), 
new ParentClass(1, "hello"));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void fromElementsWithBaseTypeTest2() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(SubClass.class, new SubClass(1, "Java"), new 
ParentClass(1, "hello"));
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testFromCollectionParallelism() {
+               try {
+                       TypeInformation<Integer> typeInfo = 
BasicTypeInfo.INT_TYPE_INFO;
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+                       DataStreamSource<Integer> dataStream1 = 
env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
+
+                       try {
+                               dataStream1.setParallelism(4);
+                               fail("should throw an exception");
+                       }
+                       catch (IllegalArgumentException e) {
+                               // expected
+                       }
+
+                       dataStream1.addSink(new DiscardingSink<Integer>());
+       
+                       DataStreamSource<Integer> dataStream2 = 
env.fromParallelCollection(new DummySplittableIterator<Integer>(),
+                                       typeInfo).setParallelism(4);
+
+                       dataStream2.addSink(new DiscardingSink<Integer>());
+
+                       env.getExecutionPlan();
+
+                       assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
+                       assertEquals("Parallelism of parallel collection source must be 4.",
+                                       4, 
+                                       env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testSources() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               };
+               DataStreamSource<Integer> src1 = env.addSource(srcFun);
+               src1.addSink(new DiscardingSink<Integer>());
+               assertEquals(srcFun, getFunctionFromDataSource(src1));
+
+               List<Long> list = Arrays.asList(0L, 1L, 2L);
+
+               DataStreamSource<Long> src2 = env.generateSequence(0, 2);
+               assertTrue(getFunctionFromDataSource(src2) instanceof 
StatefulSequenceSource);
+
+               DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
+               assertTrue(getFunctionFromDataSource(src3) instanceof 
FromElementsFunction);
+
+               DataStreamSource<Long> src4 = env.fromCollection(list);
+               assertTrue(getFunctionFromDataSource(src4) instanceof 
FromElementsFunction);
+       }
+
+       @Test
+       public void testParallelismBounds() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               };
+
+
+               SingleOutputStreamOperator<Object> operator =
+                               env.addSource(srcFun).flatMap(new 
FlatMapFunction<Integer, Object>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void flatMap(Integer value, Collector<Object> 
out) throws Exception {
+
+                       }
+               });
+
+               // default value for max parallelism
+               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
+
+               // bounds for parallelism 1
+               try {
+                       operator.setParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for parallelism 2
+               operator.setParallelism(1);
+               Assert.assertEquals(1, operator.getParallelism());
+
+               // bounds for parallelism 3
+               operator.setParallelism(1 << 15);
+               Assert.assertEquals(1 << 15, operator.getParallelism());
+
+               // default value after generating
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
+
+               // configured value after generating
+               env.setMaxParallelism(42);
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(42, 
operator.getTransformation().getMaxParallelism());
+
+               // bounds configured parallelism 1
+               try {
+                       env.setMaxParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds configured parallelism 2
+               try {
+                       env.setMaxParallelism(1 + (1 << 15));
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 1
+               try {
+                       operator.setMaxParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 2
+               try {
+                       operator.setMaxParallelism(1 + (1 << 15));
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 3
+               operator.setMaxParallelism(1);
+               Assert.assertEquals(1, 
operator.getTransformation().getMaxParallelism());
+
+               // bounds for max parallelism 4
+               operator.setMaxParallelism(1 << 15);
+               Assert.assertEquals(1 << 15, 
operator.getTransformation().getMaxParallelism());
+
+               // override config
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(1 << 15 , 
operator.getTransformation().getMaxParallelism());
+       }
+
+       /////////////////////////////////////////////////////////////
+       // Utilities
+       /////////////////////////////////////////////////////////////
+
+
+       private static StreamOperator<?> 
getOperatorFromDataStream(DataStream<?> dataStream) {
+               StreamExecutionEnvironment env = 
dataStream.getExecutionEnvironment();
+               StreamGraph streamGraph = env.getStreamGraph();
+               return 
streamGraph.getStreamNode(dataStream.getId()).getOperator();
+       }
+
+       @SuppressWarnings("unchecked")
+       private static <T> SourceFunction<T> 
getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
+               dataStreamSource.addSink(new DiscardingSink<T>());
+               AbstractUdfStreamOperator<?, ?> operator =
+                               (AbstractUdfStreamOperator<?, ?>) 
getOperatorFromDataStream(dataStreamSource);
+               return (SourceFunction<T>) operator.getUserFunction();
+       }
+
+       public static class DummySplittableIterator<T> extends 
SplittableIterator<T> {
+               private static final long serialVersionUID = 
1312752876092210499L;
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public Iterator<T>[] split(int numPartitions) {
+                       return (Iterator<T>[]) new Iterator<?>[0];
+               }
+
+               @Override
+               public int getMaximumNumberOfSplits() {
+                       return 0;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return false;
+               }
+
+               @Override
+               public T next() {
+                       throw new NoSuchElementException();
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       public static class ParentClass {
+               int num;
+               String string;
+               public ParentClass(int num, String string) {
+                       this.num = num;
+                       this.string = string;
+               }
+       }
+
+       public static class SubClass extends ParentClass{
+               public SubClass(int num, String string) {
+                       super(num, string);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
deleted file mode 100644
index d29c833..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.environment;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.SplittableIterator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-public class StreamExecutionEnvironmentTest {
-
-       @Test
-       public void fromElementsWithBaseTypeTest1() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.fromElements(ParentClass.class, new SubClass(1, "Java"), 
new ParentClass(1, "hello"));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void fromElementsWithBaseTypeTest2() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.fromElements(SubClass.class, new SubClass(1, "Java"), new 
ParentClass(1, "hello"));
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testFromCollectionParallelism() {
-               try {
-                       TypeInformation<Integer> typeInfo = 
BasicTypeInfo.INT_TYPE_INFO;
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-                       DataStreamSource<Integer> dataStream1 = 
env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-
-                       try {
-                               dataStream1.setParallelism(4);
-                               fail("should throw an exception");
-                       }
-                       catch (IllegalArgumentException e) {
-                               // expected
-                       }
-
-                       dataStream1.addSink(new DiscardingSink<Integer>());
-       
-                       DataStreamSource<Integer> dataStream2 = 
env.fromParallelCollection(new DummySplittableIterator<Integer>(),
-                                       typeInfo).setParallelism(4);
-
-                       dataStream2.addSink(new DiscardingSink<Integer>());
-
-                       env.getExecutionPlan();
-
-                       assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
-                       assertEquals("Parallelism of parallel collection source must be 4.",
-                                       4, 
-                                       env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testSources() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-                       }
-
-                       @Override
-                       public void cancel() {
-                       }
-               };
-               DataStreamSource<Integer> src1 = env.addSource(srcFun);
-               src1.addSink(new DiscardingSink<Integer>());
-               assertEquals(srcFun, getFunctionFromDataSource(src1));
-
-               List<Long> list = Arrays.asList(0L, 1L, 2L);
-
-               DataStreamSource<Long> src2 = env.generateSequence(0, 2);
-               assertTrue(getFunctionFromDataSource(src2) instanceof 
StatefulSequenceSource);
-
-               DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
-               assertTrue(getFunctionFromDataSource(src3) instanceof 
FromElementsFunction);
-
-               DataStreamSource<Long> src4 = env.fromCollection(list);
-               assertTrue(getFunctionFromDataSource(src4) instanceof 
FromElementsFunction);
-       }
-
-       @Test
-       public void testDefaultParallelismIsDefault() {
-               assertEquals(
-                               ExecutionConfig.PARALLELISM_DEFAULT,
-                               
StreamExecutionEnvironment.createLocalEnvironment().getParallelism());
-
-               assertEquals(
-                               ExecutionConfig.PARALLELISM_DEFAULT,
-                               
StreamExecutionEnvironment.createRemoteEnvironment("dummy", 
1234).getParallelism());
-
-               StreamExecutionEnvironment contextEnv = new 
StreamContextEnvironment(
-                               new ContextEnvironment(
-                                               mock(ClusterClient.class),
-                                               Collections.<URL>emptyList(),
-                                               Collections.<URL>emptyList(),
-                                               
this.getClass().getClassLoader(),
-                                               null));
-
-               assertEquals(
-                               ExecutionConfig.PARALLELISM_DEFAULT,
-                               contextEnv.getParallelism());
-       }
-
-       @Test
-       public void testParallelismBounds() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-                       }
-
-                       @Override
-                       public void cancel() {
-                       }
-               };
-
-
-               SingleOutputStreamOperator<Object> operator =
-                               env.addSource(srcFun).flatMap(new 
FlatMapFunction<Integer, Object>() {
-
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public void flatMap(Integer value, Collector<Object> 
out) throws Exception {
-
-                       }
-               });
-
-               // default value for max parallelism
-               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
-
-               // bounds for parallelism 1
-               try {
-                       operator.setParallelism(0);
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for parallelism 2
-               operator.setParallelism(1);
-               Assert.assertEquals(1, operator.getParallelism());
-
-               // bounds for parallelism 3
-               operator.setParallelism(1 << 15);
-               Assert.assertEquals(1 << 15, operator.getParallelism());
-
-               // default value after generating
-               env.getStreamGraph().getJobGraph();
-               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
-
-               // configured value after generating
-               env.setMaxParallelism(42);
-               env.getStreamGraph().getJobGraph();
-               Assert.assertEquals(42, 
operator.getTransformation().getMaxParallelism());
-
-               // bounds configured parallelism 1
-               try {
-                       env.setMaxParallelism(0);
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds configured parallelism 2
-               try {
-                       env.setMaxParallelism(1 + (1 << 15));
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for max parallelism 1
-               try {
-                       operator.setMaxParallelism(0);
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for max parallelism 2
-               try {
-                       operator.setMaxParallelism(1 + (1 << 15));
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for max parallelism 3
-               operator.setMaxParallelism(1);
-               Assert.assertEquals(1, 
operator.getTransformation().getMaxParallelism());
-
-               // bounds for max parallelism 4
-               operator.setMaxParallelism(1 << 15);
-               Assert.assertEquals(1 << 15, 
operator.getTransformation().getMaxParallelism());
-
-               // override config
-               env.getStreamGraph().getJobGraph();
-               Assert.assertEquals(1 << 15 , 
operator.getTransformation().getMaxParallelism());
-       }
-
-       /////////////////////////////////////////////////////////////
-       // Utilities
-       /////////////////////////////////////////////////////////////
-
-
-       private static StreamOperator<?> 
getOperatorFromDataStream(DataStream<?> dataStream) {
-               StreamExecutionEnvironment env = 
dataStream.getExecutionEnvironment();
-               StreamGraph streamGraph = env.getStreamGraph();
-               return 
streamGraph.getStreamNode(dataStream.getId()).getOperator();
-       }
-
-       @SuppressWarnings("unchecked")
-       private static <T> SourceFunction<T> 
getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
-               dataStreamSource.addSink(new DiscardingSink<T>());
-               AbstractUdfStreamOperator<?, ?> operator =
-                               (AbstractUdfStreamOperator<?, ?>) 
getOperatorFromDataStream(dataStreamSource);
-               return (SourceFunction<T>) operator.getUserFunction();
-       }
-
-       public static class DummySplittableIterator<T> extends 
SplittableIterator<T> {
-               private static final long serialVersionUID = 
1312752876092210499L;
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public Iterator<T>[] split(int numPartitions) {
-                       return (Iterator<T>[]) new Iterator<?>[0];
-               }
-
-               @Override
-               public int getMaximumNumberOfSplits() {
-                       return 0;
-               }
-
-               @Override
-               public boolean hasNext() {
-                       return false;
-               }
-
-               @Override
-               public T next() {
-                       throw new NoSuchElementException();
-               }
-
-               @Override
-               public void remove() {
-                       throw new UnsupportedOperationException();
-               }
-       }
-
-       public static class ParentClass {
-               int num;
-               String string;
-               public ParentClass(int num, String string) {
-                       this.num = num;
-                       this.string = string;
-               }
-       }
-
-       public static class SubClass extends ParentClass{
-               public SubClass(int num, String string) {
-                       super(num, string);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index abf51ab..6d2fcaa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -112,10 +112,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
        @Test
        public void testDisabledCheckpointing() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               StreamGraph streamGraph = new StreamGraph(env, 1 /* default parallelism */);
+               StreamGraph streamGraph = new StreamGraph(env);
                assertFalse("Checkpointing enabled", 
streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
-               StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */);
+               StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph);
                JobGraph jobGraph = jobGraphGenerator.createJobGraph();
 
                JobSnapshottingSettings snapshottingSettings = 
jobGraph.getSnapshotSettings();
@@ -137,7 +137,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                                }
                        })
                        .print();
-               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism 
*/).createJobGraph();
+               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
 
                List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
                JobVertex sourceVertex = verticesSorted.get(0);
@@ -224,7 +224,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                });
                sinkMethod.invoke(sink, resource5);
 
-               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism 
*/).createJobGraph();
+               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
 
                JobVertex sourceMapFilterVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
                JobVertex reduceSinkVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
@@ -291,7 +291,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                }).disableChaining().name("test_sink");
                sinkMethod.invoke(sink, resource5);
 
-               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism 
*/).createJobGraph();
+               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
 
                for (JobVertex jobVertex : jobGraph.getVertices()) {
                        if (jobVertex.getName().contains("test_source")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index c4bed37..7cac1e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -131,7 +131,7 @@ public class FoldApplyProcessWindowFunctionTest {
 
                transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations, 1 /* default parallelism */);
+               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
 
                List<Integer> result = new ArrayList<>();
                List<Integer> input = new ArrayList<>();
@@ -240,7 +240,7 @@ public class FoldApplyProcessWindowFunctionTest {
 
                transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations, 1 /* default parallelism */);
+               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
 
                List<Integer> result = new ArrayList<>();
                List<Integer> input = new ArrayList<>();
@@ -310,10 +310,6 @@ public class FoldApplyProcessWindowFunctionTest {
 
        public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
-               public DummyStreamExecutionEnvironment() {
-                       super(1);
-               }
-
                @Override
                public JobExecutionResult execute(String jobName) throws Exception {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index 6ddca34..fecd440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -117,7 +117,7 @@ public class FoldApplyWindowFunctionTest {
 
                transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-               StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */);
+               StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
 
                List<Integer> result = new ArrayList<>();
                List<Integer> input = new ArrayList<>();
@@ -140,10 +140,6 @@ public class FoldApplyWindowFunctionTest {
 
        public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
-               public DummyStreamExecutionEnvironment() {
-                       super(1);
-               }
-
                @Override
                public JobExecutionResult execute(String jobName) throws Exception {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 927c94e..0b2587e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -672,6 +672,23 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
 object StreamExecutionEnvironment {
 
+  /**
+   * Sets the default parallelism that will be used for the local execution
+   * environment created by [[createLocalEnvironment()]].
+   *
+   * @param parallelism The default parallelism to use for local execution.
+   */
+  @PublicEvolving
+  def setDefaultLocalParallelism(parallelism: Int) : Unit =
+    JavaEnv.setDefaultLocalParallelism(parallelism)
+
+  /**
+   * Gets the default parallelism that will be used for the local execution environment created by
+   * [[createLocalEnvironment()]].
+   */
+  @PublicEvolving
+  def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism
+  
   // --------------------------------------------------------------------------
   //  context environment
   // --------------------------------------------------------------------------
@@ -693,14 +710,13 @@ object StreamExecutionEnvironment {
   /**
    * Creates a local execution environment. The local execution environment will run the
    * program in a multi-threaded fashion in the same JVM as the environment was created in.
+   *
+   * This method sets the environment's default parallelism to given parameter, which
+   * defaults to the value set via [[setDefaultLocalParallelism(Int)]].
    */
-  def createLocalEnvironment(parallelism: Int = -1):
+  def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
       StreamExecutionEnvironment = {
-    if (parallelism == -1) {
-      new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment())
-    } else {
-      new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
-    }
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 08153be..60c609d 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -255,10 +255,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val sink = map.addSink(x => {})
 
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    // default parallelism is only actualized when transforming to JobGraph
-    assert(-1 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(-1 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     try {
       src.setParallelism(3)
@@ -273,11 +272,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     // the parallelism does not change since some windowing code takes the parallelism from
     // input operations and that cannot change dynamically
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    // setting a parallelism on the env/in the ExecutionConfig means that operators
-    // pick it up when being instantiated
-    assert(7 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(7 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     val parallelSource = env.generateSequence(0, 0)
     parallelSource.print()

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 90d8790..64c68dc 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -36,7 +36,6 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
        
 
        public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
-               super(parallelism);
                this.executor = Preconditions.checkNotNull(executor);
                setParallelism(parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 0a9f686..49ff744 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -382,10 +382,6 @@ public class AccumulatorLiveITCase extends TestLogger {
         */
        private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
-               public DummyStreamExecutionEnvironment() {
-                       super(1 /* default parallelism */);
-               }
-
                @Override
                public JobExecutionResult execute() throws Exception {
                        return execute("default");

Reply via email to