This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b34802df869d147f6e82910deb346341137f227
Author: Kostas Kloudas <[email protected]>
AuthorDate: Sun Nov 17 16:51:17 2019 +0100

    [FLINK-XXXXX] Fix job client lifecycle issue
---
 .../client/deployment/executors/JobClientImpl.java |  5 +++++
 .../StandaloneSessionClusterExecutor.java          |  5 ++---
 .../org/apache/flink/core/execution/JobClient.java |  2 +-
 .../flink/api/java/ExecutionEnvironment.java       | 11 ++++++-----
 .../flink/api/java/ExecutorDiscoveryTest.java      |  5 +++++
 .../environment/StreamExecutionEnvironment.java    |  9 +++++----
 .../environment/ExecutorDiscoveryTest.java         |  5 +++++
 .../yarn/executors/YarnJobClusterExecutor.java     |  7 +++----
 .../yarn/executors/YarnSessionClusterExecutor.java | 23 +++++++---------------
 9 files changed, 39 insertions(+), 33 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
index e042369..c6f48bfe 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
@@ -79,4 +79,9 @@ public class JobClientImpl<ClusterID> implements JobClient {
                }));
                return res;
        }
+
+       @Override
+       public void close() throws Exception {
+               this.clusterClient.close();
+       }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index e5cc82e..df4d2ba 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -66,9 +66,8 @@ public class StandaloneSessionClusterExecutor implements 
Executor {
                        final StandaloneClusterId clusterID = 
clusterClientFactory.getClusterId(configuration);
                        checkState(clusterID != null);
 
-                       try (final RestClusterClient<StandaloneClusterId> 
clusterClient = clusterDescriptor.retrieve(clusterID)) {
-                               return 
ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-                       }
+                       final RestClusterClient<StandaloneClusterId> 
clusterClient = clusterDescriptor.retrieve(clusterID);
+                       return 
ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
                }
        }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 8440dd1..b4ab9a9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture;
  * A client that is scoped to a specific job.
  */
 @PublicEvolving
-public interface JobClient {
+public interface JobClient extends AutoCloseable {
 
        CompletableFuture<JobExecutionResult> getJobSubmissionResult();
 
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 69abe17..c600eb9 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -805,12 +805,13 @@ public class ExecutionEnvironment {
 
                final Executor executor = 
executorFactory.getExecutor(configuration);
 
-               final JobClient jobClient = executor.execute(plan, 
configuration).get();
-               lastJobExecutionResult = 
configuration.getBoolean(DeploymentOptions.ATTACHED)
-                               ? 
jobClient.getJobExecutionResult(userClassloader).get()
-                               : jobClient.getJobSubmissionResult().get();
+               try (final JobClient jobClient = executor.execute(plan, 
configuration).get()) {
+                       lastJobExecutionResult = 
configuration.getBoolean(DeploymentOptions.ATTACHED)
+                                       ? 
jobClient.getJobExecutionResult(userClassloader).get()
+                                       : 
jobClient.getJobSubmissionResult().get();
 
-               return lastJobExecutionResult;
+                       return lastJobExecutionResult;
+               }
        }
 
        private void consolidateParallelismDefinitionsInConfiguration() {
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 9acbf3d..f5c34a5 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -95,6 +95,11 @@ public class ExecutorDiscoveryTest {
                                        public 
CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull 
ClassLoader userClassloader) {
                                                return 
CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, 
res));
                                        }
+
+                                       @Override
+                                       public void close() {
+
+                                       }
                                });
                        };
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7a59d5a..fdaaae0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1562,10 +1562,11 @@ public class StreamExecutionEnvironment {
                                
executorServiceLoader.getExecutorFactory(configuration);
 
                final Executor executor = 
executorFactory.getExecutor(configuration);
-               final JobClient jobClient = executor.execute(streamGraph, 
configuration).get();
-               return configuration.getBoolean(DeploymentOptions.ATTACHED)
-                               ? 
jobClient.getJobExecutionResult(userClassloader).get()
-                               : jobClient.getJobSubmissionResult().get();
+               try (final JobClient jobClient = executor.execute(streamGraph, 
configuration).get()) {
+                       return 
configuration.getBoolean(DeploymentOptions.ATTACHED)
+                                       ? 
jobClient.getJobExecutionResult(userClassloader).get()
+                                       : 
jobClient.getJobSubmissionResult().get();
+               }
        }
 
        private void consolidateParallelismDefinitionsInConfiguration() {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index ce593c2..97e4517 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -95,6 +95,11 @@ public class ExecutorDiscoveryTest {
                                        public 
CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull 
ClassLoader userClassloader) {
                                                return 
CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, 
res));
                                        }
