[FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based HA 
services

Remove client-less factory methods from ZooKeeperUtils

Introduce default job id

This closes #3781.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddd6a99a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddd6a99a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddd6a99a

Branch: refs/heads/master
Commit: ddd6a99a95b56c52ea5b5153b7270b578f5479bc
Parents: a0bb99c
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Mar 16 17:03:03 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri May 5 11:06:07 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |   6 +-
 .../flink/client/cli/CustomCommandLine.java     |   4 +-
 .../flink/client/program/ClusterClient.java     |  89 ++++----
 .../client/program/StandaloneClusterClient.java |   8 +-
 .../RemoteExecutorHostnameResolutionTest.java   |  22 +-
 .../apache/flink/client/program/ClientTest.java |   5 +-
 ...rRetrievalServiceHostnameResolutionTest.java |  32 +--
 .../api/avro/AvroExternalJarProgramITCase.java  |  22 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../connectors/kafka/Kafka010ITCase.java        |  14 +-
 .../connectors/kafka/Kafka08ITCase.java         |   4 +-
 .../kafka/Kafka09SecuredRunITCase.java          |   2 +-
 .../connectors/kafka/KafkaConsumerTestBase.java |  96 ++++----
 .../connectors/kafka/KafkaProducerTestBase.java |   2 +-
 .../kafka/KafkaShortRetentionTestBase.java      |  18 +-
 .../connectors/kafka/KafkaTestBase.java         |  23 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../org/apache/flink/storm/api/FlinkClient.java |   4 +-
 .../flink/contrib/streaming/CollectITCase.java  |   9 +-
 .../operations/DegreesWithExceptionITCase.java  |  42 ++--
 .../ReduceOnEdgesWithExceptionITCase.java       |  37 ++--
 .../ReduceOnNeighborsWithExceptionITCase.java   |  43 ++--
 .../apache/flink/ml/util/FlinkTestBase.scala    |   2 +-
 .../src/test/resources/log4j-test.properties    |  38 ++++
 .../src/test/resources/logback-test.xml         |  42 ++++
 .../MesosApplicationMasterRunner.java           |  39 +++-
 .../MesosFlinkResourceManagerTest.java          |  37 +++-
 .../BackPressureStatsTrackerITCase.java         |  19 +-
 .../StackTraceSampleCoordinatorITCase.java      |  19 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  31 ++-
 .../ZooKeeperCheckpointRecoveryFactory.java     |   2 +-
 .../apache/flink/runtime/client/JobClient.java  |  47 ++--
 .../flink/runtime/client/JobClientActor.java    |   5 +-
 .../runtime/client/JobListeningContext.java     |  22 +-
 .../clusterframework/BootstrapTools.java        |  17 +-
 .../HighAvailabilityServices.java               |   7 +
 .../HighAvailabilityServicesUtils.java          |   2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   9 +
 .../ZooKeeperSubmittedJobGraphStore.java        |  12 +-
 .../ZooKeeperLeaderElectionService.java         | 184 ++++++++++------
 .../ZooKeeperLeaderRetrievalService.java        | 122 +++++++----
 .../minicluster/StandaloneMiniCluster.java      | 154 +++++++++++++
 .../runtime/query/QueryableStateClient.java     |  10 +-
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java   |  16 +-
 .../runtime/util/LeaderRetrievalUtils.java      |  67 ++----
 .../flink/runtime/util/SerializedThrowable.java |   4 +
 .../flink/runtime/util/ZooKeeperUtils.java      |  27 ---
 .../flink/runtime/jobmanager/JobManager.scala   | 158 +++++++-------
 .../runtime/messages/TaskManagerMessages.scala  |  13 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  | 142 +++++++-----
 .../minicluster/LocalFlinkMiniCluster.scala     |  41 ++--
 .../flink/runtime/taskmanager/TaskManager.scala |  71 +++---
 .../checkpoint/CoordinatorShutdownTest.java     |   3 +-
 .../runtime/client/JobClientActorTest.java      |   8 +-
 .../clusterframework/ClusterShutdownITCase.java | 201 ++++++++++-------
 .../clusterframework/ResourceManagerITCase.java | 178 +++++++++------
 .../clusterframework/ResourceManagerTest.java   | 128 ++++++++++-
 .../highavailability/ManualLeaderService.java   | 116 ++++++++++
 .../TestingManualHighAvailabilityServices.java  | 150 +++++++++++++
 .../jobmanager/JobManagerHARecoveryTest.java    |  35 +--
 .../JobManagerProcessReapingTest.java           |   3 +-
 .../jobmanager/JobManagerStartupTest.java       |   5 +-
 .../runtime/jobmanager/JobManagerTest.java      | 188 ++++++++++------
 .../flink/runtime/jobmanager/JobSubmitTest.java |  15 +-
 .../jobmaster/JobManagerRunnerMockTest.java     |   3 +-
 .../LeaderChangeJobRecoveryTest.java            |  32 ++-
 .../LeaderChangeStateCleanupTest.java           |  50 +++--
 .../LeaderElectionRetrievalTestingCluster.java  | 121 -----------
 .../TestingLeaderElectionService.java           |   4 +
 .../TestingLeaderRetrievalService.java          |   4 +
 .../ZooKeeperLeaderElectionTest.java            | 109 +++++-----
 .../ZooKeeperLeaderRetrievalTest.java           |  91 ++++----
 .../runtime/metrics/TaskManagerMetricsTest.java |  26 ++-
 ...askManagerComponentsStartupShutdownTest.java |  47 ++--
 .../TaskManagerConfigurationTest.java           |  76 ++++---
 .../TaskManagerProcessReapingTestBase.java      |  73 +++++--
 .../TaskManagerRegistrationTest.java            | 217 +++++++++++--------
 .../taskmanager/TaskManagerStartupTest.java     |  62 +++++-
 .../runtime/taskmanager/TaskManagerTest.java    | 136 ++++++++++--
 .../jobmanager/JobManagerRegistrationTest.scala |  88 +++++---
 .../runtime/testingUtils/TestingCluster.scala   |  37 +++-
 .../runtime/testingUtils/TestingUtils.scala     | 176 ++++++---------
 .../org/apache/flink/api/scala/FlinkShell.scala |  14 +-
 .../flink/api/scala/ScalaShellITCase.scala      | 104 ++++-----
 .../environment/RemoteStreamEnvironment.java    |   6 +-
 .../streaming/util/TestStreamEnvironment.java   |  79 ++++++-
 .../flink/test/util/JavaProgramTestBase.java    |   4 +-
 .../test/util/MultipleProgramsTestBase.java     |   2 +-
 .../apache/flink/test/util/TestEnvironment.java | 133 ++++++++++--
 .../accumulators/AccumulatorErrorITCase.java    |  49 ++---
 ...tractEventTimeWindowCheckpointingITCase.java |  21 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |  21 +-
 .../StreamFaultToleranceTestBase.java           |   4 +-
 .../WindowCheckpointingITCase.java              |  18 +-
 .../test/classloading/ClassLoaderITCase.java    | 136 +++++++-----
 .../jar/CheckpointedStreamingProgram.java       |   9 +-
 .../jar/CheckpointingCustomKvStateProgram.java  |   9 +-
 .../jar/CustomInputSplitProgram.java            |  12 +-
 .../classloading/jar/CustomKvStateProgram.java  |  15 +-
 .../test/classloading/jar/KMeansForTest.java    |  17 +-
 .../jar/LegacyCheckpointedStreamingProgram.java |   6 +-
 .../jar/StreamingCustomInputSplitProgram.java   |  11 +-
 .../test/classloading/jar/StreamingProgram.java |   7 +-
 .../test/classloading/jar/UserCodeType.java     |   6 +-
 .../clients/examples/JobRetrievalITCase.java    |   7 +-
 .../CustomDistributionITCase.java               |   2 +-
 .../RemoteEnvironmentITCase.java                |  46 ++--
 .../flink/test/misc/AutoParallelismITCase.java  |   8 +-
 .../test/misc/CustomSerializationITCase.java    |  57 ++---
 .../test/misc/MiscellaneousIssuesITCase.java    |  52 ++---
 ...SuccessAfterNetworkBuffersFailureITCase.java |  28 +--
 .../query/AbstractQueryableStateITCase.java     |  33 ++-
 ...ctTaskManagerProcessFailureRecoveryTest.java |  18 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |  13 +-
 .../flink/test/recovery/FastFailuresITCase.java |  10 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |  42 +++-
 .../JobManagerHAJobGraphRecoveryITCase.java     |  24 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |  28 ++-
 .../recovery/ProcessFailureCancelingITCase.java |  30 ++-
 .../TaskManagerFailureRecoveryITCase.java       |   4 +-
 .../ZooKeeperLeaderElectionITCase.java          |  17 +-
 .../AbstractOperatorRestoreTestBase.java        |  39 +++-
 .../test/streaming/runtime/TimestampITCase.java |  63 +++---
 .../jobmanager/JobManagerFailsITCase.scala      |  17 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |   2 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |   7 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  40 +++-
 .../flink/yarn/YARNSessionFIFOITCase.java       |   2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   2 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  38 +++-
 .../apache/flink/yarn/YarnClusterClient.java    |  36 +--
 .../apache/flink/yarn/YarnClusterClientV2.java  |   5 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |   5 +-
 .../org/apache/flink/yarn/cli/FlinkYarnCLI.java |  13 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   7 +-
 135 files changed, 3474 insertions(+), 2131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 0d61cbd..74d5f5d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -263,7 +263,11 @@ public class CliFrontend {
                }
                finally {
                        if (client != null) {
-                               client.shutdown();
+                               try {
+                                       client.shutdown();
+                               } catch (Exception e) {
+                                       LOG.warn("Could not properly shut down 
the cluster client.", e);
+                               }
                        }
                        if (program != null) {
                                program.deleteExtractedLibraries();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index c58c74c..a4cb479 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -75,11 +75,11 @@ public interface CustomCommandLine<ClusterType extends 
ClusterClient> {
         * @param config The Flink config to use
         * @param userJarFiles User jar files to include in the classpath of 
the cluster.
         * @return The client to communicate with the cluster which the 
CustomCommandLine brought up.
-        * @throws UnsupportedOperationException if the operation is not 
supported
+        * @throws Exception if the cluster could not be created
         */
        ClusterType createCluster(
                        String applicationName,
                        CommandLine commandLine,
                        Configuration config,
-                       List<URL> userJarFiles) throws 
UnsupportedOperationException;
+                       List<URL> userJarFiles) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 0f88f7c..6770eee 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -42,6 +42,9 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.client.JobRetrievalException;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -72,7 +75,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-
 /**
  * Encapsulates the functionality necessary to submit a program to a remote 
cluster.
  */
@@ -95,6 +97,9 @@ public abstract class ClusterClient {
        /** Lookup timeout for the job manager retrieval service */
        private final FiniteDuration lookupTimeout;
 
+       /** Service factory for high availability */
+       protected final HighAvailabilityServices highAvailabilityServices;
+
        /** Flag indicating whether to sysout print execution updates */
        private boolean printStatusDuringExecution = true;
 
@@ -119,10 +124,25 @@ public abstract class ClusterClient {
         *
         * @param flinkConfig The config used to obtain the job-manager's 
address, and used to configure the optimizer.
         *
-        * @throws java.io.IOException Thrown, if the client's actor system 
could not be started.
+        * @throws Exception if we cannot create the high availability services
         */
-       public ClusterClient(Configuration flinkConfig) throws IOException {
+       public ClusterClient(Configuration flinkConfig) throws Exception {
+               this(flinkConfig,
+                       
HighAvailabilityServicesUtils.createHighAvailabilityServices(
+                               flinkConfig,
+                               Executors.directExecutor(),
+                               
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
+       }
 
+       /**
+        * Creates an instance that submits the programs to the JobManager 
defined in the
+        * configuration. This method will try to resolve the JobManager 
hostname and throw an exception
+        * if that is not possible.
+        *
+        * @param flinkConfig The config used to obtain the job-manager's 
address, and used to configure the optimizer.
+        * @param highAvailabilityServices HighAvailabilityServices to use for 
leader retrieval
+        */
+       public ClusterClient(Configuration flinkConfig, 
HighAvailabilityServices highAvailabilityServices) {
                this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
                this.compiler = new Optimizer(new DataStatistics(), new 
DefaultCostEstimator(), flinkConfig);
 
@@ -130,6 +150,8 @@ public abstract class ClusterClient {
                this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
 
                this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, 
LOG);
+
+               this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
        }
 
        // 
------------------------------------------------------------------------
@@ -202,12 +224,16 @@ public abstract class ClusterClient {
        /**
         * Shuts down the client. This stops the internal actor system and 
actors.
         */
-       public void shutdown() {
+       public void shutdown() throws Exception {
                synchronized (this) {
                        try {
                                finalizeCluster();
                        } finally {
-                               this.actorSystemLoader.shutdown();
+                               actorSystemLoader.shutdown();
+                       }
+
+                       if (highAvailabilityServices != null) {
+                               
highAvailabilityServices.closeAndCleanupAllData();
                        }
                }
        }
@@ -241,7 +267,8 @@ public abstract class ClusterClient {
                try {
                        LeaderConnectionInfo leaderConnectionInfo =
                                
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
-                                       
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true), timeout);
+                                       
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+                                       timeout);
 
                        return 
AkkaUtils.getInetSockeAddressFromAkkaURL(leaderConnectionInfo.getAddress());
                } catch (Exception e) {
@@ -411,17 +438,17 @@ public abstract class ClusterClient {
 
                waitForClusterToBeReady();
 
-               final LeaderRetrievalService leaderRetrievalService;
-               try {
-                       leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
-               } catch (Exception e) {
-                       throw new ProgramInvocationException("Could not create 
the leader retrieval service", e);
-               }
-
                try {
                        logAndSysout("Submitting job with JobID: " + 
jobGraph.getJobID() + ". Waiting for job completion.");
-                       this.lastJobExecutionResult = 
JobClient.submitJobAndWait(actorSystemLoader.get(), flinkConfig,
-                               leaderRetrievalService, jobGraph, timeout, 
printStatusDuringExecution, classLoader);
+                       this.lastJobExecutionResult = 
JobClient.submitJobAndWait(
+                               actorSystemLoader.get(),
+                               flinkConfig,
+                               highAvailabilityServices,
+                               jobGraph,
+                               timeout,
+                               printStatusDuringExecution,
+                               classLoader);
+
                        return this.lastJobExecutionResult;
                } catch (JobExecutionException e) {
                        throw new ProgramInvocationException("The program 
execution failed: " + e.getMessage(), e);
@@ -462,13 +489,6 @@ public abstract class ClusterClient {
         * @throws JobExecutionException if an error occurs during monitoring 
the job execution
         */
        public JobExecutionResult retrieveJob(JobID jobID) throws 
JobExecutionException {
-               final LeaderRetrievalService leaderRetrievalService;
-               try {
-                       leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
-               } catch (Exception e) {
-                       throw new JobRetrievalException(jobID, "Could not 
create the leader retrieval service", e);
-               }
-
                ActorGateway jobManagerGateway;
                try {
                        jobManagerGateway = getJobManagerGateway();
@@ -477,13 +497,13 @@ public abstract class ClusterClient {
                }
 
                final JobListeningContext listeningContext = 
JobClient.attachToRunningJob(
-                               jobID,
-                               jobManagerGateway,
-                               flinkConfig,
-                               actorSystemLoader.get(),
-                               leaderRetrievalService,
-                               timeout,
-                               printStatusDuringExecution);
+                       jobID,
+                       jobManagerGateway,
+                       flinkConfig,
+                       actorSystemLoader.get(),
+                       highAvailabilityServices,
+                       timeout,
+                       printStatusDuringExecution);
 
                return JobClient.awaitJobResult(listeningContext);
        }
@@ -496,13 +516,6 @@ public abstract class ClusterClient {
         * @throws JobExecutionException if an error occurs during monitoring 
the job execution
         */
        public JobListeningContext connectToJob(JobID jobID) throws 
JobExecutionException {
-               final LeaderRetrievalService leaderRetrievalService;
-               try {
-                       leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
-               } catch (Exception e) {
-                       throw new JobRetrievalException(jobID, "Could not 
create the leader retrieval service", e);
-               }
-
                ActorGateway jobManagerGateway;
                try {
                        jobManagerGateway = getJobManagerGateway();
@@ -515,7 +528,7 @@ public abstract class ClusterClient {
                                jobManagerGateway,
                                flinkConfig,
                                actorSystemLoader.get(),
-                               leaderRetrievalService,
+                               highAvailabilityServices,
                                timeout,
                                printStatusDuringExecution);
        }
@@ -721,7 +734,7 @@ public abstract class ClusterClient {
        public ActorGateway getJobManagerGateway() throws Exception {
                LOG.debug("Looking up JobManager");
                return LeaderRetrievalUtils.retrieveLeaderGateway(
-                       
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true),
+                       
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                        actorSystemLoader.get(),
                        lookupTimeout);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 296ddc9..fd179c0 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -22,12 +22,12 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
-import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
@@ -38,10 +38,14 @@ import java.util.List;
  */
 public class StandaloneClusterClient extends ClusterClient {
 
-       public StandaloneClusterClient(Configuration config) throws IOException 
{
+       public StandaloneClusterClient(Configuration config) throws Exception {
                super(config);
        }
 
+       public StandaloneClusterClient(Configuration config, 
HighAvailabilityServices highAvailabilityServices) {
+               super(config, highAvailabilityServices);
+       }
+
        @Override
        public void waitForClusterToBeReady() {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index d8fb3de..be93949 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.client;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.TestLogger;
 import org.junit.BeforeClass;
@@ -33,7 +32,6 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.Collections;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
@@ -48,26 +46,20 @@ public class RemoteExecutorHostnameResolutionTest extends 
TestLogger {
        }
 
        @Test
-       public void testUnresolvableHostname1() {
+       public void testUnresolvableHostname1() throws Exception {
 
                RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, 
port);
                try {
                        exec.executePlan(getProgram());
                        fail("This should fail with an 
ProgramInvocationException");
                }
-               catch (ProgramInvocationException e) {
+               catch (UnknownHostException ignored) {
                        // that is what we want!
-                       assertTrue(e.getCause() instanceof 
UnknownHostException);
-               }
-               catch (Exception e) {
-                       System.err.println("Wrong exception!");
-                       e.printStackTrace();
-                       fail(e.getMessage());
                }
        }
 
        @Test
-       public void testUnresolvableHostname2() {
+       public void testUnresolvableHostname2() throws Exception {
 
                InetSocketAddress add = new 
InetSocketAddress(nonExistingHostname, port);
                RemoteExecutor exec = new RemoteExecutor(add, new 
Configuration(),
@@ -76,14 +68,8 @@ public class RemoteExecutorHostnameResolutionTest extends 
TestLogger {
                        exec.executePlan(getProgram());
                        fail("This should fail with an 
ProgramInvocationException");
                }
-               catch (ProgramInvocationException e) {
+               catch (UnknownHostException ignored) {
                        // that is what we want!
-                       assertTrue(e.getCause() instanceof 
UnknownHostException);
-               }
-               catch (Exception e) {
-                       System.err.println("Wrong exception!");
-                       e.printStackTrace();
-                       fail(e.getMessage());
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index da297d6..b7ade2a 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -56,7 +56,6 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
 import java.util.UUID;
@@ -199,7 +198,7 @@ public class ClientTest extends TestLogger {
         * This test verifies correct job submission messaging logic and plan 
translation calls.
         */
        @Test
-       public void shouldSubmitToJobClient() throws IOException, 
ProgramInvocationException {
+       public void shouldSubmitToJobClient() throws Exception {
                jobManagerSystem.actorOf(
                        Props.create(SuccessReturningActor.class),
                        JobMaster.JOB_MANAGER_NAME);
@@ -217,7 +216,7 @@ public class ClientTest extends TestLogger {
         * This test verifies correct that the correct exception is thrown when 
the job submission fails.
         */
        @Test
-       public void shouldSubmitToJobClientFails() throws IOException {
+       public void shouldSubmitToJobClientFails() throws Exception {
                        jobManagerSystem.actorOf(
                                Props.create(FailureReturningActor.class),
                                JobMaster.JOB_MANAGER_NAME);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
index fc10f65..0ecdc2c 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.util.StandaloneUtils;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.TestLogger;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -32,7 +34,7 @@ import static org.junit.Assert.*;
 import static org.junit.Assume.assumeTrue;
 
 /**
- * Tests that verify that the LeaderRetrievalSevice correctly handles 
non-resolvable host names
+ * Tests that verify that the LeaderRetrievalService correctly handles 
non-resolvable host names
  * and does not fail with another exception
  */
 public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
@@ -48,21 +50,16 @@ public class LeaderRetrievalServiceHostnameResolutionTest 
extends TestLogger {
         * Tests that the StandaloneLeaderRetrievalService resolves host names 
if specified.
         */
        @Test
-       public void testUnresolvableHostname1() {
+       public void testUnresolvableHostname1() throws UnknownHostException, 
ConfigurationException {
+               Configuration config = new Configuration();
 
-               try {
-                       Configuration config = new Configuration();
+               config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
nonExistingHostname);
+               config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
17234);
 
-                       
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
nonExistingHostname);
-                       
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
-
-                       
LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
-               }
-               catch (Exception e) {
-                       System.err.println("Shouldn't throw an exception!");
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+               StandaloneUtils.createLeaderRetrievalService(
+                       config,
+                       false,
+                       JobMaster.JOB_MANAGER_NAME);
        }
 
        /*
@@ -77,7 +74,10 @@ public class LeaderRetrievalServiceHostnameResolutionTest 
extends TestLogger {
                        
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
nonExistingHostname);
                        
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
 
-                       
LeaderRetrievalUtils.createLeaderRetrievalService(config, true);
+                       StandaloneUtils.createLeaderRetrievalService(
+                               config,
+                               true,
+                               JobMaster.JOB_MANAGER_NAME);
                        fail("This should fail with an UnknownHostException");
                }
                catch (UnknownHostException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 5f5209a..063a363 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -19,14 +19,16 @@
 package org.apache.flink.api.avro;
 
 import java.io.File;
+import java.net.URL;
+import java.util.Collections;
 
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
@@ -43,8 +45,9 @@ public class AvroExternalJarProgramITCase extends TestLogger {
                LocalFlinkMiniCluster testMiniCluster = null;
 
                try {
+                       int parallelism = 4;
                        Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
                        testMiniCluster = new LocalFlinkMiniCluster(config, 
false);
                        testMiniCluster.start();
 
@@ -53,15 +56,16 @@ public class AvroExternalJarProgramITCase extends 
TestLogger {
 
                        PackagedProgram program = new PackagedProgram(new 
File(jarFile), new String[] { testData });
 
+                       TestEnvironment.setAsContext(
+                               testMiniCluster,
+                               parallelism,
+                               Collections.singleton(new Path(jarFile)),
+                               Collections.<URL>emptyList());
 
                        
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
                        
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
testMiniCluster.getLeaderRPCPort());
 
-                       ClusterClient client = new 
StandaloneClusterClient(config);
-
-                       client.setPrintStatusDuringExecution(false);
-                       client.run(program, 4);
-
+                       program.invokeInteractiveModeForExecution();
                }
                catch (Throwable t) {
                        System.err.println(t.getMessage());
@@ -69,6 +73,8 @@ public class AvroExternalJarProgramITCase extends TestLogger {
                        Assert.fail("Error during the packaged program 
execution: " + t.getMessage());
                }
                finally {
+                       TestEnvironment.unsetAsContext();
+
                        if (testMiniCluster != null) {
                                try {
                                        testMiniCluster.stop();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties b/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
index 0b686e5..881dc06 100644
--- a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
 
 # A1 uses PatternLayout.
 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 2085169..39b2b8f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -180,13 +180,14 @@ public class Kafka010ITCase extends KafkaConsumerTestBase 
{
 
                // ---------- Produce an event time stream into Kafka 
-------------------
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env.getConfig().disableSysoutLogging();
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
                DataStream<Long> streamWithTimestamps = env.addSource(new 
SourceFunction<Long>() {
+                       private static final long serialVersionUID = 
-2255105836471289626L;
                        boolean running = true;
 
                        @Override
@@ -208,6 +209,8 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
                final TypeInformationSerializationSchema<Long> longSer = new 
TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), 
env.getConfig());
                FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = 
FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, 
new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new 
KafkaPartitioner<Long>() {
+                       private static final long serialVersionUID = 
-6730989584364230617L;
+
                        @Override
                        public int partition(Long next, byte[] serializedKey, 
byte[] serializedValue, int numPartitions) {
                                return (int)(next % 3);
@@ -219,7 +222,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
                // ---------- Consume stream from Kafka -------------------
 
-               env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env.getConfig().disableSysoutLogging();
@@ -227,6 +230,8 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
                FlinkKafkaConsumer010<Long> kafkaSource = new 
FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps);
                kafkaSource.assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks<Long>() {
+                       private static final long serialVersionUID = 
-4834111073247835189L;
+
                        @Nullable
                        @Override
                        public Watermark checkAndGetNextWatermark(Long 
lastElement, long extractedTimestamp) {
@@ -253,8 +258,12 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
        private static class TimestampValidatingOperator extends 
StreamSink<Long> {
 
+               private static final long serialVersionUID = 
1353168781235526806L;
+
                public TimestampValidatingOperator() {
                        super(new SinkFunction<Long>() {
+                               private static final long serialVersionUID = 
-6676565693361786524L;
+
                                @Override
                                public void invoke(Long value) throws Exception 
{
                                        throw new 
RuntimeException("Unexpected");
@@ -304,6 +313,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
        private static class LimitedLongDeserializer implements 
KeyedDeserializationSchema<Long> {
 
+               private static final long serialVersionUID = 
6966177118923713521L;
                private final TypeInformation<Long> ti;
                private final TypeSerializer<Long> ser;
                long cnt = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index f5cb8c0..8cc735d 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -87,7 +87,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                final int valuesCount = 20;
                final int startFrom = 0;
 
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
 
                readSequence(env, StartupMode.GROUP_OFFSETS, null, 
standardProps, parallelism, topic, valuesCount, startFrom);
@@ -190,7 +190,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                // write a sequence from 0 to 99 to each of the 3 partitions.
                final String topicName = writeSequence("testOffsetAutocommit", 
100, parallelism, 1);
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                // NOTE: We are not enabling the checkpointing!
                env.getConfig().disableSysoutLogging();
                
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
index e748537..16a13c0 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -34,7 +34,7 @@ public class Kafka09SecuredRunITCase extends 
KafkaConsumerTestBase {
        protected static final Logger LOG = 
LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
 
        @BeforeClass
-       public static void prepare() throws IOException, ClassNotFoundException 
{
+       public static void prepare() throws ClassNotFoundException {
                
LOG.info("-------------------------------------------------------------------------");
                LOG.info("    Starting Kafka09SecuredRunITCase ");
                
LOG.info("-------------------------------------------------------------------------");

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ddac61c..ba83460 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -154,7 +154,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                try {
                        Properties properties = new Properties();
 
-                       StreamExecutionEnvironment see = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                       StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        see.getConfig().disableSysoutLogging();
                        see.setRestartStrategy(RestartStrategies.noRestart());
                        see.setParallelism(1);
@@ -173,22 +173,14 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        DataStream<String> stream = see.addSource(source);
                        stream.print();
                        see.execute("No broker test");
-               } catch(ProgramInvocationException pie) {
+               } catch(JobExecutionException jee) {
                        if(kafkaServer.getVersion().equals("0.9") || 
kafkaServer.getVersion().equals("0.10")) {
-                               assertTrue(pie.getCause() instanceof 
JobExecutionException);
-
-                               JobExecutionException jee = 
(JobExecutionException) pie.getCause();
-
                                assertTrue(jee.getCause() instanceof 
TimeoutException);
 
                                TimeoutException te = (TimeoutException) 
jee.getCause();
 
                                assertEquals("Timeout expired while fetching 
topic metadata", te.getMessage());
                        } else {
-                               assertTrue(pie.getCause() instanceof 
JobExecutionException);
-
-                               JobExecutionException jee = 
(JobExecutionException) pie.getCause();
-
                                assertTrue(jee.getCause() instanceof 
RuntimeException);
 
                                RuntimeException re = (RuntimeException) 
jee.getCause();
@@ -208,7 +200,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                final String topicName = 
writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, 
parallelism, 1);
 
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
                
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env.setParallelism(parallelism);
@@ -280,6 +272,8 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
        public void runStartFromKafkaCommitOffsets() throws Exception {
                final int parallelism = 3;
                final int recordsInEachPartition = 300;
+               final int recordsToConsume = 150;
+               final int consumePause = 50;
 
                final String topicName = 
writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
 
@@ -294,7 +288,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        attempt++;
                        LOG.info("Attempt " + attempt + " to read records and 
commit some offsets to Kafka");
 
-                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.getConfig().disableSysoutLogging();
                        
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                        env.setParallelism(parallelism);
@@ -302,13 +296,13 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                        env
                                .addSource(kafkaServer.getConsumer(topicName, 
new SimpleStringSchema(), standardProps))
-                               .map(new ThrottledMapper<String>(50))
+                               .map(new ThrottledMapper<String>(consumePause))
                                .map(new MapFunction<String, Object>() {
                                        int count = 0;
                                        @Override
                                        public Object map(String value) throws 
Exception {
                                                count++;
-                                               if (count == 150) {
+                                               if (count == recordsToConsume) {
                                                        throw new 
SuccessException();
                                                }
                                                return null;
@@ -329,7 +323,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                LOG.info("Got final committed offsets from Kafka o1={}, o2={}, 
o3={}", o1, o2, o3);
 
-               final StreamExecutionEnvironment env2 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env2 = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env2.getConfig().disableSysoutLogging();
                
env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env2.setParallelism(parallelism);
@@ -375,7 +369,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                final String topicName = 
writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", 
recordsInEachPartition, parallelism, 1);
 
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
                
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env.setParallelism(parallelism);
@@ -452,7 +446,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
 
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
                env.setParallelism(parallelism);
 
@@ -510,7 +504,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                                new 
TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
 
                // setup and run the latest-consuming job
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
                env.setParallelism(parallelism);
 
@@ -541,7 +535,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                                try {
                                        env.execute(consumeExtraRecordsJobName);
                                } catch (Throwable t) {
-                                       if (!(t.getCause() instanceof 
JobCancellationException)) {
+                                       if (!(t instanceof 
JobCancellationException)) {
                                                error.set(t);
                                        }
                                }
@@ -555,7 +549,9 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        consumeExtraRecordsJobName);
 
                // setup the extra records writing job
-               final StreamExecutionEnvironment env2 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env2 = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               env2.setParallelism(parallelism);
 
                DataStream<Tuple2<Integer, Integer>> extraRecordsStream = env2
                        .addSource(new 
RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
@@ -577,7 +573,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                                public void cancel() {
                                        running = false;
                                }
-                       }).setParallelism(parallelism);
+                       });
 
                kafkaServer.produceIntoKafka(extraRecordsStream, topicName, 
serSchema, readProps, null);
 
@@ -626,7 +622,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                final String topicName = 
writeSequence("testStartFromGroupOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
 
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
                env.setParallelism(parallelism);
 
@@ -685,7 +681,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                final String topicName = 
writeSequence("testStartFromSpecificOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
 
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
                env.setParallelism(parallelism);
 
@@ -751,7 +747,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                createTestTopic(additionalEmptyTopic, parallelism, 1); // 
create an empty topic which will remain empty all the time
 
                final StreamExecutionEnvironment env =
-                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                               
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
                env.enableCheckpointing(500);
                env.setRestartStrategy(RestartStrategies.noRestart()); // fail 
immediately
@@ -878,7 +874,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                createTestTopic(topic, parallelism, 1);
 
                DataGenerators.generateRandomizedIntegerSequence(
-                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+                               
StreamExecutionEnvironment.getExecutionEnvironment(),
                                kafkaServer,
                                topic, parallelism, numElementsPerPartition, 
true);
 
@@ -887,7 +883,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                DeserializationSchema<Integer> schema =
                                new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing(500);
                env.setParallelism(parallelism);
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
0));
@@ -927,7 +923,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                createTestTopic(topic, numPartitions, 1);
 
                DataGenerators.generateRandomizedIntegerSequence(
-                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+                               
StreamExecutionEnvironment.getExecutionEnvironment(),
                                kafkaServer,
                                topic, numPartitions, numElementsPerPartition, 
false);
 
@@ -936,7 +932,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                DeserializationSchema<Integer> schema =
                                new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing(500);
                env.setParallelism(parallelism);
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
0));
@@ -975,7 +971,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                createTestTopic(topic, numPartitions, 1);
 
                DataGenerators.generateRandomizedIntegerSequence(
-                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+                               
StreamExecutionEnvironment.getExecutionEnvironment(),
                                kafkaServer,
                                topic, numPartitions, numElementsPerPartition, 
true);
 
@@ -984,7 +980,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                DeserializationSchema<Integer> schema =
                                new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing(500);
                env.setParallelism(parallelism);
                // set the number of restarts to one. The failing mapper will 
fail once, then it's only success exceptions.
@@ -1033,7 +1029,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        @Override
                        public void run() {
                                try {
-                                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                                        env.setParallelism(parallelism);
                                        env.enableCheckpointing(100);
                                        env.getConfig().disableSysoutLogging();
@@ -1107,7 +1103,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        @Override
                        public void run() {
                                try {
-                                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                                        env.setParallelism(parallelism);
                                        env.enableCheckpointing(100);
                                        env.getConfig().disableSysoutLogging();
@@ -1163,7 +1159,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                DeserializationSchema<Integer> schema =
                                new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(12); // needs to be more that the mini 
cluster has slots
                env.getConfig().disableSysoutLogging();
 
@@ -1180,7 +1176,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        env.execute("test fail on deploy");
                        fail("this test should fail with an exception");
                }
-               catch (ProgramInvocationException e) {
+               catch (JobExecutionException e) {
 
                        // validate that we failed due to a 
NoResourceAvailableException
                        Throwable cause = e.getCause();
@@ -1209,7 +1205,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                final int NUM_TOPICS = 5;
                final int NUM_ELEMENTS = 20;
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
                
                // create topics with content
@@ -1220,6 +1216,10 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        // create topic
                        createTestTopic(topic, i + 1 /*partitions*/, 1);
                }
+
+               // before FLINK-6078 the RemoteExecutionEnvironment set the 
parallelism to 1 as well
+               env.setParallelism(1);
+
                // run first job, producing into all topics
                DataStream<Tuple3<Integer, Integer, String>> stream = 
env.addSource(new RichParallelSourceFunction<Tuple3<Integer, Integer, 
String>>() {
 
@@ -1249,7 +1249,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                env.execute("Write to topics");
 
                // run second job consuming from multiple topics
-               env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
 
                stream = env.addSource(kafkaServer.getConsumer(topics, schema, 
props));
@@ -1357,7 +1357,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> 
serSchema =
                                new 
TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
 
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRestartStrategy(RestartStrategies.noRestart());
                env.getConfig().disableSysoutLogging();
                env.enableCheckpointing(100);
@@ -1457,7 +1457,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                createTestTopic(topic, parallelism, 2);
 
                DataGenerators.generateRandomizedIntegerSequence(
-                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+                               
StreamExecutionEnvironment.getExecutionEnvironment(),
                                kafkaServer,
                                topic, parallelism, numElementsPerPartition, 
true);
 
@@ -1472,7 +1472,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                DeserializationSchema<Integer> schema =
                                new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
                env.enableCheckpointing(500);
                env.setRestartStrategy(RestartStrategies.noRestart());
@@ -1503,7 +1503,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                // ----------- Write some data into Kafka -------------------
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                env.setRestartStrategy(RestartStrategies.noRestart());
                env.getConfig().disableSysoutLogging();
@@ -1535,7 +1535,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                // ----------- Read the data again -------------------
 
-               env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                env.setRestartStrategy(RestartStrategies.noRestart());
                env.getConfig().disableSysoutLogging();
@@ -1590,7 +1590,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                // ----------- Write some data into Kafka -------------------
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env.getConfig().disableSysoutLogging();
@@ -1621,7 +1621,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                // ----------- Read the data again -------------------
 
-               env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env.getConfig().disableSysoutLogging();
@@ -1661,7 +1661,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                final String topic = writeSequence("testEndOfStream", 
ELEMENT_COUNT, 1, 1);
 
                // read using custom schema
-               final StreamExecutionEnvironment env1 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               final StreamExecutionEnvironment env1 = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env1.setParallelism(1);
                
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env1.getConfig().disableSysoutLogging();
@@ -1700,7 +1700,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        public void run() {
                                try {
                                        // start job writing & reading data.
-                                       final StreamExecutionEnvironment env1 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                                       final StreamExecutionEnvironment env1 = 
StreamExecutionEnvironment.getExecutionEnvironment();
                                        env1.setParallelism(1);
                                        
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                                        env1.getConfig().disableSysoutLogging();
@@ -1741,7 +1741,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                                        env1.execute("Metrics test job");
                                } catch(Throwable t) {
                                        LOG.warn("Got exception during 
execution", t);
-                                       if(!(t.getCause() instanceof 
JobCancellationException)) { // we'll cancel the job
+                                       if(!(t instanceof 
JobCancellationException)) { // we'll cancel the job
                                                error.f0 = t;
                                        }
                                }
@@ -1994,7 +1994,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                        createTestTopic(topicName, parallelism, 
replicationFactor);
 
-                       StreamExecutionEnvironment writeEnv = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                       StreamExecutionEnvironment writeEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        
writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                        writeEnv.getConfig().disableSysoutLogging();
                        
@@ -2046,7 +2046,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                        
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
                        
-                       final StreamExecutionEnvironment readEnv = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                       final StreamExecutionEnvironment readEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        
readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                        readEnv.getConfig().disableSysoutLogging();
                        readEnv.setParallelism(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index c925c8f..6f61392 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -71,7 +71,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
                        TypeInformation<Tuple2<Long, String>> longStringInfo = 
TypeInfoParser.parse("Tuple2<Long, String>");
 
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setRestartStrategy(RestartStrategies.noRestart());
                        env.getConfig().disableSysoutLogging();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 954dc7d..f688660 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -59,6 +60,12 @@ import static org.junit.Assert.fail;
 public class KafkaShortRetentionTestBase implements Serializable {
        
        protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
+
+       protected static final int NUM_TMS = 1;
+
+       protected static final int TM_SLOTS = 8;
+
+       protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
        
        private static KafkaTestEnvironment kafkaServer;
        private static Properties standardProps;
@@ -97,17 +104,21 @@ public class KafkaShortRetentionTestBase implements 
Serializable {
                standardProps = kafkaServer.getStandardProperties();
 
                // start also a re-usable Flink mini cluster
-               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
                flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
16L);
                
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
 
                flink = new LocalFlinkMiniCluster(flinkConfig, false);
                flink.start();
+
+               TestStreamEnvironment.setAsContext(flink, PARALLELISM);
        }
 
        @AfterClass
        public static void shutDownServices() {
+               TestStreamEnvironment.unsetAsContext();
+
                if (flink != null) {
                        flink.shutdown();
                }
@@ -135,8 +146,7 @@ public class KafkaShortRetentionTestBase implements 
Serializable {
                tprops.setProperty("retention.ms", "250");
                kafkaServer.createTestTopic(topic, parallelism, 1, tprops);
 
-               final StreamExecutionEnvironment env =
-                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 
flink.getLeaderRPCPort());
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
                env.setRestartStrategy(RestartStrategies.noRestart()); // fail 
immediately
                env.getConfig().disableSysoutLogging();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index a21a239..1837af6 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -40,7 +41,6 @@ import org.slf4j.LoggerFactory;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -65,14 +65,18 @@ public abstract class KafkaTestBase extends TestLogger {
        
        protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
 
+       protected static final int NUM_TMS = 1;
+
+       protected static final int TM_SLOTS = 8;
+
+       protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
+
        protected static String brokerConnectionStrings;
 
        protected static Properties standardProps;
        
        protected static LocalFlinkMiniCluster flink;
 
-       protected static int flinkPort;
-
        protected static FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
 
        protected static KafkaTestEnvironment kafkaServer;
@@ -87,7 +91,7 @@ public abstract class KafkaTestBase extends TestLogger {
        // 
------------------------------------------------------------------------
        
        @BeforeClass
-       public static void prepare() throws IOException, ClassNotFoundException 
{
+       public static void prepare() throws ClassNotFoundException {
 
                
LOG.info("-------------------------------------------------------------------------");
                LOG.info("    Starting KafkaTestBase ");
@@ -95,6 +99,7 @@ public abstract class KafkaTestBase extends TestLogger {
 
                startClusters(false);
 
+               TestStreamEnvironment.setAsContext(flink, PARALLELISM);
        }
 
        @AfterClass
@@ -104,6 +109,8 @@ public abstract class KafkaTestBase extends TestLogger {
                LOG.info("    Shut down KafkaTestBase ");
                
LOG.info("-------------------------------------------------------------------------");
 
+               TestStreamEnvironment.unsetAsContext();
+
                shutdownClusters();
 
                
LOG.info("-------------------------------------------------------------------------");
@@ -113,8 +120,8 @@ public abstract class KafkaTestBase extends TestLogger {
 
        protected static Configuration getFlinkConfiguration() {
                Configuration flinkConfig = new Configuration();
-               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
                flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
16L);
                
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
                flinkConfig.setString(MetricOptions.REPORTERS_LIST, 
"my_reporter");
@@ -147,14 +154,10 @@ public abstract class KafkaTestBase extends TestLogger {
                // start also a re-usable Flink mini cluster
                flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), 
false);
                flink.start();
-
-               flinkPort = flink.getLeaderRPCPort();
-
        }
 
        protected static void shutdownClusters() {
 
-               flinkPort = -1;
                if (flink != null) {
                        flink.shutdown();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
index 6bdfb48..16c226f 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 2b7f357..21794f9 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -205,7 +205,7 @@ public class FlinkClient {
                final ClusterClient client;
                try {
                        client = new StandaloneClusterClient(configuration);
-               } catch (final IOException e) {
+               } catch (final Exception e) {
                        throw new RuntimeException("Could not establish a 
connection to the job manager", e);
                }
 
@@ -245,7 +245,7 @@ public class FlinkClient {
                final ClusterClient client;
                try {
                        client = new StandaloneClusterClient(configuration);
-               } catch (final IOException e) {
+               } catch (final Exception e) {
                        throw new RuntimeException("Could not establish a 
connection to the job manager", e);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
index 7a25636..f9b6a21 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -22,8 +22,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.Iterator;
@@ -41,8 +42,9 @@ public class CollectITCase extends TestLogger {
                try {
                        cluster.start();
 
-                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
-                                       "localhost", 
cluster.getLeaderRPCPort());
+                       TestStreamEnvironment.setAsContext(cluster, 1);
+
+                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        
                        final long N = 10;
                        DataStream<Long> stream = env.generateSequence(1, N);
@@ -57,6 +59,7 @@ public class CollectITCase extends TestLogger {
                        assertEquals("received wrong number of elements", N + 
1, i);
                }
                finally {
+                       TestStreamEnvironment.unsetAsContext();
                        cluster.stop();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index ad434d4..111d421 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -45,27 +46,19 @@ public class DegreesWithExceptionITCase extends TestLogger {
 
        @BeforeClass
        public static void setupCluster() {
-               try {
-                       Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-                       cluster = new LocalFlinkMiniCluster(config, false);
-                       cluster.start();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail("Error starting test cluster: " + e.getMessage());
-               }
+               Configuration config = new Configuration();
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM);
+               cluster = new LocalFlinkMiniCluster(config, false);
+               cluster.start();
+
+               TestEnvironment.setAsContext(cluster, PARALLELISM);
        }
 
        @AfterClass
        public static void tearDownCluster() {
-               try {
-                       cluster.stop();
-               }
-               catch (Throwable t) {
-                       t.printStackTrace();
-                       fail("ClusterClient shutdown caused an exception: " + 
t.getMessage());
-               }
+               cluster.stop();
+
+               TestEnvironment.unsetAsContext();
        }
 
        /**
@@ -74,8 +67,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
        @Test
        public void testOutDegreesInvalidEdgeSrcId() throws Exception {
 
-               final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
                
@@ -98,8 +90,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
        @Test
        public void testInDegreesInvalidEdgeTrgId() throws Exception {
 
-               final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 
@@ -122,8 +113,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
        @Test
        public void testGetDegreesInvalidEdgeTrgId() throws Exception {
 
-               final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 
@@ -146,8 +136,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
        @Test
        public void testGetDegreesInvalidEdgeSrcId() throws Exception {
 
-               final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 
@@ -170,8 +159,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
        @Test
        public void testGetDegreesInvalidEdgeSrcTrgId() throws Exception {
 
-               final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
index d090d3c..7a0a30c 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -48,27 +49,19 @@ public class ReduceOnEdgesWithExceptionITCase extends 
TestLogger {
 
        @BeforeClass
        public static void setupCluster() {
-               try {
-                       Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-                       cluster = new LocalFlinkMiniCluster(config, false);
-                       cluster.start();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail("Error starting test cluster: " + e.getMessage());
-               }
+               Configuration config = new Configuration();
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM);
+               cluster = new LocalFlinkMiniCluster(config, false);
+               cluster.start();
+
+               TestEnvironment.setAsContext(cluster, PARALLELISM);
        }
 
        @AfterClass
        public static void tearDownCluster() {
-               try {
-                       cluster.stop();
-               }
-               catch (Throwable t) {
-                       t.printStackTrace();
-                       fail("ClusterClient shutdown caused an exception: " + 
t.getMessage());
-               }
+               cluster.stop();
+
+               TestEnvironment.unsetAsContext();
        }
 
        /**
@@ -77,8 +70,7 @@ public class ReduceOnEdgesWithExceptionITCase extends 
TestLogger {
        @Test
        public void testGroupReduceOnEdgesInvalidEdgeSrcId() throws Exception {
 
-               final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 
@@ -91,6 +83,8 @@ public class ReduceOnEdgesWithExceptionITCase extends 
TestLogger {
 
                        verticesWithAllNeighbors.output(new 
DiscardingOutputFormat<Tuple2<Long, Long>>());
                        env.execute();
+
+                       fail("Expected an exception.");
                } catch (Exception e) {
                        // We expect the job to fail with an exception
                }
@@ -102,8 +96,7 @@ public class ReduceOnEdgesWithExceptionITCase extends TestLogger {
        @Test
        public void testGroupReduceOnEdgesInvalidEdgeTrgId() throws Exception {
 
-               final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 
@@ -116,6 +109,8 @@ public class ReduceOnEdgesWithExceptionITCase extends TestLogger {
 
                        verticesWithAllNeighbors.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
                        env.execute();
+
+                       fail("Expected an exception.");
                } catch (Exception e) {
                        // We expect the job to fail with an exception
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
index afe2e18..b337bca 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.graph.ReduceNeighborsFunction;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -49,27 +50,19 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
 
        @BeforeClass
        public static void setupCluster() {
-               try {
-                       Configuration config = new Configuration();
-                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-                       cluster = new LocalFlinkMiniCluster(config, false);
-                       cluster.start();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail("Error starting test cluster: " + e.getMessage());
-               }
+               Configuration config = new Configuration();
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+               cluster = new LocalFlinkMiniCluster(config, false);
+               cluster.start();
+
+               TestEnvironment.setAsContext(cluster, PARALLELISM);
        }
 
        @AfterClass
        public static void tearDownCluster() {
-               try {
-                       cluster.stop();
-               }
-               catch (Throwable t) {
-                       t.printStackTrace();
-                       fail("ClusterClient shutdown caused an exception: " + t.getMessage());
-               }
+               cluster.stop();
+
+               TestEnvironment.unsetAsContext();
        }
 
        /**
@@ -79,8 +72,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
        @Test
        public void testGroupReduceOnNeighborsWithVVInvalidEdgeSrcId() throws Exception {
 
-               final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 
@@ -93,6 +85,8 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
 
                        verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
                        env.execute();
+
+                       fail("Expected an exception.");
                } catch (Exception e) {
                        // We expect the job to fail with an exception
                }
@@ -105,8 +99,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
        @Test
        public void testGroupReduceOnNeighborsWithVVInvalidEdgeTrgId() throws Exception {
 
-               final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 
@@ -119,6 +112,8 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
 
                        verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
                        env.execute();
+
+                       fail("Expected an exception.");
                } catch (Exception e) {
                        // We expect the job to fail with an exception
                }
@@ -131,8 +126,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
        @Test
        public void testGroupReduceOnNeighborsInvalidEdgeSrcId() throws Exception {
 
-               final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 
@@ -157,8 +151,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
        @Test
        public void testGroupReduceOnNeighborsInvalidEdgeTrgId() throws Exception {
 
-               final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index 6353d6a..3ee7a99 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -62,7 +62,7 @@ trait FlinkTestBase extends BeforeAndAfter {
       false,
       true)
 
-    val clusterEnvironment = new TestEnvironment(cl, parallelism)
+    val clusterEnvironment = new TestEnvironment(cl, parallelism, false)
     clusterEnvironment.setAsContext()
 
     cluster = Some(cl)

Reply via email to