KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488366730
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -508,52 +371,103 @@ private void
removeContainerRequest(AMRMClient.ContainerRequest pendingContainer
return matchingContainerRequests;
}
- @Override
- public void onShutdownRequest() {
- onFatalError(new
ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
- }
+ private ContainerLaunchContext createTaskExecutorLaunchContext(
+ ResourceID containerId,
+ String host,
+ TaskExecutorProcessSpec taskExecutorProcessSpec) throws
Exception {
- @Override
- public void onNodesUpdated(List<NodeReport> list) {
- // We are not interested in node updates
- }
+ // init the ContainerLaunchContext
+ final String currDir = configuration.getCurrentDir();
- @Override
- public void onError(Throwable error) {
- onFatalError(error);
- }
+ final ContaineredTaskManagerParameters taskManagerParameters =
+ ContaineredTaskManagerParameters.create(flinkConfig,
taskExecutorProcessSpec);
- //
------------------------------------------------------------------------
- // NMClientAsync CallbackHandler methods
- //
------------------------------------------------------------------------
- @Override
- public void onContainerStarted(ContainerId containerId, Map<String,
ByteBuffer> map) {
- log.debug("Succeeded to call YARN Node Manager to start
container {}.", containerId);
- }
+ log.info("TaskExecutor {} will be started on {} with {}.",
+ containerId.getStringWithMetadata(),
+ host,
+ taskExecutorProcessSpec);
- @Override
- public void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus) {
- // We are not interested in getting container status
+ final Configuration taskManagerConfig =
BootstrapTools.cloneConfiguration(flinkConfig);
+
taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID,
containerId.getResourceIdString());
+
taskManagerConfig.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA,
containerId.getMetadata());
+
+ final String taskManagerDynamicProperties =
+
BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig,
taskManagerConfig);
+
+ log.debug("TaskManager configuration: {}", taskManagerConfig);
+
+ final ContainerLaunchContext taskExecutorLaunchContext =
Utils.createTaskExecutorContext(
+ flinkConfig,
+ yarnConfig,
+ configuration,
+ taskManagerParameters,
+ taskManagerDynamicProperties,
+ currDir,
+ YarnTaskExecutorRunner.class,
+ log);
+
+ taskExecutorLaunchContext.getEnvironment()
+ .put(ENV_FLINK_NODE_ID, host);
+ return taskExecutorLaunchContext;
}
- @Override
- public void onContainerStopped(ContainerId containerId) {
- log.debug("Succeeded to call YARN Node Manager to stop
container {}.", containerId);
+ @VisibleForTesting
+ Optional<Resource> getContainerResource(TaskExecutorProcessSpec
taskExecutorProcessSpec) {
+ return
taskExecutorProcessSpecContainerResourceAdapter.tryComputeContainerResource(taskExecutorProcessSpec);
}
- @Override
- public void onStartContainerError(ContainerId containerId, Throwable t)
{
- runAsync(() ->
releaseFailedContainerAndRequestNewContainerIfRequired(containerId, t));
+ private RegisterApplicationMasterResponse registerApplicationMaster()
throws Exception {
+ final int restPort;
+ final String webInterfaceUrl =
configuration.getWebInterfaceUrl();
+ final String rpcAddress = configuration.getRpcAddress();
+
+ if (webInterfaceUrl != null) {
+ final int lastColon = webInterfaceUrl.lastIndexOf(':');
Review comment:
I'm not quite familiar with that logic. From what I could find, the
`webInterfaceUrl` is originally derived in `RestServerEndpoint`, so it should
always have a port.
@tillrohrmann Could you help confirm this?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]