[ https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434842#comment-15434842 ]
ASF GitHub Bot commented on FLINK-4449:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2410#discussion_r76047656
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -94,26 +126,142 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
* @return Slot assignment
*/
@RpcMethod
- public SlotAssignment requestSlot(SlotRequest slotRequest) {
- System.out.println("SlotRequest: " + slotRequest);
- return new SlotAssignment();
+ public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
+ return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
}
/**
+ * Register a {@link org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor} at the resource manager.
*
- * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
- * @param taskExecutorAddress The address of the TaskExecutor that registers
- * @param resourceID The resource ID of the TaskExecutor that registers
- *
+ * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+ * @param taskExecutorAddress The address of the TaskExecutor that registers
+ * @param resourceID The resource ID of the TaskExecutor that registers
* @return The response by the ResourceManager.
*/
@RpcMethod
- public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
- UUID resourceManagerLeaderId,
- String taskExecutorAddress,
- ResourceID resourceID) {
+ public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
+ final UUID resourceManagerLeaderId,
+ final String taskExecutorAddress,
+ final ResourceID resourceID)
+ {
+ log.info("Received taskExecutor registration with resource id {} from {}", resourceID, taskExecutorAddress);
+ Future<TaskExecutorGateway> taskExecutorFuture =
+ getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+
+ return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
+ @Override
+ public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(
+ final TaskExecutorGateway taskExecutorGateway)
+ {
+ // decline registration if resourceManager cannot connect to the taskExecutor using the given address
+ if (taskExecutorGateway == null) {
+ log.warn("ResourceManager {} decline taskExecutor registration with resource id {} from {} because cannot connect to it using given address",
+ getAddress(), resourceID, taskExecutorAddress);
+ return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
+ } else {
+ // register target taskExecutor to heartbeat manager
+ taskExecutorGateways.put(resourceID, taskExecutorGateway);
+ long heartbeatInterval = heartbeatManager.registerTarget(resourceID, taskExecutorGateway, taskExecutorAddress);
+ return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatInterval);
+ }
+ }
+ }, getMainThreadExecutionContext());
+ }
+
+ /**
+ * notify lost heartbeat with specified taskExecutor
+ *
+ * @param resourceID identify the taskManager which lost heartbeat with
+ */
+ void notifyLostHeartbeat(final ResourceID resourceID) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ TaskExecutorGateway failedTaskManager = taskExecutorGateways.remove(resourceID);
+ if (failedTaskManager != null) {
+ heartbeatManager.stopHeartbeatToTaskExecutor(resourceID);
+ failedTaskManager.markedFailed(leaderSessionID);
+ }
+ }
+ });
+ }
+
+
+ /**
+ * notify slotReport which is sent by taskManager to resourceManager
+ *
+ * @param slotReport the slot allocation report from taskManager
+ */
+ void handleSlotReportFromTaskManager(final SlotReport slotReport) {
+
+ }
+
+
+ /**
+ * callback method when current resourceManager is granted leadership
+ *
+ * @param newLeaderSessionID unique leadershipID
+ */
+ void handleGrantLeadership(final UUID newLeaderSessionID) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
+ leaderSessionID = newLeaderSessionID;
+ heartbeatManager = new ResourceManagerToTaskExecutorHeartbeatManager(ResourceManager.this, newLeaderSessionID, log);
--- End diff --
Why can't the heartbeat manager always be created and only be activated
with the new leader session ID? That way we would save some object creations.
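The reviewer's suggestion can be sketched in plain Java as follows. This is a minimal, hypothetical illustration, not Flink code: the class name `ReusableHeartbeatManager` and its `start`, `stop`, `accepts` and `activations` methods are assumptions. One instance is created up front; `handleGrantLeadership` would then call `start(newLeaderSessionID)` instead of constructing a new `ResourceManagerToTaskExecutorHeartbeatManager`, and the current leader session ID doubles as the fencing token, so messages tagged with an older session are rejected.

```java
import java.util.Objects;
import java.util.UUID;

/**
 * Hypothetical heartbeat manager that is created once per ResourceManager and
 * re-activated with each new leader session ID instead of being re-instantiated.
 */
final class ReusableHeartbeatManager {

    // Null while the owning ResourceManager does not hold leadership.
    private UUID leaderSessionId;

    // Number of times the manager has been activated (illustration only).
    private int activations;

    /** Activates the manager for a new leader session, replacing any previous session. */
    synchronized void start(UUID newLeaderSessionId) {
        leaderSessionId = Objects.requireNonNull(newLeaderSessionId, "newLeaderSessionId");
        activations++;
    }

    /** Deactivates the manager, e.g. when leadership is revoked. */
    synchronized void stop() {
        leaderSessionId = null;
    }

    synchronized boolean isActive() {
        return leaderSessionId != null;
    }

    /** Treats the leader session ID as a fencing token: only the current session is accepted. */
    synchronized boolean accepts(UUID senderLeaderSessionId) {
        return leaderSessionId != null && leaderSessionId.equals(senderLeaderSessionId);
    }

    synchronized int activations() {
        return activations;
    }

    public static void main(String[] args) {
        // One instance serves successive leader sessions.
        ReusableHeartbeatManager manager = new ReusableHeartbeatManager();
        UUID firstSession = UUID.randomUUID();
        manager.start(firstSession);
        manager.stop();                 // leadership revoked
        UUID secondSession = UUID.randomUUID();
        manager.start(secondSession);   // leadership granted again

        // prints "true false 2"
        System.out.println(manager.isActive() + " " + manager.accepts(firstSession) + " " + manager.activations());
    }
}
```

With this design a leadership change only swaps a field rather than allocating a new manager; any per-session state (for example registered heartbeat targets) would still have to be cleared in `stop()`, which this sketch leaves out.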
> Heartbeat Manager between ResourceManager and TaskExecutor
> ----------------------------------------------------------
>
> Key: FLINK-4449
> URL: https://issues.apache.org/jira/browse/FLINK-4449
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: zhangjing
> Assignee: zhangjing
>
> HeartbeatManager is responsible for the heartbeat between the ResourceManager
> and the TaskExecutors:
> 1. Register TaskExecutors:
> register them as heartbeat targets. If the heartbeat response from a target is
> not received in time, mark the target as failed and notify the ResourceManager.
> 2. Trigger heartbeats:
> the ResourceManager triggers heartbeats to the TaskExecutors periodically.
> Each TaskExecutor reports its slot allocation in the heartbeat response, and
> the ResourceManager syncs its own view of the slot allocation with that response.
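The two responsibilities listed in the issue can be sketched as a small bookkeeping class. This is a hypothetical illustration under stated assumptions, not the Flink implementation: `HeartbeatMonitor`, its methods, and the slot report shape (slot id mapped to allocation id) are invented for the example, and times are passed in explicitly so the timeout logic is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical heartbeat bookkeeping for the targets of one ResourceManager (not thread-safe). */
final class HeartbeatMonitor<K> {

    private final long timeoutMillis;
    private final Consumer<K> onHeartbeatLost;

    // Time of the last heartbeat response (or of registration) per target.
    private final Map<K, Long> lastResponseMillis = new HashMap<>();

    // Latest slot report per target: slot id -> allocation id (hypothetical shape).
    private final Map<K, Map<String, String>> slotReports = new HashMap<>();

    HeartbeatMonitor(long timeoutMillis, Consumer<K> onHeartbeatLost) {
        this.timeoutMillis = timeoutMillis;
        this.onHeartbeatLost = onHeartbeatLost;
    }

    /** Item 1: register a TaskExecutor as a heartbeat target. */
    void registerTarget(K target, long nowMillis) {
        lastResponseMillis.put(target, nowMillis);
    }

    /** Item 2: record a heartbeat response and sync the reported slot allocation. */
    void onHeartbeatResponse(K target, Map<String, String> slotReport, long nowMillis) {
        if (!lastResponseMillis.containsKey(target)) {
            return; // ignore responses from unregistered or already failed targets
        }
        lastResponseMillis.put(target, nowMillis);
        slotReports.put(target, new HashMap<>(slotReport));
    }

    /** Marks targets that have not responded within the timeout as failed and notifies the listener. */
    List<K> checkTimeouts(long nowMillis) {
        List<K> failed = new ArrayList<>();
        for (Map.Entry<K, Long> entry : lastResponseMillis.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                failed.add(entry.getKey());
            }
        }
        for (K target : failed) {
            lastResponseMillis.remove(target);
            slotReports.remove(target);
            onHeartbeatLost.accept(target);
        }
        return failed;
    }

    Map<String, String> slotReport(K target) {
        return slotReports.getOrDefault(target, Collections.emptyMap());
    }

    public static void main(String[] args) {
        List<String> lost = new ArrayList<>();
        HeartbeatMonitor<String> monitor = new HeartbeatMonitor<>(100, lost::add);

        monitor.registerTarget("tm-1", 0);
        monitor.registerTarget("tm-2", 0);
        Map<String, String> report = new HashMap<>();
        report.put("slot-0", "alloc-42");
        monitor.onHeartbeatResponse("tm-1", report, 80); // only tm-1 answers in time

        monitor.checkTimeouts(150);
        // prints "[tm-2] alloc-42"
        System.out.println(lost + " " + monitor.slotReport("tm-1").get("slot-0"));
    }
}
```

In a running ResourceManager, the periodic trigger could come from a `java.util.concurrent.ScheduledExecutorService` (for example `scheduleAtFixedRate`) that sends heartbeat requests and calls `checkTimeouts(System.currentTimeMillis())`. Because the sketch is not thread-safe, those calls and the response handling would have to run on a single thread, such as the endpoint's main thread via `runAsync`; the lost-heartbeat callback plays the role of `notifyLostHeartbeat` in the diff.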
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)