This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1775d37e13288526dcd09113d7e5ae5a05140425 Author: Kostas Kloudas <[email protected]> AuthorDate: Tue Nov 12 12:23:26 2019 +0100 [FLINK-XXXXX] Change Executor.execute() signature + add Session and Job Cluster Executors change the signature of the executors and their implementation --- .../java/org/apache/flink/client/ClientUtils.java | 30 +++---- .../client/deployment/executors/JobClientImpl.java | 82 +++++++++++++++++ .../deployment/executors/JobClusterExecutor.java | 100 +++++++++++++++++++++ .../executors/SessionClusterExecutor.java | 94 +++++++++++++++++++ .../apache/flink/client/program/ClientTest.java | 5 ++ .../org/apache/flink/core/execution/Executor.java | 4 +- .../execution/{Executor.java => JobClient.java} | 23 +++-- .../flink/api/java/ExecutionEnvironment.java | 15 +++- .../flink/api/java/ExecutorDiscoveryTest.java | 19 +++- .../environment/StreamExecutionEnvironment.java | 17 +++- .../environment/ExecutorDiscoveryTest.java | 18 +++- 11 files changed, 371 insertions(+), 36 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index ac247ac..4a95a16 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -19,22 +19,20 @@ package org.apache.flink.client; import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.client.cli.ExecutionConfigAccessor; +import org.apache.flink.client.deployment.executors.JobClientImpl; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ContextEnvironment; import org.apache.flink.client.program.ContextEnvironmentFactory; -import org.apache.flink.client.program.DetachedJobExecutionResult; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramMissingJobException; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -101,22 +99,18 @@ public enum ClientUtils { return FlinkUserCodeClassLoaders.create(resolveOrder, urls, parent, alwaysParentFirstLoaderPatterns); } - public static CompletableFuture<JobID> submitJobAndGetJobID(ClusterClient<?> client, JobGraph jobGraph) { + public static CompletableFuture<JobClient> submitJobAndGetJobClient(ClusterClient<?> client, JobGraph jobGraph) { return checkNotNull(client) .submitJob(checkNotNull(jobGraph)) - .thenApply(JobSubmissionResult::getJobID); - } - - public static CompletableFuture<JobResult> submitJobAndGetResult(ClusterClient<?> client, JobGraph jobGraph) { - return submitJobAndGetJobID(client, jobGraph) - .thenCompose(client::requestJobResult); + .thenApply(JobSubmissionResult::getJobID) + .thenApply(jobID -> new JobClientImpl<>(client, jobID)); } public static JobExecutionResult submitJob(ClusterClient<?> client, JobGraph jobGraph) throws ProgramInvocationException { try { - return submitJobAndGetJobID(client, jobGraph) - .thenApply(DetachedJobExecutionResult::new) - .get(); + return submitJobAndGetJobClient(client, jobGraph) + .thenCompose(JobClient::getJobSubmissionResult) + .get(); } catch (InterruptedException | ExecutionException e) { ExceptionUtils.checkInterrupted(e); throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e); @@ -129,17 +123,17 @@ public enum ClientUtils { ClassLoader classLoader) throws ProgramInvocationException { checkNotNull(classLoader); - JobResult jobResult; + JobClient jobClient; try { - jobResult = submitJobAndGetResult(client, jobGraph).get(); + jobClient = submitJobAndGetJobClient(client, jobGraph).get(); } catch (InterruptedException | ExecutionException e) { ExceptionUtils.checkInterrupted(e); throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e); } try { - return jobResult.toJobExecutionResult(classLoader); - } catch (JobExecutionException | IOException | ClassNotFoundException e) { + return jobClient.getJobExecutionResult(classLoader).get(); + } catch (Exception e) { throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java new file mode 100644 index 0000000..e042369 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.executors; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.DetachedJobExecutionResult; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.util.ExceptionUtils; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Javadoc. + */ +public class JobClientImpl<ClusterID> implements JobClient { + + private final ClusterClient<ClusterID> clusterClient; + + private final JobID jobID; + + public JobClientImpl( + final ClusterClient<ClusterID> clusterClient, + final JobID jobID) { + this.jobID = checkNotNull(jobID); + this.clusterClient = checkNotNull(clusterClient); + } + + public JobID getJobID() { + return jobID; + } + + @Override + public CompletableFuture<JobExecutionResult> getJobSubmissionResult() { + return CompletableFuture.completedFuture(new DetachedJobExecutionResult(jobID)); + } + + @Override + public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader) { + final CompletableFuture<JobExecutionResult> res = new CompletableFuture<>(); + + final CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobID); + jobResultFuture.whenComplete(((jobResult, throwable) -> { + if (throwable != null) { + ExceptionUtils.checkInterrupted(throwable); + res.completeExceptionally(new ProgramInvocationException("Could not run job", jobID, throwable)); + } else { + try { + res.complete(jobResult.toJobExecutionResult(userClassloader)); + } catch (JobExecutionException | IOException | ClassNotFoundException e) { + res.completeExceptionally(new ProgramInvocationException("Job failed", jobID, e)); + } + } + })); + return res; + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java new file mode 100644 index 0000000..902d7fd --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.executors; + +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.FlinkPipelineTranslationUtil; +import org.apache.flink.client.cli.ExecutionConfigAccessor; +import org.apache.flink.client.deployment.ClusterClientFactory; +import org.apache.flink.client.deployment.ClusterClientServiceLoader; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@link Executor} to be used when executing a job in isolation. + * This executor will start a cluster specifically for the job at hand and + * tear it down when the job is finished either successfully or due to an error. + */ +public class JobClusterExecutor<ClusterID> implements Executor { + + private static final Logger LOG = LoggerFactory.getLogger(JobClusterExecutor.class); + + private final ClusterClientServiceLoader clusterClientServiceLoader; + + public JobClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) { + this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader); + } + + @Override + public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception { + final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig); + + try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) { + final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig); + + final List<URL> dependencies = configAccessor.getJars(); + final List<URL> classpaths = configAccessor.getClasspaths(); + + final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, dependencies); + + final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig); + + try (final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) { + LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); + return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID())); + } + } + } + + private JobGraph getJobGraph( + final Pipeline pipeline, + final Configuration configuration, + final List<URL> classpaths, + final List<URL> libraries) { + + checkNotNull(pipeline); + checkNotNull(configuration); + checkNotNull(classpaths); + checkNotNull(libraries); + + final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); + final JobGraph jobGraph = FlinkPipelineTranslationUtil + .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism()); + + jobGraph.addJars(libraries); + jobGraph.setClasspaths(classpaths); + jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings()); + + return jobGraph; + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java new file mode 100644 index 0000000..b7eeb68 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.executors; + +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.ClientUtils; +import org.apache.flink.client.FlinkPipelineTranslationUtil; +import org.apache.flink.client.cli.ExecutionConfigAccessor; +import org.apache.flink.client.deployment.ClusterClientFactory; +import org.apache.flink.client.deployment.ClusterClientServiceLoader; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.net.URL; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The {@link Executor} to be used when executing a job on an already running cluster. + */ +public class SessionClusterExecutor<ClusterID> implements Executor { + + private final ClusterClientServiceLoader clusterClientServiceLoader; + + public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) { + this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader); + } + + @Override + public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception { + final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); + + final List<URL> dependencies = configAccessor.getJars(); + final List<URL> classpaths = configAccessor.getClasspaths(); + + final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies); + + final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration); + + try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { + final ClusterID clusterID = clusterClientFactory.getClusterId(configuration); + checkState(clusterID != null); + + try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) { + return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph); + } + } + } + + private JobGraph getJobGraph( + final Pipeline pipeline, + final Configuration configuration, + final List<URL> classpaths, + final List<URL> libraries) { + + checkNotNull(pipeline); + checkNotNull(configuration); + checkNotNull(classpaths); + checkNotNull(libraries); + + final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); + final JobGraph jobGraph = FlinkPipelineTranslationUtil + .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism()); + + jobGraph.addJars(libraries); + jobGraph.setClasspaths(classpaths); + jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings()); + + return jobGraph; + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 5845080..6e53abb 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -64,6 +64,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Simple and maybe stupid test to check the {@link ClusterClient} class. @@ -193,6 +194,10 @@ public class ClientTest extends TestLogger { @Test public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException { PackagedProgram packagedProgramMock = mock(PackagedProgram.class); + + when(packagedProgramMock.getUserCodeClassLoader()) + .thenReturn(packagedProgramMock.getClass().getClassLoader()); + doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java index 8515f43..5be3193 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.configuration.Configuration; +import java.util.concurrent.CompletableFuture; + /** * The entity responsible for executing a {@link Pipeline}, i.e. a user job. */ @@ -34,5 +36,5 @@ public interface Executor { * @param configuration the {@link Configuration} with the required execution parameters * @return the {@link JobExecutionResult} corresponding to the pipeline execution. */ - JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception; + CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception; } diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java similarity index 60% copy from flink-core/src/main/java/org/apache/flink/core/execution/Executor.java copy to flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java index 8515f43..8440dd1 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java @@ -18,21 +18,20 @@ package org.apache.flink.core.execution; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.configuration.Configuration; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; /** - * The entity responsible for executing a {@link Pipeline}, i.e. a user job. + * A client that is scoped to a specific job. */ -public interface Executor { +@PublicEvolving +public interface JobClient { + + CompletableFuture<JobExecutionResult> getJobSubmissionResult(); - /** - * Executes a {@link Pipeline} based on the provided configuration. - * - * @param pipeline the {@link Pipeline} to execute - * @param configuration the {@link Configuration} with the required execution parameters - * @return the {@link JobExecutionResult} corresponding to the pipeline execution. - */ - JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception; + CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 5b07843..69abe17 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -57,6 +57,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; import org.apache.flink.core.execution.ExecutorServiceLoader; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.Path; import org.apache.flink.types.StringValue; import org.apache.flink.util.NumberSequenceIterator; @@ -132,6 +133,8 @@ public class ExecutionEnvironment { private final Configuration configuration; + private ClassLoader userClassloader; + /** * Creates a new Execution Environment. */ @@ -146,6 +149,11 @@ public class ExecutionEnvironment { protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) { this.executorServiceLoader = checkNotNull(executorServiceLoader); this.configuration = checkNotNull(executorConfiguration); + this.userClassloader = getClass().getClassLoader(); + } + + protected void setUserClassloader(final ClassLoader userClassloader) { + this.userClassloader = checkNotNull(userClassloader); } protected Configuration getConfiguration() { @@ -796,7 +804,12 @@ public class ExecutionEnvironment { executorServiceLoader.getExecutorFactory(configuration); final Executor executor = executorFactory.getExecutor(configuration); - lastJobExecutionResult = executor.execute(plan, configuration); + + final JobClient jobClient = executor.execute(plan, configuration).get(); + lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED) + ? jobClient.getJobExecutionResult(userClassloader).get() + : jobClient.getJobSubmissionResult().get(); + return lastJobExecutionResult; } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java index 49013b8..9acbf3d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java @@ -25,13 +25,17 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.util.OptionalFailure; import org.junit.Test; +import javax.annotation.Nonnull; + import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -46,6 +50,7 @@ public class ExecutorDiscoveryTest { public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception { final Configuration configuration = new Configuration(); configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID); + configuration.set(DeploymentOptions.ATTACHED, true); final JobExecutionResult result = executeTestJobBasedOnConfig(configuration); @@ -78,7 +83,19 @@ public class ExecutorDiscoveryTest { return (pipeline, executionConfig) -> { final Map<String, OptionalFailure<Object>> res = new HashMap<>(); res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID)); - return new JobExecutionResult(new JobID(), 12L, res); + + return CompletableFuture.completedFuture(new JobClient(){ + + @Override + public CompletableFuture<JobExecutionResult> getJobSubmissionResult() { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) { + return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res)); + } + }); }; } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 3870b52..7a59d5a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -53,6 +53,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; import org.apache.flink.core.execution.ExecutorServiceLoader; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -154,6 +155,8 @@ public class StreamExecutionEnvironment { private final Configuration configuration; + private ClassLoader userClassloader; + // -------------------------------------------------------------------------------------------- // Constructor and Properties // -------------------------------------------------------------------------------------------- @@ -166,9 +169,16 @@ public class StreamExecutionEnvironment { this(new DefaultExecutorServiceLoader(), executorConfiguration); } - public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) { + public StreamExecutionEnvironment( + final ExecutorServiceLoader executorServiceLoader, + final Configuration executorConfiguration) { this.executorServiceLoader = checkNotNull(executorServiceLoader); this.configuration = checkNotNull(executorConfiguration); + this.userClassloader = getClass().getClassLoader(); + } + + protected void setUserClassloader(final ClassLoader userClassloader) { + this.userClassloader = checkNotNull(userClassloader); } protected Configuration getConfiguration() { @@ -1552,7 +1562,10 @@ public class StreamExecutionEnvironment { executorServiceLoader.getExecutorFactory(configuration); final Executor executor = executorFactory.getExecutor(configuration); - return executor.execute(streamGraph, configuration); + final JobClient jobClient = executor.execute(streamGraph, configuration).get(); + return configuration.getBoolean(DeploymentOptions.ATTACHED) + ? jobClient.getJobExecutionResult(userClassloader).get() + : jobClient.getJobSubmissionResult().get(); } private void consolidateParallelismDefinitionsInConfiguration() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java index 9c11fdf..ce593c2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java @@ -24,15 +24,19 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.util.OptionalFailure; import org.junit.Test; +import javax.annotation.Nonnull; + import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -47,6 +51,7 @@ public class ExecutorDiscoveryTest { public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception { final Configuration configuration = new Configuration(); configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID); + configuration.set(DeploymentOptions.ATTACHED, true); final JobExecutionResult result = executeTestJobBasedOnConfig(configuration); @@ -79,7 +84,18 @@ public class ExecutorDiscoveryTest { return (pipeline, executionConfig) -> { final Map<String, OptionalFailure<Object>> res = new HashMap<>(); res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID)); - return new JobExecutionResult(new JobID(), 12L, res); + return CompletableFuture.completedFuture(new JobClient(){ + + @Override + public CompletableFuture<JobExecutionResult> getJobSubmissionResult() { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) { + return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res)); + } + }); }; } }
