XComp commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r487699295
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##########
@@ -141,6 +143,24 @@ public MemorySize getManagedMemorySize() {
return getFlinkMemory().getManaged();
}
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj instanceof TaskExecutorProcessSpec) {
Review comment:
I do not fully understand the usage of `TaskExecutorProcessSpec` and `CommonProcessMemorySpec` yet. It might not be a problem at all, but keep in mind that `instanceof` destroys the symmetry contract of `Object::equals` here. Consider the following example:
```
CommonProcessMemorySpec commonSpec = ...;   // uses more general equals implementation
TaskExecutorProcessSpec taskSpec = ...;     // uses custom equals implementation
commonSpec.equals(taskSpec); // --> true
taskSpec.equals(commonSpec); // --> false
```
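A minimal sketch of a symmetric variant (assuming a `TaskExecutorProcessSpec` should never be considered equal to a plain `CommonProcessMemorySpec`, and that the base class performs the same check) would compare runtime classes instead of using `instanceof`:
```
@Override
public boolean equals(Object obj) {
	if (obj == this) {
		return true;
	}
	// getClass() instead of instanceof keeps equals symmetric between super- and subclass
	if (obj == null || getClass() != obj.getClass()) {
		return false;
	}
	// delegate to the field comparison of the super class (plus any subclass-specific fields)
	return super.equals(obj);
}
```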
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -347,26 +347,17 @@ static ContainerLaunchContext createTaskExecutorContext(
// get and validate all relevant variables
-		String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_DIST_JAR);
+		String remoteFlinkJarPath = configuration.getFlinkDistJar();
		require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_DIST_JAR);
Review comment:
We could get rid of the `require` function here by statically importing
`org.apache.flink.util.Preconditions.checkNotNull` and applying:
```suggestion
		String remoteFlinkJarPath = checkNotNull(configuration.getFlinkDistJar(), "Environment variable %s not set", YarnConfigKeys.FLINK_DIST_JAR);
```
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerDriverConfiguration.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManagerDriver}.
+ */
+public class YarnResourceManagerDriverConfiguration {
+ private final String webInterfaceUrl;
Review comment:
Should we add the `@Nullable` annotation here as well?
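The `javax.annotation.Nullable` import is already in place above, so (assuming the field can indeed be `null`, e.g. when no web interface is configured) this would just be:
```
	@Nullable
	private final String webInterfaceUrl;
```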
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
##########
@@ -104,5 +104,5 @@ private Configuration createActiveResourceManagerConfiguration(Configuration ori
			resourceManagerMetricGroup);
	}
-	protected abstract ResourceManagerDriver<WorkerType> createResourceManagerDriver(Configuration configuration);
+	protected abstract ResourceManagerDriver<WorkerType> createResourceManagerDriver(Configuration configuration, String webInterfaceUrl, String rpcAddress);
Review comment:
The `webInterfaceUrl` and the `rpcAddress` are Yarn-specific, aren't they? I'm just wondering whether we should introduce an `ActiveResourceManagerDriverFactory` instead, to avoid extending the method every time we have to add new parameters for a subclass.
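A rough sketch of what I mean (hypothetical names, construction details elided): the driver-specific parameters would then live in the factory implementation instead of the shared abstract method signature.
```
/** Hypothetical factory interface; subclass-specific parameters stay out of its signature. */
interface ResourceManagerDriverFactory<WorkerType> {
	ResourceManagerDriver<WorkerType> createResourceManagerDriver(Configuration configuration);
}

/** Sketch of a YARN implementation capturing the YARN-only parameters at construction time. */
class YarnResourceManagerDriverFactory implements ResourceManagerDriverFactory<YarnWorkerNode> {
	@Nullable private final String webInterfaceUrl;
	private final String rpcAddress;

	YarnResourceManagerDriverFactory(@Nullable String webInterfaceUrl, String rpcAddress) {
		this.webInterfaceUrl = webInterfaceUrl;
		this.rpcAddress = rpcAddress;
	}

	@Override
	public ResourceManagerDriver<YarnWorkerNode> createResourceManagerDriver(Configuration configuration) {
		// build the YarnResourceManagerDriverConfiguration (and the driver) from the captured
		// YARN-specific fields here; construction details elided in this sketch
		throw new UnsupportedOperationException("sketch only");
	}
}
```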
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -72,354 +62,237 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
- * The yarn implementation of the resource manager. Used when the system is started
- * via the resource framework YARN.
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
Review comment:
```suggestion
* Implementation of {@link ResourceManagerDriver} for YARN deployment.
```
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -535,14 +526,13 @@ static void require(boolean condition, String message, Object... values) {
}
}
Review comment:
The `require` method can be removed once the `Preconditions.checkNotNull` proposals above are applied; all conditions passed to it are not-null checks. Switching to `Preconditions.checkNotNull` will throw a `NullPointerException` instead of a `RuntimeException`, which describes the error more accurately.
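For reference, the pattern the suggestions above rely on (just a sketch; `checkNotNull` takes the value, a message template and the template arguments):
```
import static org.apache.flink.util.Preconditions.checkNotNull;

// Throws a NullPointerException with the formatted message if the value is null,
// otherwise returns the value unchanged.
String remoteFlinkJarPath = checkNotNull(
	configuration.getFlinkDistJar(),
	"Environment variable %s not set",
	YarnConfigKeys.FLINK_DIST_JAR);
```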
##########
File path:
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
##########
@@ -101,58 +116,111 @@ public void
unregisterApplicationMaster(FinalApplicationStatus appStatus, String
unregisterApplicationMasterConsumer.accept(appStatus,
appMessage, appTrackingUrl);
}
- void setGetMatchingRequestsFunction(
- Function<Tuple4<Priority, String, Resource, CallbackHandler>,
List<? extends Collection<AMRMClient.ContainerRequest>>>
- getMatchingRequestsFunction) {
- this.getMatchingRequestsFunction =
Preconditions.checkNotNull(getMatchingRequestsFunction);
- }
-
- void setAddContainerRequestConsumer(
- BiConsumer<AMRMClient.ContainerRequest, CallbackHandler>
addContainerRequestConsumer) {
- this.addContainerRequestConsumer =
Preconditions.checkNotNull(addContainerRequestConsumer);
- }
-
- void setRemoveContainerRequestConsumer(
- BiConsumer<AMRMClient.ContainerRequest, CallbackHandler>
removeContainerRequestConsumer) {
- this.removeContainerRequestConsumer =
Preconditions.checkNotNull(removeContainerRequestConsumer);
- }
-
- void setReleaseAssignedContainerConsumer(
- BiConsumer<ContainerId, CallbackHandler>
releaseAssignedContainerConsumer) {
- this.releaseAssignedContainerConsumer =
Preconditions.checkNotNull(releaseAssignedContainerConsumer);
- }
-
- void setSetHeartbeatIntervalConsumer(
- Consumer<Integer> setHeartbeatIntervalConsumer) {
- this.setHeartbeatIntervalConsumer =
setHeartbeatIntervalConsumer;
- }
-
- void setRegisterApplicationMasterFunction(
- TriFunction<String, Integer, String,
RegisterApplicationMasterResponse> registerApplicationMasterFunction) {
- this.registerApplicationMasterFunction =
registerApplicationMasterFunction;
- }
-
- void setUnregisterApplicationMasterConsumer(
- TriConsumer<FinalApplicationStatus, String, String>
unregisterApplicationMasterConsumer) {
- this.unregisterApplicationMasterConsumer =
unregisterApplicationMasterConsumer;
+ static Builder builder() {
+ return new Builder();
}
//
------------------------------------------------------------------------
// Override lifecycle methods to avoid actually starting the service
//
------------------------------------------------------------------------
@Override
- protected void serviceInit(Configuration conf) throws Exception {
- // noop
+ public void init(Configuration conf) {
+ clientInitRunnable.run();
}
@Override
- protected void serviceStart() throws Exception {
- // noop
+ public void start() {
+ clientStartRunnable.run();
}
@Override
- protected void serviceStop() throws Exception {
- // noop
+ public void stop() {
+ clientStopRunnable.run();
+ }
+
+ /**
+ * Builder class for {@link TestingYarnAMRMClientAsync}.
+ */
+ public static class Builder {
+ private volatile Function<Tuple4<Priority, String, Resource,
CallbackHandler>, List<? extends Collection<AMRMClient.ContainerRequest>>>
+ getMatchingRequestsFunction = ignored ->
Collections.emptyList();
+ private volatile BiConsumer<AMRMClient.ContainerRequest,
CallbackHandler> addContainerRequestConsumer = (ignored1, ignored2) -> {};
+ private volatile BiConsumer<AMRMClient.ContainerRequest,
CallbackHandler> removeContainerRequestConsumer = (ignored1, ignored2) -> {};
+ private volatile BiConsumer<ContainerId, CallbackHandler>
releaseAssignedContainerConsumer = (ignored1, ignored2) -> {};
+ private volatile Consumer<Integer> setHeartbeatIntervalConsumer
= (ignored) -> {};
+ private volatile TriFunction<String, Integer, String,
RegisterApplicationMasterResponse> registerApplicationMasterFunction =
+ (ignored1, ignored2, ignored3) -> new
TestingRegisterApplicationMasterResponse(Collections::emptyList);
+ private volatile TriConsumer<FinalApplicationStatus, String,
String> unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) ->
{};
+ private volatile Runnable clientInitRunnable = () -> {};
+ private volatile Runnable clientStartRunnable = () -> {};
+ private volatile Runnable clientStopRunnable = () -> {};
Review comment:
I'm a bit confused about the `volatile` usage here: Is it really
necessary? Based on the tests, it does not look like it.
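A sketch of the non-`volatile` alternative I had in mind (trimmed to a single property, hypothetical): plain fields in the builder and `final` fields in the built client, which is sufficient as long as the client is fully configured before it is handed to the code under test.
```
public static class Builder {
	// no volatile needed: the builder is only touched by the test thread before build()
	private Runnable clientInitRunnable = () -> {};

	Builder setClientInitRunnable(Runnable clientInitRunnable) {
		this.clientInitRunnable = Preconditions.checkNotNull(clientInitRunnable);
		return this;
	}
}

// ... and in TestingYarnAMRMClientAsync itself:
private final Runnable clientInitRunnable; // final fields give safe publication after construction
```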
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -435,53 +308,43 @@ private void onContainersOfResourceAllocated(Resource
resource, List<Container>
numAccepted, numExcess, numPending, resource);
}
- @VisibleForTesting
- static ResourceID getContainerResourceId(Container container) {
- return new ResourceID(container.getId().toString(),
container.getNodeId().toString());
+ private int getNumRequestedNotAllocatedWorkers() {
+ return
requestResourceFutures.values().stream().mapToInt(Queue::size).sum();
+ }
+
+ private int
getNumRequestedNotAllocatedWorkersFor(TaskExecutorProcessSpec
taskExecutorProcessSpec) {
+ return
requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new
LinkedList<>()).size();
Review comment:
```suggestion
		return requestResourceFutures.getOrDefault(taskExecutorProcessSpec, Collections.emptyList()).size();
```
Instead of instantiating an empty list we could use
`Collections.emptyList()`.
##########
File path:
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
##########
@@ -45,25 +45,40 @@
*/
public class TestingYarnAMRMClientAsync extends
AMRMClientAsyncImpl<AMRMClient.ContainerRequest> {
- private volatile Function<Tuple4<Priority, String, Resource,
CallbackHandler>, List<? extends Collection<AMRMClient.ContainerRequest>>>
- getMatchingRequestsFunction = ignored ->
Collections.emptyList();
- private volatile BiConsumer<AMRMClient.ContainerRequest,
CallbackHandler> addContainerRequestConsumer = (ignored1, ignored2) -> {};
- private volatile BiConsumer<AMRMClient.ContainerRequest,
CallbackHandler> removeContainerRequestConsumer = (ignored1, ignored2) -> {};
- private volatile BiConsumer<ContainerId, CallbackHandler>
releaseAssignedContainerConsumer = (ignored1, ignored2) -> {};
- private volatile Consumer<Integer> setHeartbeatIntervalConsumer =
(ignored) -> {};
- private volatile TriFunction<String, Integer, String,
RegisterApplicationMasterResponse> registerApplicationMasterFunction =
- (ignored1, ignored2, ignored3) ->
RegisterApplicationMasterResponse.newInstance(
- Resource.newInstance(0, 0),
- Resource.newInstance(Integer.MAX_VALUE,
Integer.MAX_VALUE),
- Collections.emptyMap(),
- null,
- Collections.emptyList(),
- null,
- Collections.emptyList());
- private volatile TriConsumer<FinalApplicationStatus, String, String>
unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> {};
-
- TestingYarnAMRMClientAsync(CallbackHandler callbackHandler) {
+ private volatile Function<Tuple4<Priority, String, Resource,
CallbackHandler>, List<? extends Collection<AMRMClient.ContainerRequest>>>
getMatchingRequestsFunction;
+ private volatile BiConsumer<AMRMClient.ContainerRequest,
CallbackHandler> addContainerRequestConsumer;
+ private volatile BiConsumer<AMRMClient.ContainerRequest,
CallbackHandler> removeContainerRequestConsumer;
+ private volatile BiConsumer<ContainerId, CallbackHandler>
releaseAssignedContainerConsumer;
+ private volatile Consumer<Integer> setHeartbeatIntervalConsumer;
+ private volatile TriFunction<String, Integer, String,
RegisterApplicationMasterResponse> registerApplicationMasterFunction;
+ private volatile TriConsumer<FinalApplicationStatus, String, String>
unregisterApplicationMasterConsumer;
+ private volatile Runnable clientInitRunnable;
+ private volatile Runnable clientStartRunnable;
+ private volatile Runnable clientStopRunnable;
Review comment:
I'm a bit confused about the `volatile` usage here: Is it really
necessary? Based on the tests, it does not look like it.
##########
File path:
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
##########
@@ -134,7 +134,7 @@
"java.io.IOException: Connection reset by peer",
// filter out expected ResourceManagerException caused by
intended shutdown request
- YarnResourceManager.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST,
+ YarnResourceManagerDriver.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST,
Review comment:
We might want to fix the typo here as well: `ERROR_MASSAGE_ON_SHUTDOWN_REQUEST` should become `ERROR_MESSAGE_ON_SHUTDOWN_REQUEST`.
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -347,26 +347,17 @@ static ContainerLaunchContext createTaskExecutorContext(
// get and validate all relevant variables
-		String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_DIST_JAR);
+		String remoteFlinkJarPath = configuration.getFlinkDistJar();
		require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_DIST_JAR);
-		String appId = env.get(YarnConfigKeys.ENV_APP_ID);
-		require(appId != null, "Environment variable %s not set", YarnConfigKeys.ENV_APP_ID);
-
-		String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-		require(clientHomeDir != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-
-		String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+		String shipListString = configuration.getClientShipFiles();
		require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
Review comment:
We could get rid of the `require` function here by statically importing `org.apache.flink.util.Preconditions.checkNotNull` and applying:
```suggestion
		String shipListString = checkNotNull(configuration.getClientShipFiles(), "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
```
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -376,7 +367,7 @@ static ContainerLaunchContext createTaskExecutorContext(
log.debug("TM:remote krb5 path obtained {}",
remoteKrb5Path);
}
- String classPathString = env.get(ENV_FLINK_CLASSPATH);
+ String classPathString = configuration.getFlinkClasspath();
		require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
Review comment:
We could get rid of the `require` function here by statically importing `org.apache.flink.util.Preconditions.checkNotNull` and applying:
```suggestion
		String classPathString = checkNotNull(configuration.getFlinkClasspath(), "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
```
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -508,52 +371,103 @@ private void
removeContainerRequest(AMRMClient.ContainerRequest pendingContainer
return matchingContainerRequests;
}
- @Override
- public void onShutdownRequest() {
- onFatalError(new
ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
- }
+ private ContainerLaunchContext createTaskExecutorLaunchContext(
+ ResourceID containerId,
+ String host,
+ TaskExecutorProcessSpec taskExecutorProcessSpec) throws
Exception {
- @Override
- public void onNodesUpdated(List<NodeReport> list) {
- // We are not interested in node updates
- }
+ // init the ContainerLaunchContext
+ final String currDir = configuration.getCurrentDir();
- @Override
- public void onError(Throwable error) {
- onFatalError(error);
- }
+ final ContaineredTaskManagerParameters taskManagerParameters =
+ ContaineredTaskManagerParameters.create(flinkConfig,
taskExecutorProcessSpec);
- //
------------------------------------------------------------------------
- // NMClientAsync CallbackHandler methods
- //
------------------------------------------------------------------------
- @Override
- public void onContainerStarted(ContainerId containerId, Map<String,
ByteBuffer> map) {
- log.debug("Succeeded to call YARN Node Manager to start
container {}.", containerId);
- }
+ log.info("TaskExecutor {} will be started on {} with {}.",
+ containerId.getStringWithMetadata(),
+ host,
+ taskExecutorProcessSpec);
- @Override
- public void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus) {
- // We are not interested in getting container status
+ final Configuration taskManagerConfig =
BootstrapTools.cloneConfiguration(flinkConfig);
+
taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID,
containerId.getResourceIdString());
+
taskManagerConfig.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA,
containerId.getMetadata());
+
+ final String taskManagerDynamicProperties =
+
BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig,
taskManagerConfig);
+
+ log.debug("TaskManager configuration: {}", taskManagerConfig);
+
+ final ContainerLaunchContext taskExecutorLaunchContext =
Utils.createTaskExecutorContext(
+ flinkConfig,
+ yarnConfig,
+ configuration,
+ taskManagerParameters,
+ taskManagerDynamicProperties,
+ currDir,
+ YarnTaskExecutorRunner.class,
+ log);
+
+ taskExecutorLaunchContext.getEnvironment()
+ .put(ENV_FLINK_NODE_ID, host);
+ return taskExecutorLaunchContext;
}
- @Override
- public void onContainerStopped(ContainerId containerId) {
- log.debug("Succeeded to call YARN Node Manager to stop
container {}.", containerId);
+ @VisibleForTesting
+ Optional<Resource> getContainerResource(TaskExecutorProcessSpec
taskExecutorProcessSpec) {
+ return
taskExecutorProcessSpecContainerResourceAdapter.tryComputeContainerResource(taskExecutorProcessSpec);
}
- @Override
- public void onStartContainerError(ContainerId containerId, Throwable t)
{
- runAsync(() ->
releaseFailedContainerAndRequestNewContainerIfRequired(containerId, t));
+ private RegisterApplicationMasterResponse registerApplicationMaster()
throws Exception {
+ final int restPort;
+ final String webInterfaceUrl =
configuration.getWebInterfaceUrl();
+ final String rpcAddress = configuration.getRpcAddress();
+
+ if (webInterfaceUrl != null) {
+ final int lastColon = webInterfaceUrl.lastIndexOf(':');
Review comment:
`webInterfaceUrl` cannot be something like `http://localhost/`, can it? Just asking to make sure that `lastIndexOf(':')` wouldn't return the wrong index here. If it cannot, should we consider renaming `webInterfaceUrl` in the code?
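If the value can indeed be a full URL, a sketch of a parse that would not trip over the scheme separator (assuming the string is a syntactically valid URI):
```
// java.net.URI handles "http://localhost/" correctly and returns -1 when no port is set,
// whereas lastIndexOf(':') would hit the colon of the scheme and the substring would not parse.
final int restPort = java.net.URI.create(webInterfaceUrl).getPort();
```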
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -435,53 +308,43 @@ private void onContainersOfResourceAllocated(Resource
resource, List<Container>
numAccepted, numExcess, numPending, resource);
}
- @VisibleForTesting
- static ResourceID getContainerResourceId(Container container) {
- return new ResourceID(container.getId().toString(),
container.getNodeId().toString());
+ private int getNumRequestedNotAllocatedWorkers() {
+ return
requestResourceFutures.values().stream().mapToInt(Queue::size).sum();
+ }
+
+ private int
getNumRequestedNotAllocatedWorkersFor(TaskExecutorProcessSpec
taskExecutorProcessSpec) {
+ return
requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new
LinkedList<>()).size();
+ }
+
+ private void removeContainerRequest(AMRMClient.ContainerRequest
pendingContainerRequest) {
+ log.info("Removing container request {}.",
pendingContainerRequest);
+
resourceManagerClient.removeContainerRequest(pendingContainerRequest);
+ }
+
+ private void returnExcessContainer(Container excessContainer) {
+ log.info("Returning excess container {}.",
excessContainer.getId());
+
resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
}
- private void startTaskExecutorInContainer(Container container,
WorkerResourceSpec workerResourceSpec, ResourceID resourceId) {
- workerNodeMap.put(resourceId, new YarnWorkerNode(container,
resourceId));
+ private void startTaskExecutorInContainer(Container container,
TaskExecutorProcessSpec taskExecutorProcessSpec, ResourceID resourceId,
CompletableFuture<YarnWorkerNode> requestResourceFuture) {
+ final YarnWorkerNode yarnWorkerNode = new
YarnWorkerNode(container, resourceId);
try {
// Context information used to start a TaskExecutor
Java process
ContainerLaunchContext taskExecutorLaunchContext =
createTaskExecutorLaunchContext(
resourceId,
container.getNodeId().getHost(),
-
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig,
workerResourceSpec));
+ taskExecutorProcessSpec);
nodeManagerClient.startContainerAsync(container,
taskExecutorLaunchContext);
+ requestResourceFuture.complete(yarnWorkerNode);
} catch (Throwable t) {
-
releaseFailedContainerAndRequestNewContainerIfRequired(container.getId(), t);
+ requestResourceFuture.completeExceptionally(t);
}
}
- private void
releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId containerId,
Throwable throwable) {
- validateRunsInMainThread();
-
- log.error("Could not start TaskManager in container {}.",
containerId, throwable);
-
- final ResourceID resourceId = new
ResourceID(containerId.toString());
- // release the failed container
- workerNodeMap.remove(resourceId);
- resourceManagerClient.releaseAssignedContainer(containerId);
- notifyAllocatedWorkerStopped(resourceId);
- // and ask for a new one
- requestYarnContainerIfRequired();
- }
-
- private void returnExcessContainer(Container excessContainer) {
- log.info("Returning excess container {}.",
excessContainer.getId());
-
resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
- }
-
- private void removeContainerRequest(AMRMClient.ContainerRequest
pendingContainerRequest, WorkerResourceSpec workerResourceSpec) {
- log.info("Removing container request {}.",
pendingContainerRequest);
-
resourceManagerClient.removeContainerRequest(pendingContainerRequest);
- }
-
private Collection<AMRMClient.ContainerRequest>
getPendingRequestsAndCheckConsistency(Resource resource, int expectedNum) {
- final Collection<Resource> equivalentResources =
workerSpecContainerResourceAdapter.getEquivalentContainerResource(resource,
matchingStrategy);
+ final Collection<Resource> equivalentResources =
taskExecutorProcessSpecContainerResourceAdapter.getEquivalentContainerResource(resource,
matchingStrategy);
final List<? extends Collection<AMRMClient.ContainerRequest>>
matchingRequests =
Review comment:
Just a minor remark: `equivalentResources` is not used anywhere else. We could even call `.stream()` and the subsequent calls directly on `getEquivalentContainerResource(..)`. But feel free to leave it like that if you think it's easier to read.
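A self-contained sketch of that pattern (generic names, not the actual Flink types), just to show what starting the stream directly on the method result looks like:
```
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class ChainedStreamSketch {
	private static Collection<String> getEquivalentContainerResource() {
		return Arrays.asList("memory:1024", "vcores:1");
	}

	public static void main(String[] args) {
		// no intermediate local variable; the pipeline starts on the method call itself
		List<String> matching = getEquivalentContainerResource().stream()
				.filter(resource -> resource.startsWith("memory"))
				.collect(Collectors.toList());
		System.out.println(matching);
	}
}
```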
##########
File path:
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
##########
@@ -34,11 +34,25 @@
*/
class TestingYarnNMClientAsync extends NMClientAsyncImpl {
- private volatile TriConsumer<Container, ContainerLaunchContext,
CallbackHandler> startContainerAsyncConsumer = (ignored1, ignored2, ignored3)
-> {};
- private volatile TriConsumer<ContainerId, NodeId, CallbackHandler>
stopContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {};
+ private volatile TriConsumer<Container, ContainerLaunchContext,
CallbackHandler> startContainerAsyncConsumer;
+ private volatile TriConsumer<ContainerId, NodeId, CallbackHandler>
stopContainerAsyncConsumer;
+ private volatile Runnable clientInitRunnable;
+ private volatile Runnable clientStartRunnable;
+ private volatile Runnable clientStopRunnable;
Review comment:
I'm a bit confused about the `volatile` usage here: Is it really
necessary? Based on the tests, it does not look like it.
##########
File path:
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
##########
@@ -51,30 +65,74 @@ public void stopContainerAsync(ContainerId containerId,
NodeId nodeId) {
this.stopContainerAsyncConsumer.accept(containerId, nodeId,
callbackHandler);
}
- void setStartContainerAsyncConsumer(TriConsumer<Container,
ContainerLaunchContext, CallbackHandler> startContainerAsyncConsumer) {
- this.startContainerAsyncConsumer =
Preconditions.checkNotNull(startContainerAsyncConsumer);
- }
-
- void setStopContainerAsyncConsumer(TriConsumer<ContainerId, NodeId,
CallbackHandler> stopContainerAsyncConsumer) {
- this.stopContainerAsyncConsumer =
Preconditions.checkNotNull(stopContainerAsyncConsumer);
+ static Builder builder() {
+ return new Builder();
}
//
------------------------------------------------------------------------
// Override lifecycle methods to avoid actually starting the service
//
------------------------------------------------------------------------
@Override
- protected void serviceInit(Configuration conf) throws Exception {
- // noop
+ public void init(Configuration conf) {
+ clientInitRunnable.run();
}
@Override
- protected void serviceStart() throws Exception {
- // noop
+ public void start() {
+ clientStartRunnable.run();
}
@Override
- protected void serviceStop() throws Exception {
- // noop
+ public void stop() {
+ clientStopRunnable.run();
+ }
+
+ /**
+ * Builder class for {@link TestingYarnAMRMClientAsync}.
+ */
+ public static class Builder {
+ private volatile TriConsumer<Container, ContainerLaunchContext,
CallbackHandler> startContainerAsyncConsumer = (ignored1, ignored2, ignored3)
-> {};
+ private volatile TriConsumer<ContainerId, NodeId,
CallbackHandler> stopContainerAsyncConsumer = (ignored1, ignored2, ignored3) ->
{};
+ private volatile Runnable clientInitRunnable = () -> {};
+ private volatile Runnable clientStartRunnable = () -> {};
+ private volatile Runnable clientStopRunnable = () -> {};
Review comment:
I'm a bit confused about the `volatile` usage here: Is it really
necessary? Based on the tests, it does not look like it.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -72,354 +62,237 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
- * The yarn implementation of the resource manager. Used when the system is
started
- * via the resource framework YARN.
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
*/
-public class YarnResourceManager extends
LegacyActiveResourceManager<YarnWorkerNode>
- implements AMRMClientAsync.CallbackHandler,
NMClientAsync.CallbackHandler {
+public class YarnResourceManagerDriver extends
AbstractResourceManagerDriver<YarnWorkerNode> {
private static final Priority RM_REQUEST_PRIORITY =
Priority.newInstance(1);
- /** YARN container map. */
- private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
-
/** Environment variable name of the hostname given by the YARN.
* In task executor we use the hostnames given by YARN consistently
throughout akka */
static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received
shutdown request from YARN ResourceManager.";
- /** Default heartbeat interval between this resource manager and the
YARN ResourceManager. */
- private final int yarnHeartbeatIntervalMillis;
-
private final YarnConfiguration yarnConfig;
- @Nullable
- private final String webInterfaceUrl;
+ /** The process environment variables. */
+ private final YarnResourceManagerDriverConfiguration configuration;
- /** The heartbeat interval while the resource master is waiting for
containers. */
- private final int containerRequestHeartbeatIntervalMillis;
+ /** Default heartbeat interval between this resource manager and the
YARN ResourceManager. */
+ private final int yarnHeartbeatIntervalMillis;
/** Client to communicate with the Resource Manager (YARN's master). */
private AMRMClientAsync<AMRMClient.ContainerRequest>
resourceManagerClient;
+ /** The heartbeat interval while the resource master is waiting for
containers. */
+ private final int containerRequestHeartbeatIntervalMillis;
+
/** Client to communicate with the Node manager and launch TaskExecutor
processes. */
private NMClientAsync nodeManagerClient;
- private final WorkerSpecContainerResourceAdapter
workerSpecContainerResourceAdapter;
+ /** Request resource futures, keyed by container ids. */
+ private final Map<TaskExecutorProcessSpec,
Queue<CompletableFuture<YarnWorkerNode>>> requestResourceFutures;
+
+ private final TaskExecutorProcessSpecContainerResourceAdapter
taskExecutorProcessSpecContainerResourceAdapter;
private final RegisterApplicationMasterResponseReflector
registerApplicationMasterResponseReflector;
- private WorkerSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy;
-
- public YarnResourceManager(
- RpcService rpcService,
- ResourceID resourceId,
- Configuration flinkConfig,
- Map<String, String> env,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- SlotManager slotManager,
- ResourceManagerPartitionTrackerFactory
clusterPartitionTrackerFactory,
- JobLeaderIdService jobLeaderIdService,
- ClusterInformation clusterInformation,
- FatalErrorHandler fatalErrorHandler,
- @Nullable String webInterfaceUrl,
- ResourceManagerMetricGroup resourceManagerMetricGroup) {
- super(
- flinkConfig,
- env,
- rpcService,
- resourceId,
- highAvailabilityServices,
- heartbeatServices,
- slotManager,
- clusterPartitionTrackerFactory,
- jobLeaderIdService,
- clusterInformation,
- fatalErrorHandler,
- resourceManagerMetricGroup);
+ private
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy;
+
+ private final YarnResourceManagerClientFactory
yarnResourceManagerClientFactory;
+
+ private final YarnNodeManagerClientFactory yarnNodeManagerClientFactory;
+
+ public YarnResourceManagerDriver(
+ Configuration flinkConfig,
+ YarnResourceManagerDriverConfiguration configuration,
+ YarnResourceManagerClientFactory
yarnResourceManagerClientFactory,
+ YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
+ super(flinkConfig,
GlobalConfiguration.loadConfiguration(configuration.getCurrentDir()));
+
this.yarnConfig = new YarnConfiguration();
- this.workerNodeMap = new ConcurrentHashMap<>();
+ this.requestResourceFutures = new HashMap<>();
+ this.configuration = configuration;
+
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
- YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) *
1000;
+ YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
final long yarnExpiryIntervalMS = yarnConfig.getLong(
- YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
-
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+ YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
log.warn("The heartbeat interval of the Flink
Application master ({}) is greater " +
"than YARN's expiry interval ({}). The
application is likely to be killed by YARN.",
- yarnHeartbeatIntervalMS,
yarnExpiryIntervalMS);
+ yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
}
yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
containerRequestHeartbeatIntervalMillis =
flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
- this.webInterfaceUrl = webInterfaceUrl;
-
- this.workerSpecContainerResourceAdapter =
Utils.createWorkerSpecContainerResourceAdapter(flinkConfig, yarnConfig);
+ this.taskExecutorProcessSpecContainerResourceAdapter =
Utils.createTaskExecutorProcessSpecContainerResourceAdapter(flinkConfig,
yarnConfig);
this.registerApplicationMasterResponseReflector = new
RegisterApplicationMasterResponseReflector(log);
this.matchingStrategy =
flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
- }
-
- protected AMRMClientAsync<AMRMClient.ContainerRequest>
createAndStartResourceManagerClient(
- YarnConfiguration yarnConfiguration,
- int yarnHeartbeatIntervalMillis,
- @Nullable String webInterfaceUrl) throws Exception {
- AMRMClientAsync<AMRMClient.ContainerRequest>
resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
- yarnHeartbeatIntervalMillis,
- this);
-
- resourceManagerClient.init(yarnConfiguration);
- resourceManagerClient.start();
-
- //TODO: change akka address to tcp host and port, the
getAddress() interface should return a standard tcp address
- Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
-
- final int restPort;
-
- if (webInterfaceUrl != null) {
- final int lastColon = webInterfaceUrl.lastIndexOf(':');
-
- if (lastColon == -1) {
- restPort = -1;
- } else {
- restPort =
Integer.valueOf(webInterfaceUrl.substring(lastColon + 1));
- }
- } else {
- restPort = -1;
- }
-
- final RegisterApplicationMasterResponse
registerApplicationMasterResponse =
-
resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort,
webInterfaceUrl);
-
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
- updateMatchingStrategy(registerApplicationMasterResponse);
+
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
+
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
- return resourceManagerClient;
+ this.yarnResourceManagerClientFactory =
yarnResourceManagerClientFactory;
+ this.yarnNodeManagerClientFactory =
yarnNodeManagerClientFactory;
}
- private void getContainersFromPreviousAttempts(final
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
- final List<Container> containersFromPreviousAttempts =
-
registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-
- log.info("Recovered {} containers from previous attempts
({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
-
- for (final Container container :
containersFromPreviousAttempts) {
- final ResourceID resourceID =
getContainerResourceId(container);
- workerNodeMap.put(resourceID, new
YarnWorkerNode(container, resourceID));
- }
- }
-
- private void updateMatchingStrategy(final
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
- final Optional<Set<String>> schedulerResourceTypesOptional =
-
registerApplicationMasterResponseReflector.getSchedulerResourceTypeNames(registerApplicationMasterResponse);
-
- final WorkerSpecContainerResourceAdapter.MatchingStrategy
strategy;
- if (schedulerResourceTypesOptional.isPresent()) {
- Set<String> types =
schedulerResourceTypesOptional.get();
- log.info("Register application master response contains
scheduler resource types: {}.", types);
- matchingStrategy = types.contains("CPU") ?
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
- } else {
- log.info("Register application master response does not
contain scheduler resource types, use '{}'.",
-
YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES.key());
- }
- log.info("Container matching strategy: {}.", matchingStrategy);
- }
-
- protected NMClientAsync
createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
- // create the client to communicate with the node managers
- NMClientAsync nodeManagerClient =
NMClientAsync.createNMClientAsync(this);
- nodeManagerClient.init(yarnConfiguration);
- nodeManagerClient.start();
- return nodeManagerClient;
- }
-
- @Override
- protected Configuration loadClientConfiguration() {
- return
GlobalConfiguration.loadConfiguration(env.get(ApplicationConstants.Environment.PWD.key()));
- }
+ //
------------------------------------------------------------------------
+ // ResourceManagerDriver
+ //
------------------------------------------------------------------------
@Override
- protected void initialize() throws ResourceManagerException {
+ protected void initializeInternal() throws Exception {
+ final YarnContainerEventHandler yarnContainerEventHandler = new
YarnContainerEventHandler();
try {
- resourceManagerClient =
createAndStartResourceManagerClient(
- yarnConfig,
+ resourceManagerClient =
yarnResourceManagerClientFactory.createResourceManagerClient(
yarnHeartbeatIntervalMillis,
- webInterfaceUrl);
+ yarnContainerEventHandler);
+ resourceManagerClient.init(yarnConfig);
+ resourceManagerClient.start();
+
+ final RegisterApplicationMasterResponse
registerApplicationMasterResponse = registerApplicationMaster();
+
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+
updateMatchingStrategy(registerApplicationMasterResponse);
} catch (Exception e) {
throw new ResourceManagerException("Could not start
resource manager client.", e);
}
- nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
+ nodeManagerClient =
yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
+ nodeManagerClient.init(yarnConfig);
+ nodeManagerClient.start();
}
@Override
- public void terminate() throws Exception {
+ public CompletableFuture<Void> terminate() {
// shut down all components
- Exception firstException = null;
+ Exception exception = null;
if (resourceManagerClient != null) {
try {
resourceManagerClient.stop();
} catch (Exception e) {
- firstException = e;
+ exception = e;
}
}
if (nodeManagerClient != null) {
try {
nodeManagerClient.stop();
} catch (Exception e) {
- firstException =
ExceptionUtils.firstOrSuppressed(e, firstException);
+ exception = ExceptionUtils.firstOrSuppressed(e,
exception);
}
}
- ExceptionUtils.tryRethrowException(firstException);
+ return exception == null ?
+ FutureUtils.completedVoidFuture() :
+ FutureUtils.completedExceptionally(exception);
}
@Override
- protected void internalDeregisterApplication(
- ApplicationStatus finalStatus,
- @Nullable String diagnostics) {
-
+ public void deregisterApplication(ApplicationStatus finalStatus,
@Nullable String optionalDiagnostics) {
// first, de-register from YARN
- FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
+ final FinalApplicationStatus yarnStatus =
getYarnStatus(finalStatus);
log.info("Unregister application from the YARN Resource Manager
with final status {}.", yarnStatus);
final Optional<URL> historyServerURL =
HistoryServerUtils.getHistoryServerURL(flinkConfig);
final String appTrackingUrl =
historyServerURL.map(URL::toString).orElse("");
try {
-
resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics,
appTrackingUrl);
+
resourceManagerClient.unregisterApplicationMaster(yarnStatus,
optionalDiagnostics, appTrackingUrl);
} catch (Throwable t) {
log.error("Could not unregister the application
master.", t);
}
- Utils.deleteApplicationFiles(env);
+ Utils.deleteApplicationFiles(configuration.getYarnFiles());
}
@Override
- public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
- return requestYarnContainer(workerResourceSpec);
- }
+ public CompletableFuture<YarnWorkerNode>
requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+ final Optional<Resource> containerResourceOptional =
getContainerResource(taskExecutorProcessSpec);
+ final CompletableFuture<YarnWorkerNode> requestResourceFuture =
new CompletableFuture<>();
- @VisibleForTesting
- Optional<Resource> getContainerResource(WorkerResourceSpec
workerResourceSpec) {
- return
workerSpecContainerResourceAdapter.tryComputeContainerResource(workerResourceSpec);
+ if (containerResourceOptional.isPresent()) {
+
resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));
+
+ // make sure we transmit the request fast and receive
fast news of granted allocations
+
resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
+
+
requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new
LinkedList<>()).add(requestResourceFuture);
+
+ log.info("Requesting new TaskExecutor container with
resource {}.", taskExecutorProcessSpec);
+ } else {
+ requestResourceFuture.completeExceptionally(
+ new
ResourceManagerException(String.format("Could not compute the container
Resource from the given TaskExecutorProcessSpec %s.",
taskExecutorProcessSpec)));
+ }
+
+ return requestResourceFuture;
}
@Override
- public boolean stopWorker(final YarnWorkerNode workerNode) {
+ public void releaseResource(YarnWorkerNode workerNode) {
final Container container = workerNode.getContainer();
log.info("Stopping container {}.",
workerNode.getResourceID().getStringWithMetadata());
nodeManagerClient.stopContainerAsync(container.getId(),
container.getNodeId());
resourceManagerClient.releaseAssignedContainer(container.getId());
- workerNodeMap.remove(workerNode.getResourceID());
- return true;
- }
-
- @Override
- protected YarnWorkerNode workerStarted(ResourceID resourceID) {
- return workerNodeMap.get(resourceID);
}
//
------------------------------------------------------------------------
- // AMRMClientAsync CallbackHandler methods
+ // Internal
//
------------------------------------------------------------------------
- @Override
- public float getProgress() {
- // Temporarily need not record the total size of asked and
allocated containers
- return 1;
- }
-
- @Override
- public void onContainersCompleted(final List<ContainerStatus> statuses)
{
- runAsync(() -> {
- log.debug("YARN ResourceManager reported the
following containers completed: {}.", statuses);
- for (final ContainerStatus containerStatus :
statuses) {
-
- final ResourceID resourceId = new
ResourceID(containerStatus.getContainerId().toString());
- final YarnWorkerNode yarnWorkerNode =
workerNodeMap.remove(resourceId);
-
-
notifyAllocatedWorkerStopped(resourceId);
-
- if (yarnWorkerNode != null) {
- // Container completed
unexpectedly ~> start a new one
-
requestYarnContainerIfRequired();
- }
- // Eagerly close the connection with
task manager.
- closeTaskManagerConnection(resourceId,
new Exception(containerStatus.getDiagnostics()));
- }
- }
- );
- }
-
- @Override
- public void onContainersAllocated(List<Container> containers) {
- runAsync(() -> {
- log.info("Received {} containers.", containers.size());
-
- for (Map.Entry<Resource, List<Container>> entry :
groupContainerByResource(containers).entrySet()) {
- onContainersOfResourceAllocated(entry.getKey(),
entry.getValue());
- }
-
- // if we are waiting for no further containers, we can
go to the
- // regular heartbeat interval
- if (getNumRequestedNotAllocatedWorkers() <= 0) {
-
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
- }
- });
- }
-
- private Map<Resource, List<Container>>
groupContainerByResource(List<Container> containers) {
- return
containers.stream().collect(Collectors.groupingBy(Container::getResource));
- }
-
private void onContainersOfResourceAllocated(Resource resource,
List<Container> containers) {
- final List<WorkerResourceSpec> pendingWorkerResourceSpecs =
-
workerSpecContainerResourceAdapter.getWorkerSpecs(resource,
matchingStrategy).stream()
+ final List<TaskExecutorProcessSpec>
pendingTaskExecutorProcessSpecs =
+
taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource,
matchingStrategy).stream()
.flatMap(spec ->
Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
.collect(Collectors.toList());
- int numPending = pendingWorkerResourceSpecs.size();
+ int numPending = pendingTaskExecutorProcessSpecs.size();
log.info("Received {} containers with resource {}, {} pending
container requests.",
containers.size(),
resource,
numPending);
final Iterator<Container> containerIterator =
containers.iterator();
- final Iterator<WorkerResourceSpec> pendingWorkerSpecIterator =
pendingWorkerResourceSpecs.iterator();
+ final Iterator<TaskExecutorProcessSpec>
pendingTaskExecutorProcessSpecIterator =
pendingTaskExecutorProcessSpecs.iterator();
final Iterator<AMRMClient.ContainerRequest>
pendingRequestsIterator =
- getPendingRequestsAndCheckConsistency(resource,
pendingWorkerResourceSpecs.size()).iterator();
+ getPendingRequestsAndCheckConsistency(resource,
pendingTaskExecutorProcessSpecs.size()).iterator();
int numAccepted = 0;
- while (containerIterator.hasNext() &&
pendingWorkerSpecIterator.hasNext()) {
- final WorkerResourceSpec workerResourceSpec =
pendingWorkerSpecIterator.next();
+ while (containerIterator.hasNext() &&
pendingTaskExecutorProcessSpecIterator.hasNext()) {
+ final TaskExecutorProcessSpec taskExecutorProcessSpec =
pendingTaskExecutorProcessSpecIterator.next();
final Container container = containerIterator.next();
final AMRMClient.ContainerRequest pendingRequest =
pendingRequestsIterator.next();
final ResourceID resourceId =
getContainerResourceId(container);
+ final CompletableFuture<YarnWorkerNode>
requestResourceFuture =
+ Preconditions.checkNotNull(
+ Preconditions.checkNotNull(
+
requestResourceFutures.get(taskExecutorProcessSpec),
+ "The requestResourceFuture for
TasExecutorProcessSpec %s should not be null.", taskExecutorProcessSpec).poll(),
+ "The requestResourceFuture queue for
TasExecutorProcessSpec %s should not be empty.", taskExecutorProcessSpec);
+ if
(requestResourceFutures.get(taskExecutorProcessSpec).isEmpty()) {
Review comment:
Can the `if` condition ever be `true`? It seems to be covered by the previous `checkNotNull` already, i.e. a `NullPointerException` would be thrown before this `if` can be reached.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -72,354 +62,237 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
- * The yarn implementation of the resource manager. Used when the system is
started
- * via the resource framework YARN.
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
*/
-public class YarnResourceManager extends
LegacyActiveResourceManager<YarnWorkerNode>
- implements AMRMClientAsync.CallbackHandler,
NMClientAsync.CallbackHandler {
+public class YarnResourceManagerDriver extends
AbstractResourceManagerDriver<YarnWorkerNode> {
private static final Priority RM_REQUEST_PRIORITY =
Priority.newInstance(1);
- /** YARN container map. */
- private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
-
/** Environment variable name of the hostname given by the YARN.
* In task executor we use the hostnames given by YARN consistently
throughout akka */
static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received
shutdown request from YARN ResourceManager.";
- /** Default heartbeat interval between this resource manager and the
YARN ResourceManager. */
- private final int yarnHeartbeatIntervalMillis;
-
private final YarnConfiguration yarnConfig;
- @Nullable
- private final String webInterfaceUrl;
+ /** The process environment variables. */
+ private final YarnResourceManagerDriverConfiguration configuration;
- /** The heartbeat interval while the resource master is waiting for
containers. */
- private final int containerRequestHeartbeatIntervalMillis;
+ /** Default heartbeat interval between this resource manager and the
YARN ResourceManager. */
+ private final int yarnHeartbeatIntervalMillis;
/** Client to communicate with the Resource Manager (YARN's master). */
private AMRMClientAsync<AMRMClient.ContainerRequest>
resourceManagerClient;
+ /** The heartbeat interval while the resource master is waiting for
containers. */
+ private final int containerRequestHeartbeatIntervalMillis;
+
/** Client to communicate with the Node manager and launch TaskExecutor
processes. */
private NMClientAsync nodeManagerClient;
- private final WorkerSpecContainerResourceAdapter
workerSpecContainerResourceAdapter;
+ /** Request resource futures, keyed by container ids. */
+ private final Map<TaskExecutorProcessSpec,
Queue<CompletableFuture<YarnWorkerNode>>> requestResourceFutures;
+
+ private final TaskExecutorProcessSpecContainerResourceAdapter
taskExecutorProcessSpecContainerResourceAdapter;
private final RegisterApplicationMasterResponseReflector
registerApplicationMasterResponseReflector;
- private WorkerSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy;
-
- public YarnResourceManager(
- RpcService rpcService,
- ResourceID resourceId,
- Configuration flinkConfig,
- Map<String, String> env,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- SlotManager slotManager,
- ResourceManagerPartitionTrackerFactory
clusterPartitionTrackerFactory,
- JobLeaderIdService jobLeaderIdService,
- ClusterInformation clusterInformation,
- FatalErrorHandler fatalErrorHandler,
- @Nullable String webInterfaceUrl,
- ResourceManagerMetricGroup resourceManagerMetricGroup) {
- super(
- flinkConfig,
- env,
- rpcService,
- resourceId,
- highAvailabilityServices,
- heartbeatServices,
- slotManager,
- clusterPartitionTrackerFactory,
- jobLeaderIdService,
- clusterInformation,
- fatalErrorHandler,
- resourceManagerMetricGroup);
+ private
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy;
+
+ private final YarnResourceManagerClientFactory
yarnResourceManagerClientFactory;
+
+ private final YarnNodeManagerClientFactory yarnNodeManagerClientFactory;
+
+ public YarnResourceManagerDriver(
+ Configuration flinkConfig,
+ YarnResourceManagerDriverConfiguration configuration,
+ YarnResourceManagerClientFactory
yarnResourceManagerClientFactory,
+ YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
+ super(flinkConfig,
GlobalConfiguration.loadConfiguration(configuration.getCurrentDir()));
+
this.yarnConfig = new YarnConfiguration();
- this.workerNodeMap = new ConcurrentHashMap<>();
+ this.requestResourceFutures = new HashMap<>();
+ this.configuration = configuration;
+
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
- YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) *
1000;
+ YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
final long yarnExpiryIntervalMS = yarnConfig.getLong(
- YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
-
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+ YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
log.warn("The heartbeat interval of the Flink
Application master ({}) is greater " +
"than YARN's expiry interval ({}). The
application is likely to be killed by YARN.",
- yarnHeartbeatIntervalMS,
yarnExpiryIntervalMS);
+ yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
}
yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
containerRequestHeartbeatIntervalMillis =
flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
- this.webInterfaceUrl = webInterfaceUrl;
-
- this.workerSpecContainerResourceAdapter =
Utils.createWorkerSpecContainerResourceAdapter(flinkConfig, yarnConfig);
+ this.taskExecutorProcessSpecContainerResourceAdapter =
Utils.createTaskExecutorProcessSpecContainerResourceAdapter(flinkConfig,
yarnConfig);
this.registerApplicationMasterResponseReflector = new
RegisterApplicationMasterResponseReflector(log);
this.matchingStrategy =
flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
- }
-
- protected AMRMClientAsync<AMRMClient.ContainerRequest>
createAndStartResourceManagerClient(
- YarnConfiguration yarnConfiguration,
- int yarnHeartbeatIntervalMillis,
- @Nullable String webInterfaceUrl) throws Exception {
- AMRMClientAsync<AMRMClient.ContainerRequest>
resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
- yarnHeartbeatIntervalMillis,
- this);
-
- resourceManagerClient.init(yarnConfiguration);
- resourceManagerClient.start();
-
- //TODO: change akka address to tcp host and port, the
getAddress() interface should return a standard tcp address
- Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
-
- final int restPort;
-
- if (webInterfaceUrl != null) {
- final int lastColon = webInterfaceUrl.lastIndexOf(':');
-
- if (lastColon == -1) {
- restPort = -1;
- } else {
- restPort =
Integer.valueOf(webInterfaceUrl.substring(lastColon + 1));
- }
- } else {
- restPort = -1;
- }
-
- final RegisterApplicationMasterResponse
registerApplicationMasterResponse =
-
resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort,
webInterfaceUrl);
-
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
- updateMatchingStrategy(registerApplicationMasterResponse);
+
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
+
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
- return resourceManagerClient;
+ this.yarnResourceManagerClientFactory =
yarnResourceManagerClientFactory;
+ this.yarnNodeManagerClientFactory =
yarnNodeManagerClientFactory;
}
- private void getContainersFromPreviousAttempts(final
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
- final List<Container> containersFromPreviousAttempts =
-
registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-
- log.info("Recovered {} containers from previous attempts
({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
-
- for (final Container container :
containersFromPreviousAttempts) {
- final ResourceID resourceID =
getContainerResourceId(container);
- workerNodeMap.put(resourceID, new
YarnWorkerNode(container, resourceID));
- }
- }
-
- private void updateMatchingStrategy(final
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
- final Optional<Set<String>> schedulerResourceTypesOptional =
-
registerApplicationMasterResponseReflector.getSchedulerResourceTypeNames(registerApplicationMasterResponse);
-
- final WorkerSpecContainerResourceAdapter.MatchingStrategy
strategy;
- if (schedulerResourceTypesOptional.isPresent()) {
- Set<String> types =
schedulerResourceTypesOptional.get();
- log.info("Register application master response contains
scheduler resource types: {}.", types);
- matchingStrategy = types.contains("CPU") ?
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
- } else {
- log.info("Register application master response does not
contain scheduler resource types, use '{}'.",
-
YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES.key());
- }
- log.info("Container matching strategy: {}.", matchingStrategy);
- }
-
- protected NMClientAsync
createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
- // create the client to communicate with the node managers
- NMClientAsync nodeManagerClient =
NMClientAsync.createNMClientAsync(this);
- nodeManagerClient.init(yarnConfiguration);
- nodeManagerClient.start();
- return nodeManagerClient;
- }
-
- @Override
- protected Configuration loadClientConfiguration() {
- return
GlobalConfiguration.loadConfiguration(env.get(ApplicationConstants.Environment.PWD.key()));
- }
+ //
------------------------------------------------------------------------
+ // ResourceManagerDriver
+ //
------------------------------------------------------------------------
@Override
- protected void initialize() throws ResourceManagerException {
+ protected void initializeInternal() throws Exception {
+ final YarnContainerEventHandler yarnContainerEventHandler = new
YarnContainerEventHandler();
try {
- resourceManagerClient =
createAndStartResourceManagerClient(
- yarnConfig,
+ resourceManagerClient =
yarnResourceManagerClientFactory.createResourceManagerClient(
yarnHeartbeatIntervalMillis,
- webInterfaceUrl);
+ yarnContainerEventHandler);
+ resourceManagerClient.init(yarnConfig);
+ resourceManagerClient.start();
+
+ final RegisterApplicationMasterResponse
registerApplicationMasterResponse = registerApplicationMaster();
+
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+
updateMatchingStrategy(registerApplicationMasterResponse);
} catch (Exception e) {
Review comment:
Out of curiosity: what is the reason for catching exceptions here but not for the `NodeManagerClient`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]