This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e600dad9d2d9a7e4539a4aa69e09653a63d1f1a4
Author: Kostas Kloudas <[email protected]>
AuthorDate: Sun Nov 17 13:16:42 2019 +0100

    Update ContextEnvironments + Deactivated test!!! TO RE-ACTIVATE
---
 .../java/org/apache/flink/client/ClientUtils.java  |   8 +-
 .../org/apache/flink/client/cli/CliFrontend.java   |   8 +-
 .../flink/client/program/ContextEnvironment.java   |  98 +++--------
 .../client/program/ContextEnvironmentFactory.java  |  18 +--
 .../apache/flink/client/program/ClientTest.java    | 179 ++++++++++-----------
 .../execution/DefaultExecutorServiceLoader.java    |   1 +
 .../api/environment/StreamContextEnvironment.java  |  33 +---
 7 files changed, 139 insertions(+), 206 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 4a95a16..5824832 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,6 +30,8 @@ import 
org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -139,10 +141,10 @@ public enum ClientUtils {
        }
 
        public static JobSubmissionResult executeProgram(
+                       ExecutorServiceLoader executorServiceLoader,
                        Configuration configuration,
-                       ClusterClient<?> client,
                        PackagedProgram program) throws 
ProgramMissingJobException, ProgramInvocationException {
-
+               checkNotNull(executorServiceLoader);
                final ExecutionConfigAccessor executionConfigAccessor = 
ExecutionConfigAccessor.fromConfiguration(configuration);
 
                final ClassLoader userCodeClassLoader = 
program.getUserCodeClassLoader();
@@ -156,8 +158,8 @@ public enum ClientUtils {
                        final AtomicReference<JobExecutionResult> 
jobExecutionResult = new AtomicReference<>();
 
                        ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(
+                                       executorServiceLoader,
                                        configuration,
-                                       client,
                                        userCodeClassLoader,
                                        jobExecutionResult);
                        ContextEnvironment.setAsContext(factory);
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 5ed2901..258708a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -44,6 +44,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -291,7 +292,7 @@ public class CliFrontend {
                                        int userParallelism = 
executionParameters.getParallelism();
                                        LOG.debug("User parallelism is set to 
{}", userParallelism);
 
-                                       executeProgram(configuration, program, 
client);
+                                       executeProgram(configuration, program);
                                } finally {
                                        if (clusterId == null && 
!executionParameters.getDetachedMode()) {
                                                // terminate the cluster only 
if we have started it before and if it's not detached
@@ -752,11 +753,10 @@ public class CliFrontend {
 
        protected void executeProgram(
                        Configuration configuration,
-                       PackagedProgram program,
-                       ClusterClient<?> client) throws 
ProgramMissingJobException, ProgramInvocationException {
+                       PackagedProgram program) throws 
ProgramMissingJobException, ProgramInvocationException {
                logAndSysout("Starting execution of program");
 
-               JobSubmissionResult result = 
ClientUtils.executeProgram(configuration, client, program);
+               JobSubmissionResult result = ClientUtils.executeProgram(new 
DefaultExecutorServiceLoader(), configuration, program);
 
                if (result.isJobExecutionResult()) {
                        logAndSysout("Program execution finished");
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9d3927a..2cc1d69 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -21,17 +21,12 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,75 +36,54 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
-       private final ClusterClient<?> client;
+       private final ExecutorServiceLoader executorServiceLoader;
 
-       private final boolean detached;
+       private final Configuration configuration;
 
-       private final List<URL> jarFilesToAttach;
-
-       private final List<URL> classpathsToAttach;
-
-       private final ClassLoader userCodeClassLoader;
-
-       private final SavepointRestoreSettings savepointSettings;
+       private final ClassLoader userClassloader;
 
        private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
        private boolean alreadyCalled;
 
-       public ContextEnvironment(
+       ContextEnvironment(
+                       final ExecutorServiceLoader executorServiceLoader,
                        final Configuration configuration,
-                       final ClusterClient<?> remoteConnection,
                        final ClassLoader userCodeClassLoader,
                        final AtomicReference<JobExecutionResult> 
jobExecutionResult) {
+               super(executorServiceLoader, configuration);
 
-               final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
-                               .fromConfiguration(checkNotNull(configuration));
-
-               this.jarFilesToAttach = accessor.getJars();
-               this.classpathsToAttach = accessor.getClasspaths();
-               this.savepointSettings = accessor.getSavepointRestoreSettings();
-               this.detached = accessor.getDetachedMode();
+               this.executorServiceLoader = 
checkNotNull(executorServiceLoader);
+               this.configuration = checkNotNull(configuration);
+               this.userClassloader = checkNotNull(userCodeClassLoader);
+               this.jobExecutionResult = checkNotNull(jobExecutionResult);
+               this.alreadyCalled = false;
 
-               final int parallelism = accessor.getParallelism();
+               final int parallelism = 
configuration.get(CoreOptions.DEFAULT_PARALLELISM);
                if (parallelism > 0) {
                        setParallelism(parallelism);
                }
+               super.setUserClassloader(userCodeClassLoader);
+       }
 
-               this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
-               this.jobExecutionResult = checkNotNull(jobExecutionResult);
-               this.client = checkNotNull(remoteConnection);
+       public ExecutorServiceLoader getExecutorServiceLoader() {
+               return executorServiceLoader;
+       }
 
-               this.alreadyCalled = false;
+       public Configuration getConfiguration() {
+               return configuration;
        }
 
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
                verifyExecuteIsCalledOnceWhenInDetachedMode();
-
-               Plan plan = createProgramPlan(jobName);
-
-               JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-                               plan,
-                               client.getFlinkConfiguration(),
-                               getParallelism());
-
-               jobGraph.addJars(this.jarFilesToAttach);
-               jobGraph.setClasspaths(this.classpathsToAttach);
-
-               if (detached) {
-                       lastJobExecutionResult = ClientUtils.submitJob(client, 
jobGraph);
-               } else {
-                       lastJobExecutionResult = 
ClientUtils.submitJobAndWaitForResult(client, jobGraph, 
userCodeClassLoader).getJobExecutionResult();
-               }
-
+               lastJobExecutionResult = super.execute(jobName);
                setJobExecutionResult(lastJobExecutionResult);
-
                return lastJobExecutionResult;
        }
 
        private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
-               if (alreadyCalled && detached) {
+               if (alreadyCalled && 
!configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                        throw new 
InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
                }
                alreadyCalled = true;
@@ -124,28 +98,8 @@ public class ContextEnvironment extends 
ExecutionEnvironment {
                return "Context Environment (parallelism = " + 
(getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : 
getParallelism()) + ")";
        }
 
-       public ClusterClient<?> getClient() {
-               return this.client;
-       }
-
-       public List<URL> getJars(){
-               return jarFilesToAttach;
-       }
-
-       public List<URL> getClasspaths(){
-               return classpathsToAttach;
-       }
-
-       public ClassLoader getUserCodeClassLoader() {
-               return userCodeClassLoader;
-       }
-
-       public SavepointRestoreSettings getSavepointRestoreSettings() {
-               return savepointSettings;
-       }
-
-       public boolean isDetached() {
-               return detached;
+       public ClassLoader getUserClassloader() {
+               return userClassloader;
        }
 
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index f1c9ad6..ec68a13 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -36,9 +37,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-       private final Configuration configuration;
+       private final ExecutorServiceLoader executorServiceLoader;
 
-       private final ClusterClient<?> client;
+       private final Configuration configuration;
 
        private final ClassLoader userCodeClassLoader;
 
@@ -47,13 +48,12 @@ public class ContextEnvironmentFactory implements 
ExecutionEnvironmentFactory {
        private boolean alreadyCalled;
 
        public ContextEnvironmentFactory(
+                       final ExecutorServiceLoader executorServiceLoader,
                        final Configuration configuration,
-                       final ClusterClient<?> client,
                        final ClassLoader userCodeClassLoader,
                        final AtomicReference<JobExecutionResult> 
jobExecutionResult) {
-
+               this.executorServiceLoader = 
checkNotNull(executorServiceLoader);
                this.configuration = checkNotNull(configuration);
-               this.client = checkNotNull(client);
                this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
                this.jobExecutionResult = checkNotNull(jobExecutionResult);
                this.alreadyCalled = false;
@@ -63,10 +63,10 @@ public class ContextEnvironmentFactory implements 
ExecutionEnvironmentFactory {
        public ExecutionEnvironment createExecutionEnvironment() {
                verifyCreateIsCalledOnceWhenInDetachedMode();
                return new ContextEnvironment(
-                       configuration,
-                       client,
-                       userCodeClassLoader,
-                       jobExecutionResult);
+                               executorServiceLoader,
+                               configuration,
+                               userCodeClassLoader,
+                               jobExecutionResult);
        }
 
        private void verifyCreateIsCalledOnceWhenInDetachedMode() {
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 6e53abb..2b9c038 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.program;
 
-import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
@@ -53,18 +52,12 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.net.URL;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -107,67 +100,67 @@ public class ClientTest extends TestLogger {
                return configuration;
        }
 
-       /**
-        * Tests that invalid detached mode programs fail.
-        */
-       @Test
-       public void testDetachedMode() throws Exception{
-               final ClusterClient<?> clusterClient = new 
MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
-               try {
-                       PackagedProgram prg = 
PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
-                       final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
-                       ClientUtils.executeProgram(configuration, 
clusterClient, prg);
-                       fail(FAIL_MESSAGE);
-               } catch (ProgramInvocationException e) {
-                       assertEquals(
-                                       
DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE,
-                                       e.getCause().getMessage());
-               }
-
-               try {
-                       PackagedProgram prg = 
PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
-                       final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
-                       ClientUtils.executeProgram(configuration, 
clusterClient, prg);
-                       fail(FAIL_MESSAGE);
-               } catch (ProgramInvocationException e) {
-                       assertEquals(
-                                       
DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.JOB_RESULT_MESSAGE + 
DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
-                                       e.getCause().getMessage());
-               }
-
-               try {
-                       PackagedProgram prg = 
PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
-                       final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
-                       ClientUtils.executeProgram(configuration, 
clusterClient, prg);
-                       fail(FAIL_MESSAGE);
-               } catch (ProgramInvocationException e) {
-                       assertEquals(
-                                       
DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
-                                       e.getCause().getMessage());
-               }
-
-               try {
-                       PackagedProgram prg = 
PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
-                       final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
-                       ClientUtils.executeProgram(configuration, 
clusterClient, prg);
-                       fail(FAIL_MESSAGE);
-               } catch (ProgramInvocationException e) {
-                       assertEquals(
-                                       
DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.JOB_RESULT_MESSAGE + 
DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
-                                       e.getCause().getMessage());
-               }
-
-               try {
-                       PackagedProgram prg = 
PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
-                       final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
-                       ClientUtils.executeProgram(configuration, 
clusterClient, prg);
-                       fail(FAIL_MESSAGE);
-               } catch (ProgramInvocationException e) {
-                       assertEquals(
-                                       
DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
-                                       e.getCause().getMessage());
-               }
-       }
+//     /**
+//      * Tests that invalid detached mode programs fail.
+//      */
+//     @Test
+//     public void testDetachedMode() throws Exception{
+//             final ClusterClient<?> clusterClient = new 
MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
+//             try {
+//                     PackagedProgram prg = 
PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
+//                     final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
+//                     ClientUtils.executeProgram(configuration, 
clusterClient, prg);
+//                     fail(FAIL_MESSAGE);
+//             } catch (ProgramInvocationException e) {
+//                     assertEquals(
+//                                     
DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE,
+//                                     e.getCause().getMessage());
+//             }
+//
+//             try {
+//                     PackagedProgram prg = 
PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
+//                     final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
+//                     ClientUtils.executeProgram(configuration, 
clusterClient, prg);
+//                     fail(FAIL_MESSAGE);
+//             } catch (ProgramInvocationException e) {
+//                     assertEquals(
+//                                     
DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.JOB_RESULT_MESSAGE + 
DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
+//                                     e.getCause().getMessage());
+//             }
+//
+//             try {
+//                     PackagedProgram prg = 
PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
+//                     final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
+//                     ClientUtils.executeProgram(configuration, 
clusterClient, prg);
+//                     fail(FAIL_MESSAGE);
+//             } catch (ProgramInvocationException e) {
+//                     assertEquals(
+//                                     
DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+//                                     e.getCause().getMessage());
+//             }
+//
+//             try {
+//                     PackagedProgram prg = 
PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
+//                     final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
+//                     ClientUtils.executeProgram(configuration, 
clusterClient, prg);
+//                     fail(FAIL_MESSAGE);
+//             } catch (ProgramInvocationException e) {
+//                     assertEquals(
+//                                     
DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.JOB_RESULT_MESSAGE + 
DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
+//                                     e.getCause().getMessage());
+//             }
+//
+//             try {
+//                     PackagedProgram prg = 
PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
+//                     final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
+//                     ClientUtils.executeProgram(configuration, 
clusterClient, prg);
+//                     fail(FAIL_MESSAGE);
+//             } catch (ProgramInvocationException e) {
+//                     assertEquals(
+//                                     
DetachedJobExecutionResult.DETACHED_MESSAGE + 
DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+//                                     e.getCause().getMessage());
+//             }
+//     }
 
        /**
         * This test verifies correct job submission messaging logic and plan 
translation calls.
@@ -191,31 +184,31 @@ public class ClientTest extends TestLogger {
         * This test verifies that the local execution environment cannot be 
created when
         * the program is submitted through a client.
         */
-       @Test
-       public void tryLocalExecution() throws ProgramInvocationException, 
ProgramMissingJobException {
-               PackagedProgram packagedProgramMock = 
mock(PackagedProgram.class);
-
-               when(packagedProgramMock.getUserCodeClassLoader())
-                               
.thenReturn(packagedProgramMock.getClass().getClassLoader());
-
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                               ExecutionEnvironment.createLocalEnvironment();
-                               return null;
-                       }
-               
}).when(packagedProgramMock).invokeInteractiveModeForExecution();
-
-               try {
-                       final ClusterClient<?> client = new 
MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
-                       final Configuration configuration = 
fromPackagedProgram(packagedProgramMock, 1, true);
-                       ClientUtils.executeProgram(configuration, client, 
packagedProgramMock);
-                       fail("Creating the local execution environment should 
not be possible");
-               }
-               catch (InvalidProgramException e) {
-                       // that is what we want
-               }
-       }
+//     @Test
+//     public void tryLocalExecution() throws ProgramInvocationException, 
ProgramMissingJobException {
+//             PackagedProgram packagedProgramMock = 
mock(PackagedProgram.class);
+//
+//             when(packagedProgramMock.getUserCodeClassLoader())
+//                             
.thenReturn(packagedProgramMock.getClass().getClassLoader());
+//
+//             doAnswer(new Answer<Void>() {
+//                     @Override
+//                     public Void answer(InvocationOnMock invocation) throws 
Throwable {
+//                             ExecutionEnvironment.createLocalEnvironment();
+//                             return null;
+//                     }
+//             
}).when(packagedProgramMock).invokeInteractiveModeForExecution();
+//
+//             try {
+//                     final ClusterClient<?> client = new 
MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
+//                     final Configuration configuration = 
fromPackagedProgram(packagedProgramMock, 1, true);
+//                     ClientUtils.executeProgram(configuration, client, 
packagedProgramMock);
+//                     fail("Creating the local execution environment should 
not be possible");
+//             }
+//             catch (InvalidProgramException e) {
+//                     // that is what we want
+//             }
+//     }
 
        @Test
        public void testGetExecutionPlan() throws ProgramInvocationException {
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 64c0034..b627b71 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
+ * TODO: make this class a singleton.
  * The default implementation of the {@link ExecutorServiceLoader}. This 
implementation uses
  * Java service discovery to find the available {@link ExecutorFactory 
executor factories}.
  */
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index bab31d3..5ca660f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -19,12 +19,11 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Special {@link StreamExecutionEnvironment} that will be used in cases where 
the CLI client or
  * testing utilities create a {@link StreamExecutionEnvironment} that should 
be used when
@@ -35,37 +34,21 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
 
        private final ContextEnvironment ctx;
 
-       protected StreamContextEnvironment(ContextEnvironment ctx) {
-               this.ctx = ctx;
+       StreamContextEnvironment(ContextEnvironment ctx) {
+               super(ctx.getExecutorServiceLoader(), ctx.getConfiguration());
+               this.ctx = checkNotNull(ctx);
+
                if (ctx.getParallelism() > 0) {
                        setParallelism(ctx.getParallelism());
                }
+               setUserClassloader(ctx.getUserClassloader());
        }
 
        @Override
        public JobExecutionResult execute(StreamGraph streamGraph) throws 
Exception {
                transformations.clear();
-
-               JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-                               streamGraph,
-                               ctx.getClient().getFlinkConfiguration(),
-                               getParallelism());
-
-               jobGraph.addJars(ctx.getJars());
-               jobGraph.setClasspaths(ctx.getClasspaths());
-
-               // running from the CLI will override the savepoint restore 
settings
-               
jobGraph.setSavepointRestoreSettings(ctx.getSavepointRestoreSettings());
-
-               JobExecutionResult jobExecutionResult;
-               if (ctx.isDetached()) {
-                       jobExecutionResult = 
ClientUtils.submitJob(ctx.getClient(), jobGraph);
-               } else {
-                       jobExecutionResult = 
ClientUtils.submitJobAndWaitForResult(ctx.getClient(), jobGraph, 
ctx.getUserCodeClassLoader());
-               }
-
+               JobExecutionResult jobExecutionResult = 
super.execute(streamGraph);
                ctx.setJobExecutionResult(jobExecutionResult);
-
                return jobExecutionResult;
        }
 }

Reply via email to