[
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430672#comment-15430672
]
ASF GitHub Bot commented on FLINK-4348:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2389#discussion_r75669361
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
---
@@ -94,26 +130,218 @@ public RegistrationResponse apply(final
JobMasterGateway jobMasterGateway) {
* @return Slot assignment
*/
@RpcMethod
- public SlotAssignment requestSlot(SlotRequest slotRequest) {
- System.out.println("SlotRequest: " + slotRequest);
- return new SlotAssignment();
+ public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
+ // TODO slotManager.requestSlot(slotRequest);
+ return new
AcknowledgeSlotRequest(slotRequest.getAllocationID());
}
/**
- *
- * @param resourceManagerLeaderId The fencing token for the
ResourceManager leader
- * @param taskExecutorAddress The address of the TaskExecutor that
registers
- * @param resourceID The resource ID of the TaskExecutor
that registers
+ * @param resourceManagerLeaderId The fencing token for the
ResourceManager leader
+ * @param taskExecutorAddress The address of the TaskExecutor that
registers
+ * @param resourceID The resource ID of the TaskExecutor
that registers
*
* @return The response by the ResourceManager.
*/
@RpcMethod
- public org.apache.flink.runtime.rpc.registration.RegistrationResponse
registerTaskExecutor(
- UUID resourceManagerLeaderId,
- String taskExecutorAddress,
- ResourceID resourceID) {
+ public
Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse>
registerTaskExecutor(
+ final UUID resourceManagerLeaderId,
+ final String taskExecutorAddress,
+ final ResourceID resourceID
+ ) {
+ log.info("received register from taskExecutor {}, address {}",
resourceID, taskExecutorAddress);
+ Future<TaskExecutorGateway> taskExecutorFuture =
+ getRpcService().connect(taskExecutorAddress,
TaskExecutorGateway.class);
+
+ return taskExecutorFuture.map(new Mapper<TaskExecutorGateway,
org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
+ @Override
+ public
org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final
TaskExecutorGateway taskExecutorGateway) {
+ // decline registration if resourceManager
cannot connect to the taskExecutor using the given address
+ if(taskExecutorGateway == null) {
+ log.warn("resourceManager {} decline
registration from the taskExecutor {}, cannot connect to it using given address
{} ",
+ getAddress(), resourceID,
taskExecutorAddress);
+ return new
org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot
connect to taskExecutor using given address");
+ } else {
+ // save the register taskExecutor
gateway
+ taskExecutorGateways.put(resourceID,
taskExecutorGateway);
+ // schedule the heartbeat with the
registered taskExecutor
+
ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new
ResourceManagerToTaskExecutorHeartbeatScheduler(
+ ResourceManager.this,
leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
+ heartbeatScheduler.start();
+ heartbeatSchedulers.put(resourceID,
heartbeatScheduler);
+ return new
TaskExecutorRegistrationSuccess(new InstanceID(),
heartbeatScheduler.getHeartbeatInterval());
+ }
+ }
+ }, getMainThreadExecutionContext());
+
+ }
+
+ /**
+ * notify resource failure to resourceManager, because of two reasons:
+ * 1. cannot keep heartbeat with taskManager for several times, mark
the resource as failed
+ * 2. in some corner cases, TM will be marked as invalid by cluster
manager master(e.g. yarn master), but TM itself does not realize.
+ *
+ * @param resourceID identify the taskManager which to stop
+ */
+ @RpcMethod
+ public void notifyResourceFailure(ResourceID resourceID) {
+ log.warn("receive failure notification of resource {}",
resourceID);
+ TaskExecutorGateway taskManager =
taskExecutorGateways.get(resourceID);
+ if(taskManager == null) {
+ // ignore command to stop an unregister taskManager
+ log.warn("ignore stop taskManager command because {} is
unregistered", resourceID);
+ } else {
+ taskExecutorGateways.remove(resourceID);
+ closeHeartbeatToResourceIfExist(resourceID);
+ // TODO notify slotManager and notify jobMaster,
+ // TODO
slotManager.notifyTaskManagerFailure(resourceID);
+ taskManager.shutDown(leaderSessionID);
+ }
+ }
+
+ /**
+ * close heartbeat triggers to resource if exist
+ * @param resourceID which resource need to stop keep heartbeat with
+ */
+ private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
+ if(heartbeatSchedulers.containsKey(resourceID)) {
+ ResourceManagerToTaskExecutorHeartbeatScheduler
heartbeatManager = heartbeatSchedulers.get(resourceID);
+ heartbeatManager.close();
+ heartbeatSchedulers.remove(resourceID);
+ }
+ }
+
+ /**
+ * send slotRequest to the taskManager which the given slot is on
+ *
+ * @param slotRequest slot request information
+ * @param slotID which slot is choosen
+ */
+ void requestSlotToTaskManager(final SlotRequest slotRequest, final
SlotID slotID) {
+ ResourceID resourceID = slotID.getResourceID();
+ TaskExecutorGateway taskManager =
taskExecutorGateways.get(resourceID);
+ if (taskManager == null) {
+ // the given slot is on an unregister taskManager
+ log.warn("ignore slot {} because it is on an unregister
taskManager", slotID);
+ // TODO
slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
+ } else {
+ Future<SlotAllocationResponse> response =
taskManager.requestSlotForJob(
+ slotRequest.getAllocationID(),
slotRequest.getJobID(), slotID, leaderSessionID);
+ response.onSuccess(new
OnSuccess<SlotAllocationResponse>() {
+ @Override
+ public void onSuccess(SlotAllocationResponse
result) throws Throwable {
+ if (result instanceof
SlotAllocationResponse.Decline) {
+ // TODO
slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
+ }
+ }
+ }, getMainThreadExecutionContext());
+ response.onFailure(new OnFailure() {
+ @Override
+ public void onFailure(Throwable failure) {
+ log.error("fail to request slot on
taskManager because of error", failure);
+ // TODO
slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
+ }
+ }, getMainThreadExecutionContext());
+ }
+ }
+
+ /**
+ * notify slotReport which is sent by taskManager to resourceManager
+ *
+ * @param slotReport the slot allocation report from taskManager
+ */
+ void handleSlotReportFromTaskManager(final SlotReport slotReport) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ // TODO slotManager.updateSlot(slotReport);
+ }
+ });
+
+ }
+
+
+ /**
+ * callback method when current resourceManager is granted leadership
+ * @param newLeaderSessionID unique leadershipID
+ */
+ void handleGrantLeadership(final UUID newLeaderSessionID) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was granted
leadership with leader session ID {}", getAddress(), newLeaderSessionID);
+ leaderSessionID = newLeaderSessionID;
+ // confirming the leader session ID might be
blocking, thus do it concurrently
+ getRpcService().scheduleRunnable(
--- End diff --
`getRpcService().runAsync(...)` is equivalent to the `scheduleRunnable` call
with a delay of `0`.
> Implement communication from ResourceManager to TaskManager
> -----------------------------------------------------------
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Kurt Young
> Assignee: zhangjing
>
> There are mainly 3 kinds of interaction initiated from RM to TM:
> * Heartbeat: RM uses heartbeats to sync with TM's slot status.
> * SlotRequest: when RM decides to assign a slot to JM, it should first try to send
> a request to TM for the slot. TM can either accept or reject this request.
> * FailureNotify: in some corner cases, TM will be marked as invalid by the
> cluster manager master (e.g. yarn master), but TM itself does not realize it. RM
> should send a failure notification to TM so that TM can terminate itself.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)