[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430849#comment-15430849 ]
ASF GitHub Bot commented on FLINK-4348:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2389#discussion_r75686098
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
* @return Slot assignment
*/
@RpcMethod
- public SlotAssignment requestSlot(SlotRequest slotRequest) {
-     System.out.println("SlotRequest: " + slotRequest);
-     return new SlotAssignment();
+ public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
+     // TODO slotManager.requestSlot(slotRequest);
+     return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
}
/**
- *
- * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
- * @param taskExecutorAddress The address of the TaskExecutor that registers
- * @param resourceID The resource ID of the TaskExecutor that registers
+ * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+ * @param taskExecutorAddress The address of the TaskExecutor that registers
+ * @param resourceID The resource ID of the TaskExecutor that registers
*
* @return The response by the ResourceManager.
*/
@RpcMethod
- public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
-     UUID resourceManagerLeaderId,
-     String taskExecutorAddress,
-     ResourceID resourceID) {
+ public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
+     final UUID resourceManagerLeaderId,
+     final String taskExecutorAddress,
+     final ResourceID resourceID
+ ) {
+     log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
+     Future<TaskExecutorGateway> taskExecutorFuture =
+         getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+
+     return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
+         @Override
+         public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
+             // decline registration if resourceManager cannot connect to the taskExecutor using the given address
+             if(taskExecutorGateway == null) {
+                 log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
+                     getAddress(), resourceID, taskExecutorAddress);
+                 return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
+             } else {
+                 // save the register taskExecutor gateway
+                 taskExecutorGateways.put(resourceID, taskExecutorGateway);
+                 // schedule the heartbeat with the registered taskExecutor
+                 ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
+                     ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
+                 heartbeatScheduler.start();
+                 heartbeatSchedulers.put(resourceID, heartbeatScheduler);
+                 return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
+             }
+         }
+     }, getMainThreadExecutionContext());
+
+ }
+
+ /**
+ * notify resource failure to resourceManager, because of two reasons:
+ * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
+ * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
+ *
+ * @param resourceID identify the taskManager which to stop
+ */
+ @RpcMethod
--- End diff --
I'm not sure whether this has to be an `RpcMethod`.
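To make the registration flow in the diff easier to follow, here is a minimal, self-contained Java sketch of the same steps: wait for the connection to the TaskExecutor, decline if it failed, otherwise remember the gateway, start tracking heartbeats, and reply with a success that carries the heartbeat interval. This is not Flink code. `RegistrationSketch`, its nested types, `onHeartbeatTimeout`, `onHeartbeatResponse` and the miss threshold are hypothetical, and `CompletableFuture` stands in for the `Future`/`Mapper` pair used in the diff. The sketch also treats `notifyResourceFailure` as an ordinary method that the heartbeat logic calls locally. That is one possible answer to the question above, not what the PR does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// All names here are hypothetical stand-ins for illustration, not Flink APIs.
class RegistrationSketch {

    /** Stand-in for the RPC proxy obtained by connecting to a TaskExecutor. */
    interface TaskExecutorGateway {}

    interface RegistrationResponse {}

    static final class Decline implements RegistrationResponse {
        final String reason;
        Decline(String reason) { this.reason = reason; }
    }

    static final class Success implements RegistrationResponse {
        final UUID instanceId;
        final long heartbeatIntervalMillis;
        Success(UUID instanceId, long heartbeatIntervalMillis) {
            this.instanceId = instanceId;
            this.heartbeatIntervalMillis = heartbeatIntervalMillis;
        }
    }

    private final Map<String, TaskExecutorGateway> gateways = new HashMap<>();
    private final Map<String, Integer> missedHeartbeats = new HashMap<>();
    private final Executor mainThread;  // callbacks that mutate the maps run here
    private final long heartbeatIntervalMillis;
    private final int maxMissedHeartbeats;

    RegistrationSketch(Executor mainThread, long heartbeatIntervalMillis, int maxMissedHeartbeats) {
        this.mainThread = mainThread;
        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
        this.maxMissedHeartbeats = maxMissedHeartbeats;
    }

    /** Declines if connecting failed; otherwise stores the gateway and starts heartbeat tracking. */
    CompletableFuture<RegistrationResponse> registerTaskExecutor(
            CompletableFuture<TaskExecutorGateway> connectFuture, String resourceId) {
        return connectFuture.<RegistrationResponse>handleAsync((gateway, error) -> {
            if (error != null || gateway == null) {
                return new Decline("cannot connect to taskExecutor using given address");
            }
            gateways.put(resourceId, gateway);
            missedHeartbeats.put(resourceId, 0);
            return new Success(UUID.randomUUID(), heartbeatIntervalMillis);
        }, mainThread);
    }

    /** A heartbeat answer arrived: reset the consecutive-miss counter. */
    void onHeartbeatResponse(String resourceId) {
        missedHeartbeats.computeIfPresent(resourceId, (id, missed) -> 0);
    }

    /** A heartbeat timed out: after too many consecutive misses, fail the resource. */
    void onHeartbeatTimeout(String resourceId) {
        if (!gateways.containsKey(resourceId)) {
            return;
        }
        int missed = missedHeartbeats.merge(resourceId, 1, Integer::sum);
        if (missed >= maxMissedHeartbeats) {
            notifyResourceFailure(resourceId);
        }
    }

    /** Plain local method (not exposed over RPC) that drops all state kept for the TaskExecutor. */
    void notifyResourceFailure(String resourceId) {
        gateways.remove(resourceId);
        missedHeartbeats.remove(resourceId);
    }

    boolean isRegistered(String resourceId) {
        return gateways.containsKey(resourceId);
    }
}
```

In real use `mainThread` would be a single-threaded executor, mirroring the diff's `getMainThreadExecutionContext()`. The maps are then only touched from one thread and need no locking. A direct executor such as `Runnable::run` is enough for a quick synchronous check.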
> Implement communication from ResourceManager to TaskManager
> -----------------------------------------------------------
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Kurt Young
> Assignee: zhangjing
>
> There are mainly three interactions initiated by the RM toward the TM:
> * Heartbeat: the RM uses heartbeats to sync with the TM's slot status.
> * SlotRequest: when the RM decides to assign a slot to a JM, it should first
> send a slot request to the TM. The TM can either accept or reject this request.
> * FailureNotify: in some corner cases, the TM is marked as invalid by the
> cluster manager master (e.g. the YARN master), but the TM itself does not
> realize it. The RM should then send a failure notification to the TM so that
> the TM can terminate itself.
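The three interactions listed in the issue can be summarized as one RM-to-TM interface. The following Java sketch is illustrative only. `TaskManagerEndpoint`, `SlotReport` and `SimpleTaskManager` are hypothetical names, not Flink's `TaskExecutorGateway`. The calls are synchronous here, whereas the real RPCs would return futures.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical types for illustration; not Flink's actual gateway interfaces.
class RmToTmSketch {

    /** Slot status a TM reports back in a heartbeat. */
    static final class SlotReport {
        final int totalSlots;
        final int freeSlots;

        SlotReport(int totalSlots, int freeSlots) {
            this.totalSlots = totalSlots;
            this.freeSlots = freeSlots;
        }
    }

    /** The three RM-to-TM interactions from the issue description. */
    interface TaskManagerEndpoint {
        /** Heartbeat: the RM syncs the TM's slot status. */
        SlotReport heartbeat();

        /** SlotRequest: the TM accepts (true) or rejects (false) the request. */
        boolean requestSlot(UUID allocationId);

        /** FailureNotify: the TM was marked invalid by the cluster manager and should terminate. */
        void notifyFailure(String reason);
    }

    /** Toy TM with a fixed number of slots. */
    static final class SimpleTaskManager implements TaskManagerEndpoint {
        private final int totalSlots;
        private final Set<UUID> allocations = new HashSet<>();
        private boolean terminated;

        SimpleTaskManager(int totalSlots) {
            this.totalSlots = totalSlots;
        }

        @Override
        public SlotReport heartbeat() {
            return new SlotReport(totalSlots, totalSlots - allocations.size());
        }

        @Override
        public boolean requestSlot(UUID allocationId) {
            // Reject once the TM has terminated or when no slot is free.
            if (terminated || allocations.size() >= totalSlots) {
                return false;
            }
            allocations.add(allocationId);
            return true;
        }

        @Override
        public void notifyFailure(String reason) {
            // A real TM would release its resources and shut down; here we only flip a flag.
            terminated = true;
            allocations.clear();
        }

        boolean isTerminated() {
            return terminated;
        }
    }
}
```

A boolean return from `requestSlot` is the simplest way to model "accept or reject". A real protocol would more likely reply with an acknowledgement or decline message keyed by the allocation ID, much like the `AcknowledgeSlotRequest` in the diff.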