This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit aee9018a21895344936974b2ebdd6262da65ad20 Author: Kostas Kloudas <[email protected]> AuthorDate: Sat Nov 16 21:32:08 2019 +0100 [FLINK-XXXXX] Add standalone/yarn executors and their factories --- ....java => StandaloneSessionClusterExecutor.java} | 28 +++++++++++---------- ...> StandaloneSessionClusterExecutorFactory.java} | 21 +++------------- ...org.apache.flink.core.execution.ExecutorFactory | 3 +-- .../yarn/executors/YarnJobClusterExecutor.java | 28 ++++++++++++--------- .../executors/YarnJobClusterExecutorFactory.java | 23 +++-------------- .../yarn/executors/YarnSessionClusterExecutor.java | 29 ++++++++++++---------- .../YarnSessionClusterExecutorFactory.java | 23 +++-------------- ...org.apache.flink.core.execution.ExecutorFactory | 4 +-- 8 files changed, 61 insertions(+), 98 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java similarity index 74% copy from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java copy to flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java index b7eeb68..e5cc82e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java @@ -18,14 +18,15 @@ package org.apache.flink.client.deployment.executors; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.FlinkPipelineTranslationUtil; import org.apache.flink.client.cli.ExecutionConfigAccessor; -import org.apache.flink.client.deployment.ClusterClientFactory; -import org.apache.flink.client.deployment.ClusterClientServiceLoader; -import org.apache.flink.client.deployment.ClusterDescriptor; -import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.deployment.StandaloneClientFactory; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.JobClient; @@ -41,12 +42,15 @@ import static org.apache.flink.util.Preconditions.checkState; /** * The {@link Executor} to be used when executing a job on an already running cluster. */ -public class SessionClusterExecutor<ClusterID> implements Executor { +@Internal +public class StandaloneSessionClusterExecutor implements Executor { - private final ClusterClientServiceLoader clusterClientServiceLoader; + public static final String NAME = "standalone-session-cluster"; - public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) { - this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader); + private final StandaloneClientFactory clusterClientFactory; + + public StandaloneSessionClusterExecutor() { + this.clusterClientFactory = new StandaloneClientFactory(); } @Override @@ -58,13 +62,11 @@ public class SessionClusterExecutor<ClusterID> implements Executor { final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies); - final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration); - - try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { - final ClusterID clusterID = clusterClientFactory.getClusterId(configuration); + try (final StandaloneClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { + final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration); checkState(clusterID != null); - try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) { + try (final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID)) { return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java similarity index 62% copy from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java copy to flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java index 8e5a5eb..06dd451 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java @@ -19,39 +19,24 @@ package org.apache.flink.client.deployment.executors; import org.apache.flink.annotation.Internal; -import org.apache.flink.client.deployment.ClusterClientServiceLoader; -import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; -import org.apache.flink.configuration.ClusterMode; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster. */ @Internal -public class SessionClusterExecutorFactory implements ExecutorFactory { - - private final ClusterClientServiceLoader clusterClientServiceLoader; - - public SessionClusterExecutorFactory() { - this(new DefaultClusterClientServiceLoader()); - } - - public SessionClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) { - this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader); - } +public class StandaloneSessionClusterExecutorFactory implements ExecutorFactory { @Override public boolean isCompatibleWith(Configuration configuration) { - return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION); + return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(StandaloneSessionClusterExecutor.NAME); } @Override public Executor getExecutor(Configuration configuration) { - return new SessionClusterExecutor<>(clusterClientServiceLoader); + return new StandaloneSessionClusterExecutor(); } } diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory index 870c57d..d9b144f 100644 --- a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory +++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory @@ -13,5 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.client.deployment.executors.JobClusterExecutorFactory -org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory \ No newline at end of file +org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory \ No newline at end of file diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java similarity index 75% rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java index 902d7fd..09094e7 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java @@ -16,21 +16,23 @@ * limitations under the License. */ -package org.apache.flink.client.deployment.executors; +package org.apache.flink.yarn.executors; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.FlinkPipelineTranslationUtil; import org.apache.flink.client.cli.ExecutionConfigAccessor; -import org.apache.flink.client.deployment.ClusterClientFactory; -import org.apache.flink.client.deployment.ClusterClientServiceLoader; -import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.executors.JobClientImpl; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.yarn.YarnClusterClientFactory; +import org.apache.flink.yarn.YarnClusterDescriptor; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,21 +47,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * This executor will start a cluster specifically for the job at hand and * tear it down when the job is finished either successfully or due to an error. */ -public class JobClusterExecutor<ClusterID> implements Executor { +@Internal +public class YarnJobClusterExecutor implements Executor { - private static final Logger LOG = LoggerFactory.getLogger(JobClusterExecutor.class); + private static final Logger LOG = LoggerFactory.getLogger(YarnJobClusterExecutor.class); - private final ClusterClientServiceLoader clusterClientServiceLoader; + public static final String NAME = "yarn-job-cluster"; - public JobClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) { - this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader); + private final YarnClusterClientFactory clusterClientFactory; + + public YarnJobClusterExecutor() { + this.clusterClientFactory = new YarnClusterClientFactory(); } @Override public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception { - final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig); - try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) { + try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) { final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig); final List<URL> dependencies = configAccessor.getJars(); @@ -69,7 +73,7 @@ public class JobClusterExecutor<ClusterID> implements Executor { final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig); - try (final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) { + try (final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) { LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID())); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java similarity index 60% rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java index 5e10984..408a819 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java @@ -16,42 +16,27 @@ * limitations under the License. */ -package org.apache.flink.client.deployment.executors; +package org.apache.flink.yarn.executors; import org.apache.flink.annotation.Internal; -import org.apache.flink.client.deployment.ClusterClientServiceLoader; -import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; -import org.apache.flink.configuration.ClusterMode; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters. */ @Internal -public class JobClusterExecutorFactory implements ExecutorFactory { - - private final ClusterClientServiceLoader clusterClientServiceLoader; - - public JobClusterExecutorFactory() { - this(new DefaultClusterClientServiceLoader()); - } - - public JobClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) { - this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader); - } +public class YarnJobClusterExecutorFactory implements ExecutorFactory { @Override public boolean isCompatibleWith(Configuration configuration) { - return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.PER_JOB); + return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(YarnJobClusterExecutor.NAME); } @Override public Executor getExecutor(Configuration configuration) { - return new JobClusterExecutor<>(clusterClientServiceLoader); + return new YarnJobClusterExecutor(); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java similarity index 74% rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java index b7eeb68..dd15d1b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java @@ -16,20 +16,22 @@ * limitations under the License. */ -package org.apache.flink.client.deployment.executors; +package org.apache.flink.yarn.executors; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.FlinkPipelineTranslationUtil; import org.apache.flink.client.cli.ExecutionConfigAccessor; -import org.apache.flink.client.deployment.ClusterClientFactory; -import org.apache.flink.client.deployment.ClusterClientServiceLoader; -import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.yarn.YarnClusterClientFactory; +import org.apache.flink.yarn.YarnClusterDescriptor; + +import org.apache.hadoop.yarn.api.records.ApplicationId; import java.net.URL; import java.util.List; @@ -41,12 +43,15 @@ import static org.apache.flink.util.Preconditions.checkState; /** * The {@link Executor} to be used when executing a job on an already running cluster. */ -public class SessionClusterExecutor<ClusterID> implements Executor { +@Internal +public class YarnSessionClusterExecutor implements Executor { + + public static final String NAME = "yarn-session-cluster"; - private final ClusterClientServiceLoader clusterClientServiceLoader; + private final YarnClusterClientFactory clusterClientFactory; - public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) { - this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader); + public YarnSessionClusterExecutor() { + this.clusterClientFactory = new YarnClusterClientFactory(); } @Override @@ -58,13 +63,11 @@ public class SessionClusterExecutor<ClusterID> implements Executor { final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies); - final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration); - - try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { - final ClusterID clusterID = clusterClientFactory.getClusterId(configuration); + try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { + final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration); checkState(clusterID != null); - try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) { + try (final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID)) { return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java similarity index 59% rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java index 8e5a5eb..5b6b847 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java @@ -16,42 +16,27 @@ * limitations under the License. */ -package org.apache.flink.client.deployment.executors; +package org.apache.flink.yarn.executors; import org.apache.flink.annotation.Internal; -import org.apache.flink.client.deployment.ClusterClientServiceLoader; -import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; -import org.apache.flink.configuration.ClusterMode; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster. */ @Internal -public class SessionClusterExecutorFactory implements ExecutorFactory { - - private final ClusterClientServiceLoader clusterClientServiceLoader; - - public SessionClusterExecutorFactory() { - this(new DefaultClusterClientServiceLoader()); - } - - public SessionClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) { - this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader); - } +public class YarnSessionClusterExecutorFactory implements ExecutorFactory { @Override public boolean isCompatibleWith(Configuration configuration) { - return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION); + return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(YarnSessionClusterExecutor.NAME); } @Override public Executor getExecutor(Configuration configuration) { - return new SessionClusterExecutor<>(clusterClientServiceLoader); + return new YarnSessionClusterExecutor(); } } diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory similarity index 84% copy from flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory copy to flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory index 870c57d..d56f8c5 100644 --- a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory +++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory @@ -13,5 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.client.deployment.executors.JobClusterExecutorFactory -org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory \ No newline at end of file +org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory +org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory \ No newline at end of file
