XComp commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488482278
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -72,354 +62,237 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
- * The yarn implementation of the resource manager. Used when the system is
started
- * via the resource framework YARN.
+ * Implementation of {@link ResourceManagerDriver} for Yarn deployment.
*/
-public class YarnResourceManager extends
LegacyActiveResourceManager<YarnWorkerNode>
- implements AMRMClientAsync.CallbackHandler,
NMClientAsync.CallbackHandler {
+public class YarnResourceManagerDriver extends
AbstractResourceManagerDriver<YarnWorkerNode> {
private static final Priority RM_REQUEST_PRIORITY =
Priority.newInstance(1);
- /** YARN container map. */
- private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
-
/** Environment variable name of the hostname given by the YARN.
* In task executor we use the hostnames given by YARN consistently
throughout akka */
static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received
shutdown request from YARN ResourceManager.";
- /** Default heartbeat interval between this resource manager and the
YARN ResourceManager. */
- private final int yarnHeartbeatIntervalMillis;
-
private final YarnConfiguration yarnConfig;
- @Nullable
- private final String webInterfaceUrl;
+ /** The process environment variables. */
+ private final YarnResourceManagerDriverConfiguration configuration;
- /** The heartbeat interval while the resource master is waiting for
containers. */
- private final int containerRequestHeartbeatIntervalMillis;
+ /** Default heartbeat interval between this resource manager and the
YARN ResourceManager. */
+ private final int yarnHeartbeatIntervalMillis;
/** Client to communicate with the Resource Manager (YARN's master). */
private AMRMClientAsync<AMRMClient.ContainerRequest>
resourceManagerClient;
+ /** The heartbeat interval while the resource master is waiting for
containers. */
+ private final int containerRequestHeartbeatIntervalMillis;
+
/** Client to communicate with the Node manager and launch TaskExecutor
processes. */
private NMClientAsync nodeManagerClient;
- private final WorkerSpecContainerResourceAdapter
workerSpecContainerResourceAdapter;
+ /** Request resource futures, keyed by the requested {@link TaskExecutorProcessSpec}. */
+ private final Map<TaskExecutorProcessSpec,
Queue<CompletableFuture<YarnWorkerNode>>> requestResourceFutures;
+
+ private final TaskExecutorProcessSpecContainerResourceAdapter
taskExecutorProcessSpecContainerResourceAdapter;
private final RegisterApplicationMasterResponseReflector
registerApplicationMasterResponseReflector;
- private WorkerSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy;
-
- public YarnResourceManager(
- RpcService rpcService,
- ResourceID resourceId,
- Configuration flinkConfig,
- Map<String, String> env,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- SlotManager slotManager,
- ResourceManagerPartitionTrackerFactory
clusterPartitionTrackerFactory,
- JobLeaderIdService jobLeaderIdService,
- ClusterInformation clusterInformation,
- FatalErrorHandler fatalErrorHandler,
- @Nullable String webInterfaceUrl,
- ResourceManagerMetricGroup resourceManagerMetricGroup) {
- super(
- flinkConfig,
- env,
- rpcService,
- resourceId,
- highAvailabilityServices,
- heartbeatServices,
- slotManager,
- clusterPartitionTrackerFactory,
- jobLeaderIdService,
- clusterInformation,
- fatalErrorHandler,
- resourceManagerMetricGroup);
+ private
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy;
+
+ private final YarnResourceManagerClientFactory
yarnResourceManagerClientFactory;
+
+ private final YarnNodeManagerClientFactory yarnNodeManagerClientFactory;
+
+ public YarnResourceManagerDriver(
+ Configuration flinkConfig,
+ YarnResourceManagerDriverConfiguration configuration,
+ YarnResourceManagerClientFactory
yarnResourceManagerClientFactory,
+ YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
+ super(flinkConfig,
GlobalConfiguration.loadConfiguration(configuration.getCurrentDir()));
+
this.yarnConfig = new YarnConfiguration();
- this.workerNodeMap = new ConcurrentHashMap<>();
+ this.requestResourceFutures = new HashMap<>();
+ this.configuration = configuration;
+
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
- YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) *
1000;
+ YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
final long yarnExpiryIntervalMS = yarnConfig.getLong(
- YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
-
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+ YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
log.warn("The heartbeat interval of the Flink
Application master ({}) is greater " +
"than YARN's expiry interval ({}). The
application is likely to be killed by YARN.",
- yarnHeartbeatIntervalMS,
yarnExpiryIntervalMS);
+ yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
}
yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
containerRequestHeartbeatIntervalMillis =
flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
- this.webInterfaceUrl = webInterfaceUrl;
-
- this.workerSpecContainerResourceAdapter =
Utils.createWorkerSpecContainerResourceAdapter(flinkConfig, yarnConfig);
+ this.taskExecutorProcessSpecContainerResourceAdapter =
Utils.createTaskExecutorProcessSpecContainerResourceAdapter(flinkConfig,
yarnConfig);
this.registerApplicationMasterResponseReflector = new
RegisterApplicationMasterResponseReflector(log);
this.matchingStrategy =
flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
- }
-
- protected AMRMClientAsync<AMRMClient.ContainerRequest>
createAndStartResourceManagerClient(
- YarnConfiguration yarnConfiguration,
- int yarnHeartbeatIntervalMillis,
- @Nullable String webInterfaceUrl) throws Exception {
- AMRMClientAsync<AMRMClient.ContainerRequest>
resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
- yarnHeartbeatIntervalMillis,
- this);
-
- resourceManagerClient.init(yarnConfiguration);
- resourceManagerClient.start();
-
- //TODO: change akka address to tcp host and port, the
getAddress() interface should return a standard tcp address
- Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
-
- final int restPort;
-
- if (webInterfaceUrl != null) {
- final int lastColon = webInterfaceUrl.lastIndexOf(':');
-
- if (lastColon == -1) {
- restPort = -1;
- } else {
- restPort =
Integer.valueOf(webInterfaceUrl.substring(lastColon + 1));
- }
- } else {
- restPort = -1;
- }
-
- final RegisterApplicationMasterResponse
registerApplicationMasterResponse =
-
resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort,
webInterfaceUrl);
-
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
- updateMatchingStrategy(registerApplicationMasterResponse);
+
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
+
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
- return resourceManagerClient;
+ this.yarnResourceManagerClientFactory =
yarnResourceManagerClientFactory;
+ this.yarnNodeManagerClientFactory =
yarnNodeManagerClientFactory;
}
- private void getContainersFromPreviousAttempts(final
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
- final List<Container> containersFromPreviousAttempts =
-
registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-
- log.info("Recovered {} containers from previous attempts
({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
-
- for (final Container container :
containersFromPreviousAttempts) {
- final ResourceID resourceID =
getContainerResourceId(container);
- workerNodeMap.put(resourceID, new
YarnWorkerNode(container, resourceID));
- }
- }
-
- private void updateMatchingStrategy(final
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
- final Optional<Set<String>> schedulerResourceTypesOptional =
-
registerApplicationMasterResponseReflector.getSchedulerResourceTypeNames(registerApplicationMasterResponse);
-
- final WorkerSpecContainerResourceAdapter.MatchingStrategy
strategy;
- if (schedulerResourceTypesOptional.isPresent()) {
- Set<String> types =
schedulerResourceTypesOptional.get();
- log.info("Register application master response contains
scheduler resource types: {}.", types);
- matchingStrategy = types.contains("CPU") ?
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
- } else {
- log.info("Register application master response does not
contain scheduler resource types, use '{}'.",
-
YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES.key());
- }
- log.info("Container matching strategy: {}.", matchingStrategy);
- }
-
- protected NMClientAsync
createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
- // create the client to communicate with the node managers
- NMClientAsync nodeManagerClient =
NMClientAsync.createNMClientAsync(this);
- nodeManagerClient.init(yarnConfiguration);
- nodeManagerClient.start();
- return nodeManagerClient;
- }
-
- @Override
- protected Configuration loadClientConfiguration() {
- return
GlobalConfiguration.loadConfiguration(env.get(ApplicationConstants.Environment.PWD.key()));
- }
+ //
------------------------------------------------------------------------
+ // ResourceManagerDriver
+ //
------------------------------------------------------------------------
@Override
- protected void initialize() throws ResourceManagerException {
+ protected void initializeInternal() throws Exception {
+ final YarnContainerEventHandler yarnContainerEventHandler = new
YarnContainerEventHandler();
try {
- resourceManagerClient =
createAndStartResourceManagerClient(
- yarnConfig,
+ resourceManagerClient =
yarnResourceManagerClientFactory.createResourceManagerClient(
yarnHeartbeatIntervalMillis,
- webInterfaceUrl);
+ yarnContainerEventHandler);
+ resourceManagerClient.init(yarnConfig);
+ resourceManagerClient.start();
+
+ final RegisterApplicationMasterResponse
registerApplicationMasterResponse = registerApplicationMaster();
+
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+
updateMatchingStrategy(registerApplicationMasterResponse);
} catch (Exception e) {
Review comment:
I see. I just realized that `ResourceManagerException` might have been
misused here: this block catches exceptions related to YARN's resource
manager, whereas `ResourceManagerException` relates to Flink's resource manager.
Am I getting that right? It's unfortunate that there is this name clash.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]