+
+                                       @Override
+                                       public void close() {
+
+                                       }
                                });
                        };
                }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index 09094e7..ead6e9a 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -73,10 +73,9 @@ public class YarnJobClusterExecutor implements Executor {
 
                        final ClusterSpecification clusterSpecification = 
clusterClientFactory.getClusterSpecification(executionConfig);
 
-                       try (final ClusterClient<ApplicationId> client = 
clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, 
configAccessor.getDetachedMode())) {
-                               LOG.info("Job has been submitted with JobID " + 
jobGraph.getJobID());
-                               return CompletableFuture.completedFuture(new 
JobClientImpl<>(client, jobGraph.getJobID()));
-                       }
+                       final ClusterClient<ApplicationId> client = 
clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, 
configAccessor.getDetachedMode());
+                       LOG.info("Job has been submitted with JobID " + 
jobGraph.getJobID());
+                       return CompletableFuture.completedFuture(new 
JobClientImpl<>(client, jobGraph.getJobID()));
                }
        }
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index dd15d1b..5b72045 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -56,40 +56,31 @@ public class YarnSessionClusterExecutor implements Executor 
{
 
        @Override
        public CompletableFuture<JobClient> execute(final Pipeline pipeline, 
final Configuration configuration) throws Exception {
-               final ExecutionConfigAccessor configAccessor = 
ExecutionConfigAccessor.fromConfiguration(configuration);
-
-               final List<URL> dependencies = configAccessor.getJars();
-               final List<URL> classpaths = configAccessor.getClasspaths();
-
-               final JobGraph jobGraph = getJobGraph(pipeline, configuration, 
classpaths, dependencies);
+               final JobGraph jobGraph = getJobGraph(pipeline, configuration);
 
                try (final YarnClusterDescriptor clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(configuration)) {
                        final ApplicationId clusterID = 
clusterClientFactory.getClusterId(configuration);
                        checkState(clusterID != null);
 
-                       try (final ClusterClient<ApplicationId> clusterClient = 
clusterDescriptor.retrieve(clusterID)) {
-                               return 
ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-                       }
+                       // TODO: 17.11.19 we cannot close the cluster client here 
because this method only returns a future of the job client, which may still need it
+                       final ClusterClient<ApplicationId> clusterClient = 
clusterDescriptor.retrieve(clusterID);
+                       return 
ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
                }
        }
 
        private JobGraph getJobGraph(
                        final Pipeline pipeline,
-                       final Configuration configuration,
-                       final List<URL> classpaths,
-                       final List<URL> libraries) {
+                       final Configuration configuration) {
 
                checkNotNull(pipeline);
                checkNotNull(configuration);
-               checkNotNull(classpaths);
-               checkNotNull(libraries);
 
                final ExecutionConfigAccessor executionConfigAccessor = 
ExecutionConfigAccessor.fromConfiguration(configuration);
                final JobGraph jobGraph = FlinkPipelineTranslationUtil
                                .getJobGraph(pipeline, configuration, 
executionConfigAccessor.getParallelism());
 
-               jobGraph.addJars(libraries);
-               jobGraph.setClasspaths(classpaths);
+               jobGraph.addJars(executionConfigAccessor.getJars());
+               jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
                
jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
 
                return jobGraph;

Reply via email to