This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1775d37e13288526dcd09113d7e5ae5a05140425
Author: Kostas Kloudas <[email protected]>
AuthorDate: Tue Nov 12 12:23:26 2019 +0100

    [FLINK-XXXXX] Change Executor.execute() signature + add Session and Job 
Cluster Executors
    
    Change Executor.execute() to return a CompletableFuture<JobClient> and adapt the executor implementations and callers.
---
 .../java/org/apache/flink/client/ClientUtils.java  |  30 +++----
 .../client/deployment/executors/JobClientImpl.java |  82 +++++++++++++++++
 .../deployment/executors/JobClusterExecutor.java   | 100 +++++++++++++++++++++
 .../executors/SessionClusterExecutor.java          |  94 +++++++++++++++++++
 .../apache/flink/client/program/ClientTest.java    |   5 ++
 .../org/apache/flink/core/execution/Executor.java  |   4 +-
 .../execution/{Executor.java => JobClient.java}    |  23 +++--
 .../flink/api/java/ExecutionEnvironment.java       |  15 +++-
 .../flink/api/java/ExecutorDiscoveryTest.java      |  19 +++-
 .../environment/StreamExecutionEnvironment.java    |  17 +++-
 .../environment/ExecutorDiscoveryTest.java         |  18 +++-
 11 files changed, 371 insertions(+), 36 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index ac247ac..4a95a16 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -19,22 +19,20 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ContextEnvironmentFactory;
-import org.apache.flink.client.program.DetachedJobExecutionResult;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.core.execution.JobClient;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -101,22 +99,18 @@ public enum ClientUtils {
                return FlinkUserCodeClassLoaders.create(resolveOrder, urls, 
parent, alwaysParentFirstLoaderPatterns);
        }
 
