This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aee9018a21895344936974b2ebdd6262da65ad20
Author: Kostas Kloudas <[email protected]>
AuthorDate: Sat Nov 16 21:32:08 2019 +0100

    [FLINK-XXXXX] Add standalone/yarn executors and their factories
---
 ....java => StandaloneSessionClusterExecutor.java} | 28 +++++++++++----------
 ...> StandaloneSessionClusterExecutorFactory.java} | 21 +++-------------
 ...org.apache.flink.core.execution.ExecutorFactory |  3 +--
 .../yarn/executors/YarnJobClusterExecutor.java     | 28 ++++++++++++---------
 .../executors/YarnJobClusterExecutorFactory.java   | 23 +++--------------
 .../yarn/executors/YarnSessionClusterExecutor.java | 29 ++++++++++++----------
 .../YarnSessionClusterExecutorFactory.java         | 23 +++--------------
 ...org.apache.flink.core.execution.ExecutorFactory |  4 +--
 8 files changed, 61 insertions(+), 98 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
similarity index 74%
copy from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
copy to flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index b7eeb68..e5cc82e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.client.deployment.executors;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.deployment.StandaloneClientFactory;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
@@ -41,12 +42,15 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * The {@link Executor} to be used when executing a job on an already running cluster.
  */
-public class SessionClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class StandaloneSessionClusterExecutor implements Executor {
 
-       private final ClusterClientServiceLoader clusterClientServiceLoader;
+       public static final String NAME = "standalone-session-cluster";
 
-       public SessionClusterExecutor(final ClusterClientServiceLoader 
clusterClientServiceLoader) {
-               this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
+       private final StandaloneClientFactory clusterClientFactory;
+
+       public StandaloneSessionClusterExecutor() {
+               this.clusterClientFactory = new StandaloneClientFactory();
        }
 
        @Override
@@ -58,13 +62,11 @@ public class SessionClusterExecutor<ClusterID> implements Executor {
 
                final JobGraph jobGraph = getJobGraph(pipeline, configuration, 
classpaths, dependencies);
 
-               final ClusterClientFactory<ClusterID> clusterClientFactory = 
clusterClientServiceLoader.getClusterClientFactory(configuration);
-
-               try (final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(configuration)) {
-                       final ClusterID clusterID = 
clusterClientFactory.getClusterId(configuration);
+               try (final StandaloneClusterDescriptor clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(configuration)) {
+                       final StandaloneClusterId clusterID = 
clusterClientFactory.getClusterId(configuration);
                        checkState(clusterID != null);
 
-                       try (final ClusterClient<ClusterID> clusterClient = 
clusterDescriptor.retrieve(clusterID)) {
+                       try (final RestClusterClient<StandaloneClusterId> 
clusterClient = clusterDescriptor.retrieve(clusterID)) {
                                return 
ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
                        }
                }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
similarity index 62%
copy from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
copy to flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
index 8e5a5eb..06dd451 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
@@ -19,39 +19,24 @@
 package org.apache.flink.client.deployment.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
  */
 @Internal
-public class SessionClusterExecutorFactory implements ExecutorFactory {
-
-       private final ClusterClientServiceLoader clusterClientServiceLoader;
-
-       public SessionClusterExecutorFactory() {
-               this(new DefaultClusterClientServiceLoader());
-       }
-
-       public SessionClusterExecutorFactory(final ClusterClientServiceLoader 
clusterClientServiceLoader) {
-               this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
-       }
+public class StandaloneSessionClusterExecutorFactory implements 
ExecutorFactory {
 
        @Override
        public boolean isCompatibleWith(Configuration configuration) {
-               return 
configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION);
+               return 
configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(StandaloneSessionClusterExecutor.NAME);
        }
 
        @Override
        public Executor getExecutor(Configuration configuration) {
-               return new SessionClusterExecutor<>(clusterClientServiceLoader);
+               return new StandaloneSessionClusterExecutor();
        }
 }
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 870c57d..d9b144f 100644
--- a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.client.deployment.executors.JobClusterExecutorFactory
-org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory
\ No newline at end of file
+org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
\ No newline at end of file
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
similarity index 75%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index 902d7fd..09094e7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -16,21 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.YarnClusterDescriptor;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,21 +47,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This executor will start a cluster specifically for the job at hand and
  * tear it down when the job is finished either successfully or due to an error.
  */
-public class JobClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class YarnJobClusterExecutor implements Executor {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(JobClusterExecutor.class);
+       private static final Logger LOG = 
LoggerFactory.getLogger(YarnJobClusterExecutor.class);
 
-       private final ClusterClientServiceLoader clusterClientServiceLoader;
+       public static final String NAME = "yarn-job-cluster";
 
-       public JobClusterExecutor(final ClusterClientServiceLoader 
clusterClientServiceLoader) {
-               this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
+       private final YarnClusterClientFactory clusterClientFactory;
+
+       public YarnJobClusterExecutor() {
+               this.clusterClientFactory = new YarnClusterClientFactory();
        }
 
        @Override
        public CompletableFuture<JobClient> execute(Pipeline pipeline, 
Configuration executionConfig) throws Exception {
-               final ClusterClientFactory<ClusterID> clusterClientFactory = 
clusterClientServiceLoader.getClusterClientFactory(executionConfig);
 
-               try (final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(executionConfig)) {
+               try (final YarnClusterDescriptor clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(executionConfig)) {
                        final ExecutionConfigAccessor configAccessor = 
ExecutionConfigAccessor.fromConfiguration(executionConfig);
 
                        final List<URL> dependencies = configAccessor.getJars();
@@ -69,7 +73,7 @@ public class JobClusterExecutor<ClusterID> implements Executor {
 
                        final ClusterSpecification clusterSpecification = 
clusterClientFactory.getClusterSpecification(executionConfig);
 
-                       try (final ClusterClient<ClusterID> client = 
clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, 
configAccessor.getDetachedMode())) {
+                       try (final ClusterClient<ApplicationId> client = 
clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, 
configAccessor.getDetachedMode())) {
                                LOG.info("Job has been submitted with JobID " + 
jobGraph.getJobID());
                                return CompletableFuture.completedFuture(new 
JobClientImpl<>(client, jobGraph.getJobID()));
                        }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
similarity index 60%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
index 5e10984..408a819 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
@@ -16,42 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters.
  */
 @Internal
-public class JobClusterExecutorFactory implements ExecutorFactory {
-
-       private final ClusterClientServiceLoader clusterClientServiceLoader;
-
-       public JobClusterExecutorFactory() {
-               this(new DefaultClusterClientServiceLoader());
-       }
-
-       public JobClusterExecutorFactory(final ClusterClientServiceLoader 
clusterClientServiceLoader) {
-               this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
-       }
+public class YarnJobClusterExecutorFactory implements ExecutorFactory {
 
        @Override
        public boolean isCompatibleWith(Configuration configuration) {
-               return 
configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.PER_JOB);
+               return 
configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(YarnJobClusterExecutor.NAME);
        }
 
        @Override
        public Executor getExecutor(Configuration configuration) {
-               return new JobClusterExecutor<>(clusterClientServiceLoader);
+               return new YarnJobClusterExecutor();
        }
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
similarity index 74%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index b7eeb68..dd15d1b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -16,20 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import java.net.URL;
 import java.util.List;
@@ -41,12 +43,15 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * The {@link Executor} to be used when executing a job on an already running cluster.
  */
-public class SessionClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class YarnSessionClusterExecutor implements Executor {
+
+       public static final String NAME = "yarn-session-cluster";
 
-       private final ClusterClientServiceLoader clusterClientServiceLoader;
+       private final YarnClusterClientFactory clusterClientFactory;
 
-       public SessionClusterExecutor(final ClusterClientServiceLoader 
clusterClientServiceLoader) {
-               this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
+       public YarnSessionClusterExecutor() {
+               this.clusterClientFactory = new YarnClusterClientFactory();
        }
 
        @Override
@@ -58,13 +63,11 @@ public class SessionClusterExecutor<ClusterID> implements Executor {
 
                final JobGraph jobGraph = getJobGraph(pipeline, configuration, 
classpaths, dependencies);
 
-               final ClusterClientFactory<ClusterID> clusterClientFactory = 
clusterClientServiceLoader.getClusterClientFactory(configuration);
-
-               try (final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(configuration)) {
-                       final ClusterID clusterID = 
clusterClientFactory.getClusterId(configuration);
+               try (final YarnClusterDescriptor clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(configuration)) {
+                       final ApplicationId clusterID = 
clusterClientFactory.getClusterId(configuration);
                        checkState(clusterID != null);
 
-                       try (final ClusterClient<ClusterID> clusterClient = 
clusterDescriptor.retrieve(clusterID)) {
+                       try (final ClusterClient<ApplicationId> clusterClient = 
clusterDescriptor.retrieve(clusterID)) {
                                return 
ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
                        }
                }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
similarity index 59%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
index 8e5a5eb..5b6b847 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -16,42 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
  */
 @Internal
-public class SessionClusterExecutorFactory implements ExecutorFactory {
-
-       private final ClusterClientServiceLoader clusterClientServiceLoader;
-
-       public SessionClusterExecutorFactory() {
-               this(new DefaultClusterClientServiceLoader());
-       }
-
-       public SessionClusterExecutorFactory(final ClusterClientServiceLoader 
clusterClientServiceLoader) {
-               this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
-       }
+public class YarnSessionClusterExecutorFactory implements ExecutorFactory {
 
        @Override
        public boolean isCompatibleWith(Configuration configuration) {
-               return 
configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION);
+               return 
configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(YarnSessionClusterExecutor.NAME);
        }
 
        @Override
        public Executor getExecutor(Configuration configuration) {
-               return new SessionClusterExecutor<>(clusterClientServiceLoader);
+               return new YarnSessionClusterExecutor();
        }
 }
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
similarity index 84%
copy from flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
copy to flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 870c57d..d56f8c5 100644
--- a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.client.deployment.executors.JobClusterExecutorFactory
-org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory
\ No newline at end of file
+org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
+org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
\ No newline at end of file

Reply via email to