This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4b34802df869d147f6e82910deb346341137f227 Author: Kostas Kloudas <[email protected]> AuthorDate: Sun Nov 17 16:51:17 2019 +0100 [FLINK-XXXXX] Fix job client lifecycle issue --- .../client/deployment/executors/JobClientImpl.java | 5 +++++ .../StandaloneSessionClusterExecutor.java | 5 ++--- .../org/apache/flink/core/execution/JobClient.java | 2 +- .../flink/api/java/ExecutionEnvironment.java | 11 ++++++----- .../flink/api/java/ExecutorDiscoveryTest.java | 5 +++++ .../environment/StreamExecutionEnvironment.java | 9 +++++---- .../environment/ExecutorDiscoveryTest.java | 5 +++++ .../yarn/executors/YarnJobClusterExecutor.java | 7 +++---- .../yarn/executors/YarnSessionClusterExecutor.java | 23 +++++++--------------- 9 files changed, 39 insertions(+), 33 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java index e042369..c6f48bfe 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java @@ -79,4 +79,9 @@ public class JobClientImpl<ClusterID> implements JobClient { })); return res; } + + @Override + public void close() throws Exception { + this.clusterClient.close(); + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java index e5cc82e..df4d2ba 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java @@ -66,9 +66,8 @@ public class StandaloneSessionClusterExecutor implements Executor { final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration); checkState(clusterID != null); - try (final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID)) { - return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph); - } + final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID); + return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph); } } diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java index 8440dd1..b4ab9a9 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java @@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture; * A client that is scoped to a specific job. */ @PublicEvolving -public interface JobClient { +public interface JobClient extends AutoCloseable { CompletableFuture<JobExecutionResult> getJobSubmissionResult(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 69abe17..c600eb9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -805,12 +805,13 @@ public class ExecutionEnvironment { final Executor executor = executorFactory.getExecutor(configuration); - final JobClient jobClient = executor.execute(plan, configuration).get(); - lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED) - ? jobClient.getJobExecutionResult(userClassloader).get() - : jobClient.getJobSubmissionResult().get(); + try (final JobClient jobClient = executor.execute(plan, configuration).get()) { + lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED) + ? jobClient.getJobExecutionResult(userClassloader).get() + : jobClient.getJobSubmissionResult().get(); - return lastJobExecutionResult; + return lastJobExecutionResult; + } } private void consolidateParallelismDefinitionsInConfiguration() { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java index 9acbf3d..f5c34a5 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java @@ -95,6 +95,11 @@ public class ExecutorDiscoveryTest { public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) { return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res)); } + + @Override + public void close() { + + } }); }; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 7a59d5a..fdaaae0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1562,10 +1562,11 @@ public class StreamExecutionEnvironment { executorServiceLoader.getExecutorFactory(configuration); final Executor executor = executorFactory.getExecutor(configuration); - final JobClient jobClient = executor.execute(streamGraph, configuration).get(); - return configuration.getBoolean(DeploymentOptions.ATTACHED) - ? jobClient.getJobExecutionResult(userClassloader).get() - : jobClient.getJobSubmissionResult().get(); + try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) { + return configuration.getBoolean(DeploymentOptions.ATTACHED) + ? jobClient.getJobExecutionResult(userClassloader).get() + : jobClient.getJobSubmissionResult().get(); + } } private void consolidateParallelismDefinitionsInConfiguration() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java index ce593c2..97e4517 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java @@ -95,6 +95,11 @@ public class ExecutorDiscoveryTest { public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) { return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res)); } + + @Override + public void close() { + + } }); }; } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java index 09094e7..ead6e9a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java @@ -73,10 +73,9 @@ public class YarnJobClusterExecutor implements Executor { final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig); - try (final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) { - LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); - return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID())); - } + final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()); + LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); + return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID())); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java index dd15d1b..5b72045 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java @@ -56,40 +56,31 @@ public class YarnSessionClusterExecutor implements Executor { @Override public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception { - final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); - - final List<URL> dependencies = configAccessor.getJars(); - final List<URL> classpaths = configAccessor.getClasspaths(); - - final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies); + final JobGraph jobGraph = getJobGraph(pipeline, configuration); try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration); checkState(clusterID != null); - try (final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID)) { - return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph); - } + // TODO: 17.11.19 we cannot close the client here because we simply have a future of the client + final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID); + return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph); } } private JobGraph getJobGraph( final Pipeline pipeline, - final Configuration configuration, - final List<URL> classpaths, - final List<URL> libraries) { + final Configuration configuration) { checkNotNull(pipeline); checkNotNull(configuration); - checkNotNull(classpaths); - checkNotNull(libraries); final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); final JobGraph jobGraph = FlinkPipelineTranslationUtil .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism()); - jobGraph.addJars(libraries); - jobGraph.setClasspaths(classpaths); + jobGraph.addJars(executionConfigAccessor.getJars()); + jobGraph.setClasspaths(executionConfigAccessor.getClasspaths()); jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings()); return jobGraph;