-       public static CompletableFuture<JobID> 
submitJobAndGetJobID(ClusterClient<?> client, JobGraph jobGraph) {
+       public static CompletableFuture<JobClient> 
submitJobAndGetJobClient(ClusterClient<?> client, JobGraph jobGraph) {
                return checkNotNull(client)
                                .submitJob(checkNotNull(jobGraph))
-                               .thenApply(JobSubmissionResult::getJobID);
-       }
-
-       public static CompletableFuture<JobResult> 
submitJobAndGetResult(ClusterClient<?> client, JobGraph jobGraph) {
-               return submitJobAndGetJobID(client, jobGraph)
-                               .thenCompose(client::requestJobResult);
+                               .thenApply(JobSubmissionResult::getJobID)
+                               .thenApply(jobID -> new JobClientImpl<>(client, 
jobID));
        }
 
        public static JobExecutionResult submitJob(ClusterClient<?> client, 
JobGraph jobGraph) throws ProgramInvocationException {
                try {
-                       return submitJobAndGetJobID(client, jobGraph)
-                               .thenApply(DetachedJobExecutionResult::new)
-                               .get();
+                       return submitJobAndGetJobClient(client, jobGraph)
+                                       
.thenCompose(JobClient::getJobSubmissionResult)
+                                       .get();
                } catch (InterruptedException | ExecutionException e) {
                        ExceptionUtils.checkInterrupted(e);
                        throw new ProgramInvocationException("Could not run job 
in detached mode.", jobGraph.getJobID(), e);
@@ -129,17 +123,17 @@ public enum ClientUtils {
                        ClassLoader classLoader) throws 
ProgramInvocationException {
                checkNotNull(classLoader);
 
-               JobResult jobResult;
+               JobClient jobClient;
                try {
-                       jobResult = submitJobAndGetResult(client, 
jobGraph).get();
+                       jobClient = submitJobAndGetJobClient(client, 
jobGraph).get();
                } catch (InterruptedException | ExecutionException e) {
                        ExceptionUtils.checkInterrupted(e);
                        throw new ProgramInvocationException("Could not run 
job", jobGraph.getJobID(), e);
                }
 
                try {
-                       return jobResult.toJobExecutionResult(classLoader);
-               } catch (JobExecutionException | IOException | 
ClassNotFoundException e) {
+                       return 
jobClient.getJobExecutionResult(classLoader).get();
+               } catch (Exception e) {
                        throw new ProgramInvocationException("Job failed", 
jobGraph.getJobID(), e);
                }
        }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
new file mode 100644
index 0000000..e042369
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DetachedJobExecutionResult;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link JobClient} backed by a {@link ClusterClient} and bound to a single {@link JobID}.
+ */
+public class JobClientImpl<ClusterID> implements JobClient {
+
+       private final ClusterClient<ClusterID> clusterClient;
+
+       private final JobID jobID;
+
+       public JobClientImpl(
+                       final ClusterClient<ClusterID> clusterClient,
+                       final JobID jobID) {
+               this.jobID = checkNotNull(jobID);
+               this.clusterClient = checkNotNull(clusterClient);
+       }
+
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       @Override
+       public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+               return CompletableFuture.completedFuture(new 
DetachedJobExecutionResult(jobID));
+       }
+
+       @Override
+       public CompletableFuture<JobExecutionResult> 
getJobExecutionResult(@Nonnull final ClassLoader userClassloader) {
+               final CompletableFuture<JobExecutionResult> res = new 
CompletableFuture<>();
+
+               final CompletableFuture<JobResult> jobResultFuture = 
clusterClient.requestJobResult(jobID);
+               jobResultFuture.whenComplete(((jobResult, throwable) -> {
+                       if (throwable != null) {
+                               ExceptionUtils.checkInterrupted(throwable);
+                               res.completeExceptionally(new 
ProgramInvocationException("Could not run job", jobID, throwable));
+                       } else {
+                               try {
+                                       
res.complete(jobResult.toJobExecutionResult(userClassloader));
+                               } catch (JobExecutionException | IOException | 
ClassNotFoundException e) {
+                                       res.completeExceptionally(new 
ProgramInvocationException("Job failed", jobID, e));
+                               }
+                       }
+               }));
+               return res;
+       }
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
new file mode 100644
index 0000000..902d7fd
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link Executor} to be used when executing a job in isolation.
+ * This executor will start a cluster specifically for the job at hand and
+ * tear it down when the job is finished either successfully or due to an 
error.
+ */
+public class JobClusterExecutor<ClusterID> implements Executor {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(JobClusterExecutor.class);
+
+       private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+       public JobClusterExecutor(final ClusterClientServiceLoader 
clusterClientServiceLoader) {
+               this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
+       }
+
+       @Override
+       public CompletableFuture<JobClient> execute(Pipeline pipeline, 
Configuration executionConfig) throws Exception {
+               final ClusterClientFactory<ClusterID> clusterClientFactory = 
clusterClientServiceLoader.getClusterClientFactory(executionConfig);
+
+               try (final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(executionConfig)) {
+                       final ExecutionConfigAccessor configAccessor = 
ExecutionConfigAccessor.fromConfiguration(executionConfig);
+
+                       final List<URL> dependencies = configAccessor.getJars();
+                       final List<URL> classpaths = 
configAccessor.getClasspaths();
+
+                       final JobGraph jobGraph = getJobGraph(pipeline, 
executionConfig, classpaths, dependencies);
+
+                       final ClusterSpecification clusterSpecification = 
clusterClientFactory.getClusterSpecification(executionConfig);
+
+                       try (final ClusterClient<ClusterID> client = 
clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, 
configAccessor.getDetachedMode())) {
+                               LOG.info("Job has been submitted with JobID " + 
jobGraph.getJobID());
+                               return CompletableFuture.completedFuture(new 
JobClientImpl<>(client, jobGraph.getJobID()));
+                       }
+               }
+       }
+
+       private JobGraph getJobGraph(
+                       final Pipeline pipeline,
+                       final Configuration configuration,
+                       final List<URL> classpaths,
+                       final List<URL> libraries) {
+
+               checkNotNull(pipeline);
+               checkNotNull(configuration);
+               checkNotNull(classpaths);
+               checkNotNull(libraries);
+
+               final ExecutionConfigAccessor executionConfigAccessor = 
ExecutionConfigAccessor.fromConfiguration(configuration);
+               final JobGraph jobGraph = FlinkPipelineTranslationUtil
+                               .getJobGraph(pipeline, configuration, 
executionConfigAccessor.getParallelism());
+
+               jobGraph.addJars(libraries);
+               jobGraph.setClasspaths(classpaths);
+               
jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+               return jobGraph;
+       }
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
new file mode 100644
index 0000000..b7eeb68
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running 
cluster.
+ */
+public class SessionClusterExecutor<ClusterID> implements Executor {
+
+       private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+       public SessionClusterExecutor(final ClusterClientServiceLoader 
clusterClientServiceLoader) {
+               this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
+       }
+
+       @Override
+       public CompletableFuture<JobClient> execute(final Pipeline pipeline, 
final Configuration configuration) throws Exception {
+               final ExecutionConfigAccessor configAccessor = 
ExecutionConfigAccessor.fromConfiguration(configuration);
+
+               final List<URL> dependencies = configAccessor.getJars();
+               final List<URL> classpaths = configAccessor.getClasspaths();
+
+               final JobGraph jobGraph = getJobGraph(pipeline, configuration, 
classpaths, dependencies);
+
+               final ClusterClientFactory<ClusterID> clusterClientFactory = 
clusterClientServiceLoader.getClusterClientFactory(configuration);
+
+               try (final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(configuration)) {
+                       final ClusterID clusterID = 
clusterClientFactory.getClusterId(configuration);
+                       checkState(clusterID != null);
+
+                       try (final ClusterClient<ClusterID> clusterClient = 
clusterDescriptor.retrieve(clusterID)) {
+                               return 
ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
+                       }
+               }
+       }
+
+       private JobGraph getJobGraph(
+                       final Pipeline pipeline,
+                       final Configuration configuration,
+                       final List<URL> classpaths,
+                       final List<URL> libraries) {
+
+               checkNotNull(pipeline);
+               checkNotNull(configuration);
+               checkNotNull(classpaths);
+               checkNotNull(libraries);
+
+               final ExecutionConfigAccessor executionConfigAccessor = 
ExecutionConfigAccessor.fromConfiguration(configuration);
+               final JobGraph jobGraph = FlinkPipelineTranslationUtil
+                               .getJobGraph(pipeline, configuration, 
executionConfigAccessor.getParallelism());
+
+               jobGraph.addJars(libraries);
+               jobGraph.setClasspaths(classpaths);
+               
jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+               return jobGraph;
+       }
+}
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 5845080..6e53abb 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -64,6 +64,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -193,6 +194,10 @@ public class ClientTest extends TestLogger {
        @Test
        public void tryLocalExecution() throws ProgramInvocationException, 
ProgramMissingJobException {
                PackagedProgram packagedProgramMock = 
mock(PackagedProgram.class);
+
+               when(packagedProgramMock.getUserCodeClassLoader())
+                               
.thenReturn(packagedProgramMock.getClass().getClassLoader());
+
                doAnswer(new Answer<Void>() {
                        @Override
                        public Void answer(InvocationOnMock invocation) throws 
Throwable {
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 8515f43..5be3193 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
  */
@@ -34,5 +36,5 @@ public interface Executor {
         * @param configuration the {@link Configuration} with the required 
execution parameters
         * @return the {@link JobExecutionResult} corresponding to the pipeline 
execution.
         */
-       JobExecutionResult execute(Pipeline pipeline, Configuration 
configuration) throws Exception;
+       CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration 
configuration) throws Exception;
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
similarity index 60%
copy from flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
copy to flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 8515f43..8440dd1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -18,21 +18,20 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
 
 /**
- * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
+ * A client that is scoped to a specific job.
  */
-public interface Executor {
+@PublicEvolving
+public interface JobClient {
+
+       CompletableFuture<JobExecutionResult> getJobSubmissionResult();
 
-       /**
-        * Executes a {@link Pipeline} based on the provided configuration.
-        *
-        * @param pipeline the {@link Pipeline} to execute
-        * @param configuration the {@link Configuration} with the required 
execution parameters
-        * @return the {@link JobExecutionResult} corresponding to the pipeline 
execution.
-        */
-       JobExecutionResult execute(Pipeline pipeline, Configuration 
configuration) throws Exception;
+       CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull 
final ClassLoader userClassloader);
 }
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5b07843..69abe17 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -57,6 +57,7 @@ import 
org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -132,6 +133,8 @@ public class ExecutionEnvironment {
 
        private final Configuration configuration;
 
+       private ClassLoader userClassloader;
+
        /**
         * Creates a new Execution Environment.
         */
@@ -146,6 +149,11 @@ public class ExecutionEnvironment {
        protected ExecutionEnvironment(final ExecutorServiceLoader 
executorServiceLoader, final Configuration executorConfiguration) {
                this.executorServiceLoader = 
checkNotNull(executorServiceLoader);
                this.configuration = checkNotNull(executorConfiguration);
+               this.userClassloader = getClass().getClassLoader();
+       }
+
+       protected void setUserClassloader(final ClassLoader userClassloader) {
+               this.userClassloader = checkNotNull(userClassloader);
        }
 
        protected Configuration getConfiguration() {
@@ -796,7 +804,12 @@ public class ExecutionEnvironment {
                                
executorServiceLoader.getExecutorFactory(configuration);
 
                final Executor executor = 
executorFactory.getExecutor(configuration);
-               lastJobExecutionResult = executor.execute(plan, configuration);
+
+               final JobClient jobClient = executor.execute(plan, 
configuration).get();
+               lastJobExecutionResult = 
configuration.getBoolean(DeploymentOptions.ATTACHED)
+                               ? 
jobClient.getJobExecutionResult(userClassloader).get()
+                               : jobClient.getJobSubmissionResult().get();
+
                return lastJobExecutionResult;
        }
 
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 49013b8..9acbf3d 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -25,13 +25,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -46,6 +50,7 @@ public class ExecutorDiscoveryTest {
        public void 
correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws 
Exception {
                final Configuration configuration = new Configuration();
                configuration.set(DeploymentOptions.TARGET, 
IDReportingExecutorFactory.ID);
+               configuration.set(DeploymentOptions.ATTACHED, true);
 
                final JobExecutionResult result = 
executeTestJobBasedOnConfig(configuration);
 
@@ -78,7 +83,19 @@ public class ExecutorDiscoveryTest {
                        return (pipeline, executionConfig) -> {
                                final Map<String, OptionalFailure<Object>> res 
= new HashMap<>();
                                res.put(DeploymentOptions.TARGET.key(), 
OptionalFailure.of(ID));
-                               return new JobExecutionResult(new JobID(), 12L, 
res);
+
+                               return CompletableFuture.completedFuture(new 
JobClient(){
+
+                                       @Override
+                                       public 
CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+                                               throw new 
UnsupportedOperationException();
+                                       }
+
+                                       @Override
+                                       public 
CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull 
ClassLoader userClassloader) {
+                                               return 
CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, 
res));
+                                       }
+                               });
                        };
                }
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3870b52..7a59d5a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -53,6 +53,7 @@ import 
org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -154,6 +155,8 @@ public class StreamExecutionEnvironment {
 
        private final Configuration configuration;
 
+       private ClassLoader userClassloader;
+
        // 
--------------------------------------------------------------------------------------------
        // Constructor and Properties
        // 
--------------------------------------------------------------------------------------------
@@ -166,9 +169,16 @@ public class StreamExecutionEnvironment {
                this(new DefaultExecutorServiceLoader(), executorConfiguration);
        }
 
-       public StreamExecutionEnvironment(final ExecutorServiceLoader 
executorServiceLoader, final Configuration executorConfiguration) {
+       public StreamExecutionEnvironment(
+                       final ExecutorServiceLoader executorServiceLoader,
+                       final Configuration executorConfiguration) {
                this.executorServiceLoader = 
checkNotNull(executorServiceLoader);
                this.configuration = checkNotNull(executorConfiguration);
+               this.userClassloader = getClass().getClassLoader();
+       }
+
+       protected void setUserClassloader(final ClassLoader userClassloader) {
+               this.userClassloader = checkNotNull(userClassloader);
        }
 
        protected Configuration getConfiguration() {
@@ -1552,7 +1562,10 @@ public class StreamExecutionEnvironment {
                                
executorServiceLoader.getExecutorFactory(configuration);
 
                final Executor executor = 
executorFactory.getExecutor(configuration);
-               return executor.execute(streamGraph, configuration);
+               final JobClient jobClient = executor.execute(streamGraph, 
configuration).get();
+               return configuration.getBoolean(DeploymentOptions.ATTACHED)
+                               ? 
jobClient.getJobExecutionResult(userClassloader).get()
+                               : jobClient.getJobSubmissionResult().get();
        }
 
        private void consolidateParallelismDefinitionsInConfiguration() {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index 9c11fdf..ce593c2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -24,15 +24,19 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -47,6 +51,7 @@ public class ExecutorDiscoveryTest {
        public void 
correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws 
Exception {
                final Configuration configuration = new Configuration();
                configuration.set(DeploymentOptions.TARGET, 
IDReportingExecutorFactory.ID);
+               configuration.set(DeploymentOptions.ATTACHED, true);
 
                final JobExecutionResult result = 
executeTestJobBasedOnConfig(configuration);
 
@@ -79,7 +84,18 @@ public class ExecutorDiscoveryTest {
                        return (pipeline, executionConfig) -> {
                                final Map<String, OptionalFailure<Object>> res 
= new HashMap<>();
                                res.put(DeploymentOptions.TARGET.key(), 
OptionalFailure.of(ID));
-                               return new JobExecutionResult(new JobID(), 12L, 
res);
+                               return CompletableFuture.completedFuture(new 
JobClient(){
+
+                                       @Override
+                                       public 
CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+                                               throw new 
UnsupportedOperationException();
+                                       }
+
+                                       @Override
+                                       public 
CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull 
ClassLoader userClassloader) {
+                                               return 
CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, 
res));
+                                       }
+                               });
                        };
                }
        }

Reply via email to