XComp commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r487699295



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##########
@@ -141,6 +143,24 @@ public MemorySize getManagedMemorySize() {
                return getFlinkMemory().getManaged();
        }
 
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               } else if (obj instanceof TaskExecutorProcessSpec) {

Review comment:
       I do not fully understand yet how `TaskExecutorProcessSpec` and
`CommonProcessMemorySpec` are used, so this might not be a problem at all. Keep
in mind, though, that the `instanceof` check breaks the symmetry contract of
`Object::equals` here. Consider the following example:
   ```
   CommonProcessMemorySpec commonSpec = ...; // uses more general equals implementation
   TaskExecutorProcessSpec taskSpec = ...; // uses custom equals implementation
   commonSpec.equals(taskSpec); // --> true
   taskSpec.equals(commonSpec); // --> false
   ```
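
   To make the concern concrete, here is a minimal, self-contained sketch. The
classes `BaseSpec` and `TaskSpec` and the helper `sameClassEquals` are
hypothetical stand-ins, not the actual Flink classes; the sketch only assumes
that the subclass narrows the `instanceof` check while the parent does not.

```java
import java.util.Objects;

public class EqualsSymmetryDemo {

    /** Hypothetical stand-in for the more general spec; compares with instanceof. */
    static class BaseSpec {
        final long memoryBytes;

        BaseSpec(long memoryBytes) {
            this.memoryBytes = memoryBytes;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            // Accepts any subclass instance, including TaskSpec.
            return obj instanceof BaseSpec && memoryBytes == ((BaseSpec) obj).memoryBytes;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(memoryBytes);
        }
    }

    /** Hypothetical subclass that narrows equals to its own type. */
    static class TaskSpec extends BaseSpec {
        final int slots;

        TaskSpec(long memoryBytes, int slots) {
            super(memoryBytes);
            this.slots = slots;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            // Rejects plain BaseSpec instances, so symmetry is lost.
            return obj instanceof TaskSpec && super.equals(obj) && slots == ((TaskSpec) obj).slots;
        }

        @Override
        public int hashCode() {
            return Objects.hash(memoryBytes, slots);
        }
    }

    /** One symmetric alternative: only objects of the exact same class can be equal. */
    static boolean sameClassEquals(BaseSpec a, Object b) {
        return b != null && a.getClass() == b.getClass() && a.equals(b);
    }

    public static void main(String[] args) {
        BaseSpec common = new BaseSpec(1024L);
        TaskSpec task = new TaskSpec(1024L, 4);
        System.out.println(common.equals(task));           // parent accepts the subclass
        System.out.println(task.equals(common));           // subclass rejects the parent
        System.out.println(sameClassEquals(common, task)); // symmetric in both directions
        System.out.println(sameClassEquals(task, common));
    }
}
```

   Comparing `getClass()` is only one way to restore symmetry; whether it fits
depends on whether instances of the two classes are ever meant to be equal.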

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -347,26 +347,17 @@ static ContainerLaunchContext createTaskExecutorContext(
 
                // get and validate all relevant variables
 
-               String remoteFlinkJarPath = 
env.get(YarnConfigKeys.FLINK_DIST_JAR);
+               String remoteFlinkJarPath = configuration.getFlinkDistJar();
                require(remoteFlinkJarPath != null, "Environment variable %s 
not set", YarnConfigKeys.FLINK_DIST_JAR);

Review comment:
       We could get rid of the `require` function here by statically importing 
`org.apache.flink.util.Preconditions.checkNotNull` and applying:
   ```suggestion
                String remoteFlinkJarPath = 
checkNotNull(configuration.getFlinkDistJar(), "Environment variable %s not 
set", YarnConfigKeys.FLINK_DIST_JAR);
   ```

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerDriverConfiguration.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link 
org.apache.flink.yarn.YarnResourceManagerDriver}.
+ */
+public class YarnResourceManagerDriverConfiguration {
+       private final String webInterfaceUrl;

Review comment:
       Should we add the `@Nullable` annotation here as well?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
##########
@@ -104,5 +104,5 @@ private Configuration 
createActiveResourceManagerConfiguration(Configuration ori
                                resourceManagerMetricGroup);
        }
 
-       protected abstract ResourceManagerDriver<WorkerType> 
createResourceManagerDriver(Configuration configuration);
+       protected abstract ResourceManagerDriver<WorkerType> 
createResourceManagerDriver(Configuration configuration, String 
webInterfaceUrl, String rpcAddress);

Review comment:
       The `webInterfaceUrl` and the `rpcAddress` are Yarn-specific, aren't
they? I'm just wondering whether we should introduce an
`ActiveResourceManagerDriverFactory` instead, to avoid extending the method
every time we have to add new parameters for a subclass.
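
   A rough sketch of what I have in mind. All types here (`Driver`,
`DriverFactory`, `YarnDriverFactory`) are simplified, hypothetical stand-ins
rather than existing Flink interfaces, and a plain `Map` stands in for the
`Configuration`:

```java
import java.util.Map;

public class DriverFactoryDemo {

    /** Hypothetical minimal driver abstraction. */
    interface Driver {
        String describe();
    }

    /** Hypothetical factory: generic code only ever passes the shared configuration. */
    interface DriverFactory {
        Driver createDriver(Map<String, String> configuration);
    }

    /** Yarn-specific values are captured by the factory, not added to the shared signature. */
    static class YarnDriverFactory implements DriverFactory {
        private final String webInterfaceUrl;
        private final String rpcAddress;

        YarnDriverFactory(String webInterfaceUrl, String rpcAddress) {
            this.webInterfaceUrl = webInterfaceUrl;
            this.rpcAddress = rpcAddress;
        }

        @Override
        public Driver createDriver(Map<String, String> configuration) {
            return () -> "yarn[web=" + webInterfaceUrl
                + ", rpc=" + rpcAddress
                + ", options=" + configuration.size() + "]";
        }
    }

    /** Generic code path: it never needs to know about the Yarn-only parameters. */
    static String startWith(DriverFactory factory, Map<String, String> configuration) {
        return factory.createDriver(configuration).describe();
    }
}
```

   With this shape, a new deployment-specific parameter only changes the
constructor of the corresponding factory.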

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -72,354 +62,237 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 /**
- * The yarn implementation of the resource manager. Used when the system is 
started
- * via the resource framework YARN.
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.

Review comment:
       ```suggestion
    * Implementation of {@link ResourceManagerDriver} for YARN deployment.
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -535,14 +526,13 @@ static void require(boolean condition, String message, 
Object... values) {
                }
        }

Review comment:
       The `require` method can be removed once the
`Preconditions.checkNotNull` proposals above are applied, since every condition
passed to it is a not-null check. Switching to `Preconditions.checkNotNull`
throws a `NullPointerException` instead of a `RuntimeException`, which
describes the error more accurately.
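
   The difference in behavior can be illustrated with the JDK alone. This is
only a sketch: `require` below mirrors the shape described in this comment (a
boolean condition plus a message template, failing with a `RuntimeException`),
and the hypothetical helper `requireSet` uses `Objects.requireNonNull` as a
stand-in for `Preconditions.checkNotNull`.

```java
import java.util.Objects;

public class NullCheckDemo {

    /** Mirrors the existing helper's shape: boolean condition plus message template. */
    static void require(boolean condition, String message, Object... values) {
        if (!condition) {
            throw new RuntimeException(String.format(message, values));
        }
    }

    /** Stand-in for the proposed style: validate and assign in a single expression. */
    static String requireSet(String value, String variableName) {
        return Objects.requireNonNull(
            value,
            () -> String.format("Environment variable %s not set", variableName));
    }

    public static void main(String[] args) {
        // Old style: two statements, generic RuntimeException on failure.
        String jar = "flink-dist.jar";
        require(jar != null, "Environment variable %s not set", "_FLINK_DIST_JAR");

        // Proposed style: one statement, NullPointerException on failure.
        String sameJar = requireSet(jar, "_FLINK_DIST_JAR");
        System.out.println(sameJar);
    }
}
```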

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
##########
@@ -101,58 +116,111 @@ public void 
unregisterApplicationMaster(FinalApplicationStatus appStatus, String
                unregisterApplicationMasterConsumer.accept(appStatus, 
appMessage, appTrackingUrl);
        }
 
-       void setGetMatchingRequestsFunction(
-               Function<Tuple4<Priority, String, Resource, CallbackHandler>, 
List<? extends Collection<AMRMClient.ContainerRequest>>>
-                       getMatchingRequestsFunction) {
-               this.getMatchingRequestsFunction = 
Preconditions.checkNotNull(getMatchingRequestsFunction);
-       }
-
-       void setAddContainerRequestConsumer(
-               BiConsumer<AMRMClient.ContainerRequest, CallbackHandler> 
addContainerRequestConsumer) {
-               this.addContainerRequestConsumer = 
Preconditions.checkNotNull(addContainerRequestConsumer);
-       }
-
-       void setRemoveContainerRequestConsumer(
-               BiConsumer<AMRMClient.ContainerRequest, CallbackHandler> 
removeContainerRequestConsumer) {
-               this.removeContainerRequestConsumer = 
Preconditions.checkNotNull(removeContainerRequestConsumer);
-       }
-
-       void setReleaseAssignedContainerConsumer(
-               BiConsumer<ContainerId, CallbackHandler> 
releaseAssignedContainerConsumer) {
-               this.releaseAssignedContainerConsumer = 
Preconditions.checkNotNull(releaseAssignedContainerConsumer);
-       }
-
-       void setSetHeartbeatIntervalConsumer(
-               Consumer<Integer> setHeartbeatIntervalConsumer) {
-               this.setHeartbeatIntervalConsumer = 
setHeartbeatIntervalConsumer;
-       }
-
-       void setRegisterApplicationMasterFunction(
-               TriFunction<String, Integer, String, 
RegisterApplicationMasterResponse> registerApplicationMasterFunction) {
-               this.registerApplicationMasterFunction = 
registerApplicationMasterFunction;
-       }
-
-       void setUnregisterApplicationMasterConsumer(
-               TriConsumer<FinalApplicationStatus, String, String> 
unregisterApplicationMasterConsumer) {
-               this.unregisterApplicationMasterConsumer = 
unregisterApplicationMasterConsumer;
+       static Builder builder() {
+               return new Builder();
        }
 
        // 
------------------------------------------------------------------------
        //  Override lifecycle methods to avoid actually starting the service
        // 
------------------------------------------------------------------------
 
        @Override
-       protected void serviceInit(Configuration conf) throws Exception {
-               // noop
+       public void init(Configuration conf) {
+               clientInitRunnable.run();
        }
 
        @Override
-       protected void serviceStart() throws Exception {
-               // noop
+       public void start() {
+               clientStartRunnable.run();
        }
 
        @Override
-       protected void serviceStop() throws Exception {
-               // noop
+       public void stop() {
+               clientStopRunnable.run();
+       }
+
+       /**
+        * Builder class for {@link TestingYarnAMRMClientAsync}.
+        */
+       public static class Builder {
+               private volatile Function<Tuple4<Priority, String, Resource, 
CallbackHandler>, List<? extends Collection<AMRMClient.ContainerRequest>>>
+                       getMatchingRequestsFunction = ignored -> 
Collections.emptyList();
+               private volatile BiConsumer<AMRMClient.ContainerRequest, 
CallbackHandler> addContainerRequestConsumer = (ignored1, ignored2) -> {};
+               private volatile BiConsumer<AMRMClient.ContainerRequest, 
CallbackHandler> removeContainerRequestConsumer = (ignored1, ignored2) -> {};
+               private volatile BiConsumer<ContainerId, CallbackHandler> 
releaseAssignedContainerConsumer = (ignored1, ignored2) -> {};
+               private volatile Consumer<Integer> setHeartbeatIntervalConsumer 
= (ignored) -> {};
+               private volatile TriFunction<String, Integer, String, 
RegisterApplicationMasterResponse> registerApplicationMasterFunction =
+                       (ignored1, ignored2, ignored3) -> new 
TestingRegisterApplicationMasterResponse(Collections::emptyList);
+               private volatile TriConsumer<FinalApplicationStatus, String, 
String> unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> 
{};
+               private volatile Runnable clientInitRunnable = () -> {};
+               private volatile Runnable clientStartRunnable = () -> {};
+               private volatile Runnable clientStopRunnable = () -> {};

Review comment:
       I'm a bit confused about the `volatile` usage here: Is it really 
necessary? Based on the tests, it does not look like it.
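
   For illustration, a sketch of the shape I would expect if the callbacks are
never reassigned after construction (an assumption on my side; the names
`TestingClient` and `Builder` are hypothetical). The builder is only touched by
the thread that sets up the test, so plain fields are enough there, and `final`
fields in the built object are safely visible to other threads once the
constructor has finished.

```java
public class BuilderVisibilityDemo {

    /** Hypothetical test double: callbacks are fixed at construction time. */
    static final class TestingClient {
        private final Runnable startRunnable;
        private final Runnable stopRunnable;

        private TestingClient(Runnable startRunnable, Runnable stopRunnable) {
            this.startRunnable = startRunnable;
            this.stopRunnable = stopRunnable;
        }

        void start() {
            startRunnable.run();
        }

        void stop() {
            stopRunnable.run();
        }

        static Builder builder() {
            return new Builder();
        }
    }

    /** Confined to the thread that configures the test, so no volatile is needed. */
    static final class Builder {
        private Runnable startRunnable = () -> {};
        private Runnable stopRunnable = () -> {};

        Builder setStartRunnable(Runnable startRunnable) {
            this.startRunnable = startRunnable;
            return this;
        }

        Builder setStopRunnable(Runnable stopRunnable) {
            this.stopRunnable = stopRunnable;
            return this;
        }

        TestingClient build() {
            return new TestingClient(startRunnable, stopRunnable);
        }
    }
}
```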

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -435,53 +308,43 @@ private void onContainersOfResourceAllocated(Resource 
resource, List<Container>
                        numAccepted, numExcess, numPending, resource);
        }
 
-       @VisibleForTesting
-       static ResourceID getContainerResourceId(Container container) {
-               return new ResourceID(container.getId().toString(), 
container.getNodeId().toString());
+       private int getNumRequestedNotAllocatedWorkers() {
+               return 
requestResourceFutures.values().stream().mapToInt(Queue::size).sum();
+       }
+
+       private int 
getNumRequestedNotAllocatedWorkersFor(TaskExecutorProcessSpec 
taskExecutorProcessSpec) {
+               return 
requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new 
LinkedList<>()).size();

Review comment:
       ```suggestion
                return 
requestResourceFutures.getOrDefault(taskExecutorProcessSpec, 
Collections.emptyList()).size();
   ```
   Instead of instantiating an empty list we could use 
`Collections.emptyList()`.
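
   One caveat on my own suggestion: the `Queue::size` reference in the same
hunk suggests that the map values are queues. If they are declared as `Queue`,
`Collections.emptyList()` returns a `List` and would not be accepted as the
default value, and as far as I can tell the JDK has no `Collections.emptyQueue()`.
A null check avoids the allocation either way. A minimal sketch, with the
hypothetical helper `countPending` and generic key and element types standing
in for the real ones:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class PendingRequestCountDemo {

    /** Returns the number of pending entries for the key, or 0 if the key is absent. */
    static <K, V> int countPending(Map<K, Queue<V>> pendingByKey, K key) {
        Queue<V> pending = pendingByKey.get(key);
        return pending == null ? 0 : pending.size();
    }

    public static void main(String[] args) {
        Map<String, Queue<Integer>> pendingByKey = new HashMap<>();
        Queue<Integer> requests = new ArrayDeque<>();
        requests.add(1);
        requests.add(2);
        pendingByKey.put("spec-a", requests);

        System.out.println(countPending(pendingByKey, "spec-a"));
        System.out.println(countPending(pendingByKey, "spec-b"));
    }
}
```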

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
##########
@@ -45,25 +45,40 @@
  */
 public class TestingYarnAMRMClientAsync extends 
AMRMClientAsyncImpl<AMRMClient.ContainerRequest> {
 
-       private volatile Function<Tuple4<Priority, String, Resource, 
CallbackHandler>, List<? extends Collection<AMRMClient.ContainerRequest>>>
-               getMatchingRequestsFunction = ignored -> 
Collections.emptyList();
-       private volatile BiConsumer<AMRMClient.ContainerRequest, 
CallbackHandler> addContainerRequestConsumer = (ignored1, ignored2) -> {};
-       private volatile BiConsumer<AMRMClient.ContainerRequest, 
CallbackHandler> removeContainerRequestConsumer = (ignored1, ignored2) -> {};
-       private volatile BiConsumer<ContainerId, CallbackHandler> 
releaseAssignedContainerConsumer = (ignored1, ignored2) -> {};
-       private volatile Consumer<Integer> setHeartbeatIntervalConsumer = 
(ignored) -> {};
-       private volatile TriFunction<String, Integer, String, 
RegisterApplicationMasterResponse> registerApplicationMasterFunction =
-               (ignored1, ignored2, ignored3) -> 
RegisterApplicationMasterResponse.newInstance(
-                       Resource.newInstance(0, 0),
-                       Resource.newInstance(Integer.MAX_VALUE, 
Integer.MAX_VALUE),
-                       Collections.emptyMap(),
-                       null,
-                       Collections.emptyList(),
-                       null,
-                       Collections.emptyList());
-       private volatile TriConsumer<FinalApplicationStatus, String, String> 
unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> {};
-
-       TestingYarnAMRMClientAsync(CallbackHandler callbackHandler) {
+       private volatile Function<Tuple4<Priority, String, Resource, 
CallbackHandler>, List<? extends Collection<AMRMClient.ContainerRequest>>> 
getMatchingRequestsFunction;
+       private volatile BiConsumer<AMRMClient.ContainerRequest, 
CallbackHandler> addContainerRequestConsumer;
+       private volatile BiConsumer<AMRMClient.ContainerRequest, 
CallbackHandler> removeContainerRequestConsumer;
+       private volatile BiConsumer<ContainerId, CallbackHandler> 
releaseAssignedContainerConsumer;
+       private volatile Consumer<Integer> setHeartbeatIntervalConsumer;
+       private volatile TriFunction<String, Integer, String, 
RegisterApplicationMasterResponse> registerApplicationMasterFunction;
+       private volatile TriConsumer<FinalApplicationStatus, String, String> 
unregisterApplicationMasterConsumer;
+       private volatile Runnable clientInitRunnable;
+       private volatile Runnable clientStartRunnable;
+       private volatile Runnable clientStopRunnable;

Review comment:
       I'm a bit confused about the `volatile` usage here: Is it really 
necessary? Based on the tests, it does not look like it.

##########
File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
##########
@@ -134,7 +134,7 @@
                "java.io.IOException: Connection reset by peer",
 
                // filter out expected ResourceManagerException caused by 
intended shutdown request
-               YarnResourceManager.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST,
+               YarnResourceManagerDriver.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST,

Review comment:
       We might want to fix the typo here as well: 
`ERROR_MASSAGE_ON_SHUTDOWN_REQUEST` should become 
`ERROR_MESSAGE_ON_SHUTDOWN_REQUEST`

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -347,26 +347,17 @@ static ContainerLaunchContext createTaskExecutorContext(
 
                // get and validate all relevant variables
 
-               String remoteFlinkJarPath = 
env.get(YarnConfigKeys.FLINK_DIST_JAR);
+               String remoteFlinkJarPath = configuration.getFlinkDistJar();
                require(remoteFlinkJarPath != null, "Environment variable %s 
not set", YarnConfigKeys.FLINK_DIST_JAR);
 
-               String appId = env.get(YarnConfigKeys.ENV_APP_ID);
-               require(appId != null, "Environment variable %s not set", 
YarnConfigKeys.ENV_APP_ID);
-
-               String clientHomeDir = 
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-               require(clientHomeDir != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-
-               String shipListString = 
env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+               String shipListString = configuration.getClientShipFiles();
                require(shipListString != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);

Review comment:
       We could get rid of the `require` function here by statically importing
`org.apache.flink.util.Preconditions.checkNotNull` and applying:
   ```suggestion
                String shipListString = 
checkNotNull(configuration.getClientShipFiles(), "Environment variable %s not 
set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -376,7 +367,7 @@ static ContainerLaunchContext createTaskExecutorContext(
                        log.debug("TM:remote krb5 path obtained {}", 
remoteKrb5Path);
                }
 
-               String classPathString = env.get(ENV_FLINK_CLASSPATH);
+               String classPathString = configuration.getFlinkClasspath();
                require(classPathString != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_FLINK_CLASSPATH);

Review comment:
       We could get rid of the `require` function here by statically importing
`org.apache.flink.util.Preconditions.checkNotNull` and applying:
   ```suggestion
                String classPathString = 
checkNotNull(configuration.getFlinkClasspath(), "Environment variable %s not 
set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
   ```

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -508,52 +371,103 @@ private void 
removeContainerRequest(AMRMClient.ContainerRequest pendingContainer
                return matchingContainerRequests;
        }
 
-       @Override
-       public void onShutdownRequest() {
-               onFatalError(new 
ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
-       }
+       private ContainerLaunchContext createTaskExecutorLaunchContext(
+               ResourceID containerId,
+               String host,
+               TaskExecutorProcessSpec taskExecutorProcessSpec) throws 
Exception {
 
-       @Override
-       public void onNodesUpdated(List<NodeReport> list) {
-               // We are not interested in node updates
-       }
+               // init the ContainerLaunchContext
+               final String currDir = configuration.getCurrentDir();
 
-       @Override
-       public void onError(Throwable error) {
-               onFatalError(error);
-       }
+               final ContaineredTaskManagerParameters taskManagerParameters =
+                       ContaineredTaskManagerParameters.create(flinkConfig, 
taskExecutorProcessSpec);
 
-       // 
------------------------------------------------------------------------
-       //  NMClientAsync CallbackHandler methods
-       // 
------------------------------------------------------------------------
-       @Override
-       public void onContainerStarted(ContainerId containerId, Map<String, 
ByteBuffer> map) {
-               log.debug("Succeeded to call YARN Node Manager to start 
container {}.", containerId);
-       }
+               log.info("TaskExecutor {} will be started on {} with {}.",
+                       containerId.getStringWithMetadata(),
+                       host,
+                       taskExecutorProcessSpec);
 
-       @Override
-       public void onContainerStatusReceived(ContainerId containerId, 
ContainerStatus containerStatus) {
-               // We are not interested in getting container status
+               final Configuration taskManagerConfig = 
BootstrapTools.cloneConfiguration(flinkConfig);
+               
taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, 
containerId.getResourceIdString());
+               
taskManagerConfig.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA,
 containerId.getMetadata());
+
+               final String taskManagerDynamicProperties =
+                       
BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, 
taskManagerConfig);
+
+               log.debug("TaskManager configuration: {}", taskManagerConfig);
+
+               final ContainerLaunchContext taskExecutorLaunchContext = 
Utils.createTaskExecutorContext(
+                       flinkConfig,
+                       yarnConfig,
+                       configuration,
+                       taskManagerParameters,
+                       taskManagerDynamicProperties,
+                       currDir,
+                       YarnTaskExecutorRunner.class,
+                       log);
+
+               taskExecutorLaunchContext.getEnvironment()
+                       .put(ENV_FLINK_NODE_ID, host);
+               return taskExecutorLaunchContext;
        }
 
-       @Override
-       public void onContainerStopped(ContainerId containerId) {
-               log.debug("Succeeded to call YARN Node Manager to stop 
container {}.", containerId);
+       @VisibleForTesting
+       Optional<Resource> getContainerResource(TaskExecutorProcessSpec 
taskExecutorProcessSpec) {
+               return 
taskExecutorProcessSpecContainerResourceAdapter.tryComputeContainerResource(taskExecutorProcessSpec);
        }
 
-       @Override
-       public void onStartContainerError(ContainerId containerId, Throwable t) 
{
-               runAsync(() -> 
releaseFailedContainerAndRequestNewContainerIfRequired(containerId, t));
+       private RegisterApplicationMasterResponse registerApplicationMaster() 
throws Exception {
+               final int restPort;
+               final String webInterfaceUrl = 
configuration.getWebInterfaceUrl();
+               final String rpcAddress = configuration.getRpcAddress();
+
+               if (webInterfaceUrl != null) {
+                       final int lastColon = webInterfaceUrl.lastIndexOf(':');

Review comment:
       `webInterfaceUrl` cannot be something like `http://localhost/`, can it?
Just asking to make sure that `lastIndexOf(':')` wouldn't return the wrong
index here.
   
   If it cannot, should we consider renaming `webInterfaceUrl` in the code?
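
   To show what I mean: for a value with a scheme but no explicit port, the
last colon is the one after the scheme. If such values are possible (an
assumption; the value may well always be a plain `host:port`), parsing with
`java.net.URI` would be more robust. The helper name `portOf` is made up for
this sketch.

```java
import java.net.URI;

public class WebUrlPortDemo {

    /** Returns the explicit port of a URL that has a scheme, or -1 if none is given. */
    static int portOf(String url) {
        return URI.create(url).getPort();
    }

    public static void main(String[] args) {
        String noPort = "http://localhost/";
        // The only colon belongs to the scheme separator, not to a port.
        System.out.println(noPort.lastIndexOf(':'));
        System.out.println(portOf(noPort));
        System.out.println(portOf("http://localhost:8081"));
    }
}
```

   Note that `URI` treats a bare `host:port` string as an opaque URI without a
port, so this only helps if the value really is a URL.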

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -435,53 +308,43 @@ private void onContainersOfResourceAllocated(Resource 
resource, List<Container>
                        numAccepted, numExcess, numPending, resource);
        }
 
-       @VisibleForTesting
-       static ResourceID getContainerResourceId(Container container) {
-               return new ResourceID(container.getId().toString(), 
container.getNodeId().toString());
+       private int getNumRequestedNotAllocatedWorkers() {
+               return 
requestResourceFutures.values().stream().mapToInt(Queue::size).sum();
+       }
+
+       private int 
getNumRequestedNotAllocatedWorkersFor(TaskExecutorProcessSpec 
taskExecutorProcessSpec) {
+               return 
requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new 
LinkedList<>()).size();
+       }
+
+       private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest) {
+               log.info("Removing container request {}.", 
pendingContainerRequest);
+               
resourceManagerClient.removeContainerRequest(pendingContainerRequest);
+       }
+
+       private void returnExcessContainer(Container excessContainer) {
+               log.info("Returning excess container {}.", 
excessContainer.getId());
+               
resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
        }
 
-       private void startTaskExecutorInContainer(Container container, 
WorkerResourceSpec workerResourceSpec, ResourceID resourceId) {
-               workerNodeMap.put(resourceId, new YarnWorkerNode(container, 
resourceId));
+       private void startTaskExecutorInContainer(Container container, 
TaskExecutorProcessSpec taskExecutorProcessSpec, ResourceID resourceId, 
CompletableFuture<YarnWorkerNode> requestResourceFuture) {
+               final YarnWorkerNode yarnWorkerNode = new 
YarnWorkerNode(container, resourceId);
 
                try {
                        // Context information used to start a TaskExecutor 
Java process
                        ContainerLaunchContext taskExecutorLaunchContext = 
createTaskExecutorLaunchContext(
                                resourceId,
                                container.getNodeId().getHost(),
-                               
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
workerResourceSpec));
+                               taskExecutorProcessSpec);
 
                        nodeManagerClient.startContainerAsync(container, 
taskExecutorLaunchContext);
+                       requestResourceFuture.complete(yarnWorkerNode);
                } catch (Throwable t) {
-                       
releaseFailedContainerAndRequestNewContainerIfRequired(container.getId(), t);
+                       requestResourceFuture.completeExceptionally(t);
                }
        }
 
-       private void 
releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId containerId, 
Throwable throwable) {
-               validateRunsInMainThread();
-
-               log.error("Could not start TaskManager in container {}.", 
containerId, throwable);
-
-               final ResourceID resourceId = new 
ResourceID(containerId.toString());
-               // release the failed container
-               workerNodeMap.remove(resourceId);
-               resourceManagerClient.releaseAssignedContainer(containerId);
-               notifyAllocatedWorkerStopped(resourceId);
-               // and ask for a new one
-               requestYarnContainerIfRequired();
-       }
-
-       private void returnExcessContainer(Container excessContainer) {
-               log.info("Returning excess container {}.", 
excessContainer.getId());
-               
resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
-       }
-
-       private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest, WorkerResourceSpec workerResourceSpec) {
-               log.info("Removing container request {}.", 
pendingContainerRequest);
-               
resourceManagerClient.removeContainerRequest(pendingContainerRequest);
-       }
-
        private Collection<AMRMClient.ContainerRequest> 
getPendingRequestsAndCheckConsistency(Resource resource, int expectedNum) {
-               final Collection<Resource> equivalentResources = 
workerSpecContainerResourceAdapter.getEquivalentContainerResource(resource, 
matchingStrategy);
+               final Collection<Resource> equivalentResources = 
taskExecutorProcessSpecContainerResourceAdapter.getEquivalentContainerResource(resource,
 matchingStrategy);
                final List<? extends Collection<AMRMClient.ContainerRequest>> 
matchingRequests =

Review comment:
       Just a minor remark: `equivalentResources` is not used anywhere else. We
could even call `.stream()` and the subsequent calls directly on
`getEquivalentContainerResource(..)`. But feel free to leave it like that if
you think that it's easier to read.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
##########
@@ -34,11 +34,25 @@
  */
 class TestingYarnNMClientAsync extends NMClientAsyncImpl {
 
-       private volatile TriConsumer<Container, ContainerLaunchContext, 
CallbackHandler> startContainerAsyncConsumer = (ignored1, ignored2, ignored3) 
-> {};
-       private volatile TriConsumer<ContainerId, NodeId, CallbackHandler> 
stopContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {};
+       private volatile TriConsumer<Container, ContainerLaunchContext, 
CallbackHandler> startContainerAsyncConsumer;
+       private volatile TriConsumer<ContainerId, NodeId, CallbackHandler> 
stopContainerAsyncConsumer;
+       private volatile Runnable clientInitRunnable;
+       private volatile Runnable clientStartRunnable;
+       private volatile Runnable clientStopRunnable;

Review comment:
       I'm a bit confused about the `volatile` usage here: Is it really 
necessary? Based on the tests, it does not look like it.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
##########
@@ -51,30 +65,74 @@ public void stopContainerAsync(ContainerId containerId, 
NodeId nodeId) {
                this.stopContainerAsyncConsumer.accept(containerId, nodeId, 
callbackHandler);
        }
 
-       void setStartContainerAsyncConsumer(TriConsumer<Container, 
ContainerLaunchContext, CallbackHandler> startContainerAsyncConsumer) {
-               this.startContainerAsyncConsumer = 
Preconditions.checkNotNull(startContainerAsyncConsumer);
-       }
-
-       void setStopContainerAsyncConsumer(TriConsumer<ContainerId, NodeId, 
CallbackHandler> stopContainerAsyncConsumer) {
-               this.stopContainerAsyncConsumer = 
Preconditions.checkNotNull(stopContainerAsyncConsumer);
+       static Builder builder() {
+               return new Builder();
        }
 
        // 
------------------------------------------------------------------------
        //  Override lifecycle methods to avoid actually starting the service
        // 
------------------------------------------------------------------------
 
        @Override
-       protected void serviceInit(Configuration conf) throws Exception {
-               // noop
+       public void init(Configuration conf) {
+               clientInitRunnable.run();
        }
 
        @Override
-       protected void serviceStart() throws Exception {
-               // noop
+       public void start() {
+               clientStartRunnable.run();
        }
 
        @Override
-       protected void serviceStop() throws Exception {
-               // noop
+       public void stop() {
+               clientStopRunnable.run();
+       }
+
+       /**
+        * Builder class for {@link TestingYarnAMRMClientAsync}.
+        */
+       public static class Builder {
+               private volatile TriConsumer<Container, ContainerLaunchContext, 
CallbackHandler> startContainerAsyncConsumer = (ignored1, ignored2, ignored3) 
-> {};
+               private volatile TriConsumer<ContainerId, NodeId, 
CallbackHandler> stopContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> 
{};
+               private volatile Runnable clientInitRunnable = () -> {};
+               private volatile Runnable clientStartRunnable = () -> {};
+               private volatile Runnable clientStopRunnable = () -> {};

Review comment:
       I'm a bit confused about the `volatile` usage here: Is it really 
necessary? Based on the tests, it does not look like it.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -72,354 +62,237 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 /**
- * The yarn implementation of the resource manager. Used when the system is 
started
- * via the resource framework YARN.
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
  */
-public class YarnResourceManager extends 
LegacyActiveResourceManager<YarnWorkerNode>
-               implements AMRMClientAsync.CallbackHandler, 
NMClientAsync.CallbackHandler {
+public class YarnResourceManagerDriver extends 
AbstractResourceManagerDriver<YarnWorkerNode> {
 
        private static final Priority RM_REQUEST_PRIORITY = 
Priority.newInstance(1);
 
-       /** YARN container map. */
-       private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
-
        /** Environment variable name of the hostname given by the YARN.
         * In task executor we use the hostnames given by YARN consistently 
throughout akka */
        static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
 
        static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received 
shutdown request from YARN ResourceManager.";
 
-       /** Default heartbeat interval between this resource manager and the 
YARN ResourceManager. */
-       private final int yarnHeartbeatIntervalMillis;
-
        private final YarnConfiguration yarnConfig;
 
-       @Nullable
-       private final String webInterfaceUrl;
+       /** The process environment variables. */
+       private final YarnResourceManagerDriverConfiguration configuration;
 
-       /** The heartbeat interval while the resource master is waiting for 
containers. */
-       private final int containerRequestHeartbeatIntervalMillis;
+       /** Default heartbeat interval between this resource manager and the 
YARN ResourceManager. */
+       private final int yarnHeartbeatIntervalMillis;
 
        /** Client to communicate with the Resource Manager (YARN's master). */
        private AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient;
 
+       /** The heartbeat interval while the resource master is waiting for 
containers. */
+       private final int containerRequestHeartbeatIntervalMillis;
+
        /** Client to communicate with the Node manager and launch TaskExecutor 
processes. */
        private NMClientAsync nodeManagerClient;
 
-       private final WorkerSpecContainerResourceAdapter 
workerSpecContainerResourceAdapter;
+       /** Request resource futures, keyed by container ids. */
+       private final Map<TaskExecutorProcessSpec, 
Queue<CompletableFuture<YarnWorkerNode>>> requestResourceFutures;
+
+       private final TaskExecutorProcessSpecContainerResourceAdapter 
taskExecutorProcessSpecContainerResourceAdapter;
 
        private final RegisterApplicationMasterResponseReflector 
registerApplicationMasterResponseReflector;
 
-       private WorkerSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
-
-       public YarnResourceManager(
-                       RpcService rpcService,
-                       ResourceID resourceId,
-                       Configuration flinkConfig,
-                       Map<String, String> env,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       SlotManager slotManager,
-                       ResourceManagerPartitionTrackerFactory 
clusterPartitionTrackerFactory,
-                       JobLeaderIdService jobLeaderIdService,
-                       ClusterInformation clusterInformation,
-                       FatalErrorHandler fatalErrorHandler,
-                       @Nullable String webInterfaceUrl,
-                       ResourceManagerMetricGroup resourceManagerMetricGroup) {
-               super(
-                       flinkConfig,
-                       env,
-                       rpcService,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       slotManager,
-                       clusterPartitionTrackerFactory,
-                       jobLeaderIdService,
-                       clusterInformation,
-                       fatalErrorHandler,
-                       resourceManagerMetricGroup);
+       private 
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
+
+       private final YarnResourceManagerClientFactory 
yarnResourceManagerClientFactory;
+
+       private final YarnNodeManagerClientFactory yarnNodeManagerClientFactory;
+
+       public YarnResourceManagerDriver(
+               Configuration flinkConfig,
+               YarnResourceManagerDriverConfiguration configuration,
+               YarnResourceManagerClientFactory 
yarnResourceManagerClientFactory,
+               YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
+               super(flinkConfig, 
GlobalConfiguration.loadConfiguration(configuration.getCurrentDir()));
+
                this.yarnConfig = new YarnConfiguration();
-               this.workerNodeMap = new ConcurrentHashMap<>();
+               this.requestResourceFutures = new HashMap<>();
+               this.configuration = configuration;
+
                final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
-                               YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 
1000;
+                       YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
 
                final long yarnExpiryIntervalMS = yarnConfig.getLong(
-                               YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
-                               
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+                       YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+                       YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
 
                if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
                        log.warn("The heartbeat interval of the Flink 
Application master ({}) is greater " +
                                        "than YARN's expiry interval ({}). The 
application is likely to be killed by YARN.",
-                                       yarnHeartbeatIntervalMS, 
yarnExpiryIntervalMS);
+                               yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
                }
                yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
                containerRequestHeartbeatIntervalMillis = 
flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
 
-               this.webInterfaceUrl = webInterfaceUrl;
-
-               this.workerSpecContainerResourceAdapter = 
Utils.createWorkerSpecContainerResourceAdapter(flinkConfig, yarnConfig);
+               this.taskExecutorProcessSpecContainerResourceAdapter = 
Utils.createTaskExecutorProcessSpecContainerResourceAdapter(flinkConfig, 
yarnConfig);
                this.registerApplicationMasterResponseReflector = new 
RegisterApplicationMasterResponseReflector(log);
 
                this.matchingStrategy = 
flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
-                       
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-                       
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
-       }
-
-       protected AMRMClientAsync<AMRMClient.ContainerRequest> 
createAndStartResourceManagerClient(
-                       YarnConfiguration yarnConfiguration,
-                       int yarnHeartbeatIntervalMillis,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
-                       yarnHeartbeatIntervalMillis,
-                       this);
-
-               resourceManagerClient.init(yarnConfiguration);
-               resourceManagerClient.start();
-
-               //TODO: change akka address to tcp host and port, the 
getAddress() interface should return a standard tcp address
-               Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
-
-               final int restPort;
-
-               if (webInterfaceUrl != null) {
-                       final int lastColon = webInterfaceUrl.lastIndexOf(':');
-
-                       if (lastColon == -1) {
-                               restPort = -1;
-                       } else {
-                               restPort = 
Integer.valueOf(webInterfaceUrl.substring(lastColon + 1));
-                       }
-               } else {
-                       restPort = -1;
-               }
-
-               final RegisterApplicationMasterResponse 
registerApplicationMasterResponse =
-                       
resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort, 
webInterfaceUrl);
-               
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-               updateMatchingStrategy(registerApplicationMasterResponse);
+                       
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
+                       
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
 
-               return resourceManagerClient;
+               this.yarnResourceManagerClientFactory = 
yarnResourceManagerClientFactory;
+               this.yarnNodeManagerClientFactory = 
yarnNodeManagerClientFactory;
        }
 
-       private void getContainersFromPreviousAttempts(final 
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
-               final List<Container> containersFromPreviousAttempts =
-                       
registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-
-               log.info("Recovered {} containers from previous attempts 
({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
-
-               for (final Container container : 
containersFromPreviousAttempts) {
-                       final ResourceID resourceID = 
getContainerResourceId(container);
-                       workerNodeMap.put(resourceID, new 
YarnWorkerNode(container, resourceID));
-               }
-       }
-
-       private void updateMatchingStrategy(final 
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
-               final Optional<Set<String>> schedulerResourceTypesOptional =
-                       
registerApplicationMasterResponseReflector.getSchedulerResourceTypeNames(registerApplicationMasterResponse);
-
-               final WorkerSpecContainerResourceAdapter.MatchingStrategy 
strategy;
-               if (schedulerResourceTypesOptional.isPresent()) {
-                       Set<String> types = 
schedulerResourceTypesOptional.get();
-                       log.info("Register application master response contains 
scheduler resource types: {}.", types);
-                       matchingStrategy = types.contains("CPU") ?
-                               
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-                               
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
-               } else {
-                       log.info("Register application master response does not 
contain scheduler resource types, use '{}'.",
-                               
YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES.key());
-               }
-               log.info("Container matching strategy: {}.", matchingStrategy);
-       }
-
-       protected NMClientAsync 
createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
-               // create the client to communicate with the node managers
-               NMClientAsync nodeManagerClient = 
NMClientAsync.createNMClientAsync(this);
-               nodeManagerClient.init(yarnConfiguration);
-               nodeManagerClient.start();
-               return nodeManagerClient;
-       }
-
-       @Override
-       protected Configuration loadClientConfiguration() {
-               return 
GlobalConfiguration.loadConfiguration(env.get(ApplicationConstants.Environment.PWD.key()));
-       }
+       // 
------------------------------------------------------------------------
+       //  ResourceManagerDriver
+       // 
------------------------------------------------------------------------
 
        @Override
-       protected void initialize() throws ResourceManagerException {
+       protected void initializeInternal() throws Exception {
+               final YarnContainerEventHandler yarnContainerEventHandler = new 
YarnContainerEventHandler();
                try {
-                       resourceManagerClient = 
createAndStartResourceManagerClient(
-                               yarnConfig,
+                       resourceManagerClient = 
yarnResourceManagerClientFactory.createResourceManagerClient(
                                yarnHeartbeatIntervalMillis,
-                               webInterfaceUrl);
+                               yarnContainerEventHandler);
+                       resourceManagerClient.init(yarnConfig);
+                       resourceManagerClient.start();
+
+                       final RegisterApplicationMasterResponse 
registerApplicationMasterResponse = registerApplicationMaster();
+                       
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+                       
updateMatchingStrategy(registerApplicationMasterResponse);
                } catch (Exception e) {
                        throw new ResourceManagerException("Could not start 
resource manager client.", e);
                }
 
-               nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
+               nodeManagerClient = 
yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
+               nodeManagerClient.init(yarnConfig);
+               nodeManagerClient.start();
        }
 
        @Override
-       public void terminate() throws Exception {
+       public CompletableFuture<Void> terminate() {
                // shut down all components
-               Exception firstException = null;
+               Exception exception = null;
 
                if (resourceManagerClient != null) {
                        try {
                                resourceManagerClient.stop();
                        } catch (Exception e) {
-                               firstException = e;
+                               exception = e;
                        }
                }
 
                if (nodeManagerClient != null) {
                        try {
                                nodeManagerClient.stop();
                        } catch (Exception e) {
-                               firstException = 
ExceptionUtils.firstOrSuppressed(e, firstException);
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                        }
                }
 
-               ExceptionUtils.tryRethrowException(firstException);
+               return exception == null ?
+                       FutureUtils.completedVoidFuture() :
+                       FutureUtils.completedExceptionally(exception);
        }
 
        @Override
-       protected void internalDeregisterApplication(
-                       ApplicationStatus finalStatus,
-                       @Nullable String diagnostics) {
-
+       public void deregisterApplication(ApplicationStatus finalStatus, 
@Nullable String optionalDiagnostics) {
                // first, de-register from YARN
-               FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
+               final FinalApplicationStatus yarnStatus = 
getYarnStatus(finalStatus);
                log.info("Unregister application from the YARN Resource Manager 
with final status {}.", yarnStatus);
 
                final Optional<URL> historyServerURL = 
HistoryServerUtils.getHistoryServerURL(flinkConfig);
 
                final String appTrackingUrl = 
historyServerURL.map(URL::toString).orElse("");
 
                try {
-                       
resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, 
appTrackingUrl);
+                       
resourceManagerClient.unregisterApplicationMaster(yarnStatus, 
optionalDiagnostics, appTrackingUrl);
                } catch (Throwable t) {
                        log.error("Could not unregister the application 
master.", t);
                }
 
-               Utils.deleteApplicationFiles(env);
+               Utils.deleteApplicationFiles(configuration.getYarnFiles());
        }
 
        @Override
-       public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
-               return requestYarnContainer(workerResourceSpec);
-       }
+       public CompletableFuture<YarnWorkerNode> 
requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+               final Optional<Resource> containerResourceOptional = 
getContainerResource(taskExecutorProcessSpec);
+               final CompletableFuture<YarnWorkerNode> requestResourceFuture = 
new CompletableFuture<>();
 
-       @VisibleForTesting
-       Optional<Resource> getContainerResource(WorkerResourceSpec 
workerResourceSpec) {
-               return 
workerSpecContainerResourceAdapter.tryComputeContainerResource(workerResourceSpec);
+               if (containerResourceOptional.isPresent()) {
+                       
resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));
+
+                       // make sure we transmit the request fast and receive 
fast news of granted allocations
+                       
resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
+
+                       
requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new 
LinkedList<>()).add(requestResourceFuture);
+
+                       log.info("Requesting new TaskExecutor container with 
resource {}.", taskExecutorProcessSpec);
+               } else {
+                       requestResourceFuture.completeExceptionally(
+                               new 
ResourceManagerException(String.format("Could not compute the container 
Resource from the given TaskExecutorProcessSpec %s.", 
taskExecutorProcessSpec)));
+               }
+
+               return requestResourceFuture;
        }
 
        @Override
-       public boolean stopWorker(final YarnWorkerNode workerNode) {
+       public void releaseResource(YarnWorkerNode workerNode) {
                final Container container = workerNode.getContainer();
                log.info("Stopping container {}.", 
workerNode.getResourceID().getStringWithMetadata());
                nodeManagerClient.stopContainerAsync(container.getId(), 
container.getNodeId());
                
resourceManagerClient.releaseAssignedContainer(container.getId());
-               workerNodeMap.remove(workerNode.getResourceID());
-               return true;
-       }
-
-       @Override
-       protected YarnWorkerNode workerStarted(ResourceID resourceID) {
-               return workerNodeMap.get(resourceID);
        }
 
        // 
------------------------------------------------------------------------
-       //  AMRMClientAsync CallbackHandler methods
+       //  Internal
        // 
------------------------------------------------------------------------
 
-       @Override
-       public float getProgress() {
-               // Temporarily need not record the total size of asked and 
allocated containers
-               return 1;
-       }
-
-       @Override
-       public void onContainersCompleted(final List<ContainerStatus> statuses) 
{
-               runAsync(() -> {
-                               log.debug("YARN ResourceManager reported the 
following containers completed: {}.", statuses);
-                               for (final ContainerStatus containerStatus : 
statuses) {
-
-                                       final ResourceID resourceId = new 
ResourceID(containerStatus.getContainerId().toString());
-                                       final YarnWorkerNode yarnWorkerNode = 
workerNodeMap.remove(resourceId);
-
-                                       
notifyAllocatedWorkerStopped(resourceId);
-
-                                       if (yarnWorkerNode != null) {
-                                               // Container completed 
unexpectedly ~> start a new one
-                                               
requestYarnContainerIfRequired();
-                                       }
-                                       // Eagerly close the connection with 
task manager.
-                                       closeTaskManagerConnection(resourceId, 
new Exception(containerStatus.getDiagnostics()));
-                               }
-                       }
-               );
-       }
-
-       @Override
-       public void onContainersAllocated(List<Container> containers) {
-               runAsync(() -> {
-                       log.info("Received {} containers.", containers.size());
-
-                       for (Map.Entry<Resource, List<Container>> entry : 
groupContainerByResource(containers).entrySet()) {
-                               onContainersOfResourceAllocated(entry.getKey(), 
entry.getValue());
-                       }
-
-                       // if we are waiting for no further containers, we can 
go to the
-                       // regular heartbeat interval
-                       if (getNumRequestedNotAllocatedWorkers() <= 0) {
-                               
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
-                       }
-               });
-       }
-
-       private Map<Resource, List<Container>> 
groupContainerByResource(List<Container> containers) {
-               return 
containers.stream().collect(Collectors.groupingBy(Container::getResource));
-       }
-
        private void onContainersOfResourceAllocated(Resource resource, 
List<Container> containers) {
-               final List<WorkerResourceSpec> pendingWorkerResourceSpecs =
-                       
workerSpecContainerResourceAdapter.getWorkerSpecs(resource, 
matchingStrategy).stream()
+               final List<TaskExecutorProcessSpec> 
pendingTaskExecutorProcessSpecs =
+                       
taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource,
 matchingStrategy).stream()
                                .flatMap(spec -> 
Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
                                .collect(Collectors.toList());
 
-               int numPending = pendingWorkerResourceSpecs.size();
+               int numPending = pendingTaskExecutorProcessSpecs.size();
                log.info("Received {} containers with resource {}, {} pending 
container requests.",
                        containers.size(),
                        resource,
                        numPending);
 
                final Iterator<Container> containerIterator = 
containers.iterator();
-               final Iterator<WorkerResourceSpec> pendingWorkerSpecIterator = 
pendingWorkerResourceSpecs.iterator();
+               final Iterator<TaskExecutorProcessSpec> 
pendingTaskExecutorProcessSpecIterator = 
pendingTaskExecutorProcessSpecs.iterator();
                final Iterator<AMRMClient.ContainerRequest> 
pendingRequestsIterator =
-                       getPendingRequestsAndCheckConsistency(resource, 
pendingWorkerResourceSpecs.size()).iterator();
+                       getPendingRequestsAndCheckConsistency(resource, 
pendingTaskExecutorProcessSpecs.size()).iterator();
 
                int numAccepted = 0;
-               while (containerIterator.hasNext() && 
pendingWorkerSpecIterator.hasNext()) {
-                       final WorkerResourceSpec workerResourceSpec = 
pendingWorkerSpecIterator.next();
+               while (containerIterator.hasNext() && 
pendingTaskExecutorProcessSpecIterator.hasNext()) {
+                       final TaskExecutorProcessSpec taskExecutorProcessSpec = 
pendingTaskExecutorProcessSpecIterator.next();
                        final Container container = containerIterator.next();
                        final AMRMClient.ContainerRequest pendingRequest = 
pendingRequestsIterator.next();
                        final ResourceID resourceId = 
getContainerResourceId(container);
+                       final CompletableFuture<YarnWorkerNode> requestResourceFuture =
+                               Preconditions.checkNotNull(
+                                       Preconditions.checkNotNull(
+                                               requestResourceFutures.get(taskExecutorProcessSpec),
+                                               "The requestResourceFuture for TasExecutorProcessSpec %s should not be null.", taskExecutorProcessSpec).poll(),
+                                       "The requestResourceFuture queue for TasExecutorProcessSpec %s should not be empty.", taskExecutorProcessSpec);
+                       if (requestResourceFutures.get(taskExecutorProcessSpec).isEmpty()) {

Review comment:
       Can this `if` condition ever be `true`? It looks like the case is already 
covered by the preceding `checkNotNull`, i.e. a `NullPointerException` would be 
thrown before this `if` is reached.
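
To check this in isolation, here is a minimal sketch using plain JDK collections. It is not the PR's code: `PendingFuturesSketch` and its methods are hypothetical, `Objects.requireNonNull` stands in for `Preconditions.checkNotNull`, and `String` stands in for both `TaskExecutorProcessSpec` and `YarnWorkerNode`. The body of the `if` is not visible in the quoted hunk, so the sketch assumes it removes the drained queue from the map. The sketch suggests that the two null checks only guard the queue and the polled element, so `isEmpty()` may still return `true` once the last pending future has been polled.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the pending-request bookkeeping; not the PR's class. */
final class PendingFuturesSketch {
    private final Map<String, Queue<CompletableFuture<String>>> pending = new HashMap<>();

    /** Registers a new pending future for the given spec. */
    CompletableFuture<String> register(String spec) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.computeIfAbsent(spec, ignored -> new LinkedList<>()).add(future);
        return future;
    }

    /** Polls the oldest pending future; throws NullPointerException if there is none. */
    CompletableFuture<String> pollNext(String spec) {
        // first check: a queue must exist for this spec
        Queue<CompletableFuture<String>> queue =
            Objects.requireNonNull(pending.get(spec), "no queue for " + spec);
        // second check: poll() returns null on an empty queue
        CompletableFuture<String> future =
            Objects.requireNonNull(queue.poll(), "queue for " + spec + " is empty");
        // reachable: true exactly when the polled future was the last one (assumed cleanup)
        if (queue.isEmpty()) {
            pending.remove(spec);
        }
        return future;
    }

    boolean hasPending(String spec) {
        return pending.containsKey(spec);
    }
}
```

With two registered futures for the same spec, the first `pollNext` leaves the queue non-empty, the second one drains it and takes the `if` branch, and only a third call fails the null check.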

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -72,354 +62,237 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 /**
- * The yarn implementation of the resource manager. Used when the system is 
started
- * via the resource framework YARN.
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
  */
-public class YarnResourceManager extends 
LegacyActiveResourceManager<YarnWorkerNode>
-               implements AMRMClientAsync.CallbackHandler, 
NMClientAsync.CallbackHandler {
+public class YarnResourceManagerDriver extends 
AbstractResourceManagerDriver<YarnWorkerNode> {
 
        private static final Priority RM_REQUEST_PRIORITY = 
Priority.newInstance(1);
 
-       /** YARN container map. */
-       private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
-
        /** Environment variable name of the hostname given by the YARN.
         * In task executor we use the hostnames given by YARN consistently 
throughout akka */
        static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
 
        static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received 
shutdown request from YARN ResourceManager.";
 
-       /** Default heartbeat interval between this resource manager and the 
YARN ResourceManager. */
-       private final int yarnHeartbeatIntervalMillis;
-
        private final YarnConfiguration yarnConfig;
 
-       @Nullable
-       private final String webInterfaceUrl;
+       /** The process environment variables. */
+       private final YarnResourceManagerDriverConfiguration configuration;
 
-       /** The heartbeat interval while the resource master is waiting for 
containers. */
-       private final int containerRequestHeartbeatIntervalMillis;
+       /** Default heartbeat interval between this resource manager and the 
YARN ResourceManager. */
+       private final int yarnHeartbeatIntervalMillis;
 
        /** Client to communicate with the Resource Manager (YARN's master). */
        private AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient;
 
+       /** The heartbeat interval while the resource master is waiting for 
containers. */
+       private final int containerRequestHeartbeatIntervalMillis;
+
        /** Client to communicate with the Node manager and launch TaskExecutor 
processes. */
        private NMClientAsync nodeManagerClient;
 
-       private final WorkerSpecContainerResourceAdapter 
workerSpecContainerResourceAdapter;
+       /** Request resource futures, keyed by container ids. */
+       private final Map<TaskExecutorProcessSpec, 
Queue<CompletableFuture<YarnWorkerNode>>> requestResourceFutures;
+
+       private final TaskExecutorProcessSpecContainerResourceAdapter 
taskExecutorProcessSpecContainerResourceAdapter;
 
        private final RegisterApplicationMasterResponseReflector 
registerApplicationMasterResponseReflector;
 
-       private WorkerSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
-
-       public YarnResourceManager(
-                       RpcService rpcService,
-                       ResourceID resourceId,
-                       Configuration flinkConfig,
-                       Map<String, String> env,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       SlotManager slotManager,
-                       ResourceManagerPartitionTrackerFactory 
clusterPartitionTrackerFactory,
-                       JobLeaderIdService jobLeaderIdService,
-                       ClusterInformation clusterInformation,
-                       FatalErrorHandler fatalErrorHandler,
-                       @Nullable String webInterfaceUrl,
-                       ResourceManagerMetricGroup resourceManagerMetricGroup) {
-               super(
-                       flinkConfig,
-                       env,
-                       rpcService,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       slotManager,
-                       clusterPartitionTrackerFactory,
-                       jobLeaderIdService,
-                       clusterInformation,
-                       fatalErrorHandler,
-                       resourceManagerMetricGroup);
+       private 
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
+
+       private final YarnResourceManagerClientFactory 
yarnResourceManagerClientFactory;
+
+       private final YarnNodeManagerClientFactory yarnNodeManagerClientFactory;
+
+       public YarnResourceManagerDriver(
+               Configuration flinkConfig,
+               YarnResourceManagerDriverConfiguration configuration,
+               YarnResourceManagerClientFactory 
yarnResourceManagerClientFactory,
+               YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
+               super(flinkConfig, 
GlobalConfiguration.loadConfiguration(configuration.getCurrentDir()));
+
                this.yarnConfig = new YarnConfiguration();
-               this.workerNodeMap = new ConcurrentHashMap<>();
+               this.requestResourceFutures = new HashMap<>();
+               this.configuration = configuration;
+
                final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
-                               YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 
1000;
+                       YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
 
                final long yarnExpiryIntervalMS = yarnConfig.getLong(
-                               YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
-                               
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+                       YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+                       YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
 
                if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
                        log.warn("The heartbeat interval of the Flink 
Application master ({}) is greater " +
                                        "than YARN's expiry interval ({}). The 
application is likely to be killed by YARN.",
-                                       yarnHeartbeatIntervalMS, 
yarnExpiryIntervalMS);
+                               yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
                }
                yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
                containerRequestHeartbeatIntervalMillis = 
flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
 
-               this.webInterfaceUrl = webInterfaceUrl;
-
-               this.workerSpecContainerResourceAdapter = 
Utils.createWorkerSpecContainerResourceAdapter(flinkConfig, yarnConfig);
+               this.taskExecutorProcessSpecContainerResourceAdapter = 
Utils.createTaskExecutorProcessSpecContainerResourceAdapter(flinkConfig, 
yarnConfig);
                this.registerApplicationMasterResponseReflector = new 
RegisterApplicationMasterResponseReflector(log);
 
                this.matchingStrategy = 
flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
-                       
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-                       
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
-       }
-
-       protected AMRMClientAsync<AMRMClient.ContainerRequest> 
createAndStartResourceManagerClient(
-                       YarnConfiguration yarnConfiguration,
-                       int yarnHeartbeatIntervalMillis,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
-                       yarnHeartbeatIntervalMillis,
-                       this);
-
-               resourceManagerClient.init(yarnConfiguration);
-               resourceManagerClient.start();
-
-               //TODO: change akka address to tcp host and port, the 
getAddress() interface should return a standard tcp address
-               Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
-
-               final int restPort;
-
-               if (webInterfaceUrl != null) {
-                       final int lastColon = webInterfaceUrl.lastIndexOf(':');
-
-                       if (lastColon == -1) {
-                               restPort = -1;
-                       } else {
-                               restPort = 
Integer.valueOf(webInterfaceUrl.substring(lastColon + 1));
-                       }
-               } else {
-                       restPort = -1;
-               }
-
-               final RegisterApplicationMasterResponse 
registerApplicationMasterResponse =
-                       
resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort, 
webInterfaceUrl);
-               
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-               updateMatchingStrategy(registerApplicationMasterResponse);
+                       
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
+                       
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
 
-               return resourceManagerClient;
+               this.yarnResourceManagerClientFactory = 
yarnResourceManagerClientFactory;
+               this.yarnNodeManagerClientFactory = 
yarnNodeManagerClientFactory;
        }
 
-       private void getContainersFromPreviousAttempts(final 
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
-               final List<Container> containersFromPreviousAttempts =
-                       
registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-
-               log.info("Recovered {} containers from previous attempts 
({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
-
-               for (final Container container : 
containersFromPreviousAttempts) {
-                       final ResourceID resourceID = 
getContainerResourceId(container);
-                       workerNodeMap.put(resourceID, new 
YarnWorkerNode(container, resourceID));
-               }
-       }
-
-       private void updateMatchingStrategy(final 
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
-               final Optional<Set<String>> schedulerResourceTypesOptional =
-                       
registerApplicationMasterResponseReflector.getSchedulerResourceTypeNames(registerApplicationMasterResponse);
-
-               final WorkerSpecContainerResourceAdapter.MatchingStrategy 
strategy;
-               if (schedulerResourceTypesOptional.isPresent()) {
-                       Set<String> types = 
schedulerResourceTypesOptional.get();
-                       log.info("Register application master response contains 
scheduler resource types: {}.", types);
-                       matchingStrategy = types.contains("CPU") ?
-                               
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-                               
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
-               } else {
-                       log.info("Register application master response does not 
contain scheduler resource types, use '{}'.",
-                               
YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES.key());
-               }
-               log.info("Container matching strategy: {}.", matchingStrategy);
-       }
-
-       protected NMClientAsync 
createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
-               // create the client to communicate with the node managers
-               NMClientAsync nodeManagerClient = 
NMClientAsync.createNMClientAsync(this);
-               nodeManagerClient.init(yarnConfiguration);
-               nodeManagerClient.start();
-               return nodeManagerClient;
-       }
-
-       @Override
-       protected Configuration loadClientConfiguration() {
-               return 
GlobalConfiguration.loadConfiguration(env.get(ApplicationConstants.Environment.PWD.key()));
-       }
+       // 
------------------------------------------------------------------------
+       //  ResourceManagerDriver
+       // 
------------------------------------------------------------------------
 
        @Override
-       protected void initialize() throws ResourceManagerException {
+       protected void initializeInternal() throws Exception {
+               final YarnContainerEventHandler yarnContainerEventHandler = new 
YarnContainerEventHandler();
                try {
-                       resourceManagerClient = 
createAndStartResourceManagerClient(
-                               yarnConfig,
+                       resourceManagerClient = 
yarnResourceManagerClientFactory.createResourceManagerClient(
                                yarnHeartbeatIntervalMillis,
-                               webInterfaceUrl);
+                               yarnContainerEventHandler);
+                       resourceManagerClient.init(yarnConfig);
+                       resourceManagerClient.start();
+
+                       final RegisterApplicationMasterResponse 
registerApplicationMasterResponse = registerApplicationMaster();
+                       
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+                       
updateMatchingStrategy(registerApplicationMasterResponse);
                } catch (Exception e) {

Review comment:
       Out of curiosity: what is the reason for catching `Exception` here but 
not for the `NodeManagerClient`?
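
   To illustrate the question, here is a minimal sketch of what uniform handling could look like if both clients can fail during startup. It is not taken from the PR: `InitSketch`, `Client`, `DriverInitException` and `initializeBoth` are hypothetical stand-ins, not Flink or YARN types, and whether the node manager client can actually throw here is an assumption.

```java
final class InitSketch {

    /** Hypothetical stand-in for a YARN client whose startup may fail. */
    interface Client {
        void start() throws Exception;
    }

    /** Hypothetical stand-in for a driver-level exception such as ResourceManagerException. */
    static class DriverInitException extends Exception {
        DriverInitException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Starts both clients and wraps any startup failure in the same
     * exception type, so callers see one kind of error for either client.
     */
    static void initializeBoth(Client resourceManagerClient, Client nodeManagerClient)
            throws DriverInitException {
        try {
            resourceManagerClient.start();
        } catch (Exception e) {
            throw new DriverInitException("Could not start resource manager client.", e);
        }

        try {
            nodeManagerClient.start();
        } catch (Exception e) {
            throw new DriverInitException("Could not start node manager client.", e);
        }
    }
}
```

   If the node manager client cannot throw a checked exception during creation and startup, the second `try` block is unnecessary and the asymmetry in the diff is fine as it is.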




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

