This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit 45b36e73e3376fd5736b81313259d6b2bd97336b Author: Kostas Kloudas <[email protected]> AuthorDate: Fri Nov 15 15:36:55 2019 +0100 [FLINK-XXXXX] Add methods to ClientUtils that do not require userClassloader --- .../java/org/apache/flink/client/ClientUtils.java | 37 ++++++++++------------ 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index 2c80236..ac247ac 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.client; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.client.cli.ExecutionConfigAccessor; import org.apache.flink.client.program.ClusterClient; @@ -44,6 +45,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.jar.JarFile; @@ -99,15 +101,20 @@ public enum ClientUtils { return FlinkUserCodeClassLoaders.create(resolveOrder, urls, parent, alwaysParentFirstLoaderPatterns); } - public static JobExecutionResult submitJob( - ClusterClient<?> client, - JobGraph jobGraph) throws ProgramInvocationException { - checkNotNull(client); - checkNotNull(jobGraph); + public static CompletableFuture<JobID> submitJobAndGetJobID(ClusterClient<?> client, JobGraph jobGraph) { + return checkNotNull(client) + .submitJob(checkNotNull(jobGraph)) + .thenApply(JobSubmissionResult::getJobID); + } + + public static CompletableFuture<JobResult> submitJobAndGetResult(ClusterClient<?> client, JobGraph jobGraph) { + return submitJobAndGetJobID(client, jobGraph) + .thenCompose(client::requestJobResult); + } + + public static JobExecutionResult submitJob(ClusterClient<?> client, JobGraph jobGraph) throws ProgramInvocationException { try { - return client - .submitJob(jobGraph) - .thenApply(JobSubmissionResult::getJobID) + return submitJobAndGetJobID(client, jobGraph) .thenApply(DetachedJobExecutionResult::new) .get(); } catch (InterruptedException | ExecutionException e) { @@ -120,18 +127,11 @@ public enum ClientUtils { ClusterClient<?> client, JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { - checkNotNull(client); - checkNotNull(jobGraph); checkNotNull(classLoader); JobResult jobResult; - try { - jobResult = client - .submitJob(jobGraph) - .thenApply(JobSubmissionResult::getJobID) - .thenCompose(client::requestJobResult) - .get(); + jobResult = submitJobAndGetResult(client, jobGraph).get(); } catch (InterruptedException | ExecutionException e) { ExceptionUtils.checkInterrupted(e); throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e); @@ -151,12 +151,9 @@ public enum ClientUtils { final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); - final List<URL> jobJars = executionConfigAccessor.getJars(); - final List<URL> classpaths = executionConfigAccessor.getClasspaths(); - final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader(); - final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + try { Thread.currentThread().setContextClassLoader(userCodeClassLoader);
