XComp commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488498446
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -508,52 +371,103 @@ private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainer
return matchingContainerRequests;
}
- @Override
- public void onShutdownRequest() {
- onFatalError(new ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
- }
+ private ContainerLaunchContext createTaskExecutorLaunchContext(
+ ResourceID containerId,
+ String host,
+ TaskExecutorProcessSpec taskExecutorProcessSpec) throws Exception {
- @Override
- public void onNodesUpdated(List<NodeReport> list) {
- // We are not interested in node updates
- }
+ // init the ContainerLaunchContext
+ final String currDir = configuration.getCurrentDir();
- @Override
- public void onError(Throwable error) {
- onFatalError(error);
- }
+ final ContaineredTaskManagerParameters taskManagerParameters =
+ ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
- // ------------------------------------------------------------------------
- // NMClientAsync CallbackHandler methods
- // ------------------------------------------------------------------------
- @Override
- public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
- log.debug("Succeeded to call YARN Node Manager to start container {}.", containerId);
- }
+ log.info("TaskExecutor {} will be started on {} with {}.",
+ containerId.getStringWithMetadata(),
+ host,
+ taskExecutorProcessSpec);
- @Override
- public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
- // We are not interested in getting container status
+ final Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);
+ taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, containerId.getResourceIdString());
+ taskManagerConfig.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, containerId.getMetadata());
+
+ final String taskManagerDynamicProperties =
+ BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, taskManagerConfig);
+
+ log.debug("TaskManager configuration: {}", taskManagerConfig);
+
+ final ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
+ flinkConfig,
+ yarnConfig,
+ configuration,
+ taskManagerParameters,
+ taskManagerDynamicProperties,
+ currDir,
+ YarnTaskExecutorRunner.class,
+ log);
+
+ taskExecutorLaunchContext.getEnvironment()
+ .put(ENV_FLINK_NODE_ID, host);
+ return taskExecutorLaunchContext;
}
- @Override
- public void onContainerStopped(ContainerId containerId) {
- log.debug("Succeeded to call YARN Node Manager to stop container {}.", containerId);
+ @VisibleForTesting
+ Optional<Resource> getContainerResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+ return taskExecutorProcessSpecContainerResourceAdapter.tryComputeContainerResource(taskExecutorProcessSpec);
}
- @Override
- public void onStartContainerError(ContainerId containerId, Throwable t) {
- runAsync(() -> releaseFailedContainerAndRequestNewContainerIfRequired(containerId, t));
+ private RegisterApplicationMasterResponse registerApplicationMaster() throws Exception {
+ final int restPort;
+ final String webInterfaceUrl = configuration.getWebInterfaceUrl();
+ final String rpcAddress = configuration.getRpcAddress();
+
+ if (webInterfaceUrl != null) {
+ final int lastColon = webInterfaceUrl.lastIndexOf(':');
Review comment:
I looked through the code as well. As you pointed out, the URL is derived from
[webMonitorEndpoint](https://github.com/apache/flink/blob/c47ee2814aa0ef62237797626daaea95d6d52ca7/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java#L175).
The baseUrl is [instantiated as a
URL](https://github.com/apache/flink/blob/260ef2cf78970674ca5cad4a9f456b9a93110c69/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java#L238)
that includes the URI scheme. The address and port are derived from the Netty
channel setup, so you might be right that the port is available in all cases.
My concern is the error handling in general. IMHO, the error handling here
shouldn't rely on such assumptions holding. We should therefore also cover the
case where the URL contains a colon but no port (e.g. `http://localhost`); there,
`lastIndexOf(':')` finds the colon of the URI scheme rather than a port
separator. In this case, `restPort` should be set to `-1` as well.
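To illustrate the case described above, here is a minimal sketch of a port lookup that falls back to `-1` whenever no usable port follows the last colon. The class name `RestPortParser` and the method `parseRestPort` are hypothetical and not part of the PR; the sketch only assumes the `lastIndexOf(':')` approach shown in the diff.

```java
/** Hypothetical helper, not part of the PR: extracts the port from a URL string. */
public final class RestPortParser {

    private RestPortParser() {}

    /**
     * Returns the number following the last colon of the given URL,
     * or -1 if the URL is null, has no colon, or has no numeric port
     * after the last colon (e.g. "http://localhost").
     */
    public static int parseRestPort(String webInterfaceUrl) {
        if (webInterfaceUrl == null) {
            return -1;
        }
        final int lastColon = webInterfaceUrl.lastIndexOf(':');
        if (lastColon < 0) {
            return -1;
        }
        try {
            // For "http://localhost" the last colon belongs to the scheme,
            // so the substring is "//localhost" and parsing fails.
            return Integer.parseInt(webInterfaceUrl.substring(lastColon + 1));
        } catch (NumberFormatException e) {
            return -1;
        }
    }
}
```

With this sketch, `RestPortParser.parseRestPort("http://localhost:8081")` yields `8081`, while `http://localhost` and a `null` URL both yield `-1`. It also returns `-1` when a path follows the port, so it is only meant to show the fallback, not to be a complete URL parser.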