[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551263#comment-15551263 ]
ASF GitHub Bot commented on FLINK-4348:
---------------------------------------
Github user KurtYoung commented on a diff in the pull request:
https://github.com/apache/flink/pull/2571#discussion_r82134943
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -245,22 +255,26 @@ public TaskExecutorGateway call() throws Exception {
resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
throw new Exception("Invalid leader session id");
}
- return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS);
+ return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class)
+ .get(timeout.toMilliseconds(), timeout.getUnit());
}
}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@Override
public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
- WorkerType oldWorker = taskExecutorGateways.remove(resourceID);
- if (oldWorker != null) {
+ WorkerRegistration oldRegistration = taskExecutors.remove(resourceID);
+ if (oldRegistration != null) {
// TODO :: suggest old taskExecutor to stop itself
slotManager.notifyTaskManagerFailure(resourceID);
--- End diff --
notifyTaskManagerFailure is called again in slotManager.registerTaskExecutor,
so maybe we should keep only one of these calls.
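A minimal sketch of the suggestion, assuming the cleanup is kept only inside the SlotManager so that a re-registering TaskExecutor has its old slots failed exactly once. SlotManagerSketch, RegistrationHandlerSketch and their methods are hypothetical stand-ins that borrow names from the diff; they are not Flink's actual classes or signatures.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the SlotManager; counts failure notifications. */
class SlotManagerSketch {
    int failureNotifications = 0;
    private final Map<String, Integer> slotsPerTaskExecutor = new HashMap<>();

    /** Registers a TaskExecutor; a re-registration first fails the old slots. */
    void registerTaskExecutor(String resourceId, int numberOfSlots) {
        if (slotsPerTaskExecutor.containsKey(resourceId)) {
            // The only place where the previous registration's slots are failed.
            notifyTaskManagerFailure(resourceId);
        }
        slotsPerTaskExecutor.put(resourceId, numberOfSlots);
    }

    void notifyTaskManagerFailure(String resourceId) {
        failureNotifications++;
        slotsPerTaskExecutor.remove(resourceId);
    }
}

/** Hypothetical stand-in for the ResourceManager's registration callback. */
class RegistrationHandlerSketch {
    private final Map<String, String> taskExecutors = new HashMap<>();
    final SlotManagerSketch slotManager = new SlotManagerSketch();

    void onTaskExecutorRegistered(String resourceId, String address, int numberOfSlots) {
        // Replace any previous registration, but do NOT call
        // slotManager.notifyTaskManagerFailure here: registerTaskExecutor
        // already does it, and calling both would fail the old slots twice.
        taskExecutors.put(resourceId, address);
        slotManager.registerTaskExecutor(resourceId, numberOfSlots);
    }
}
```

The opposite choice (keep the call in the ResourceManager and drop it from registerTaskExecutor) works equally well; what matters is that exactly one component owns the cleanup.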
> Implement slot allocation protocol with TaskExecutor
> ----------------------------------------------------
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Kurt Young
> Assignee: Maximilian Michels
>
> When the slotManager finds a suitable slot in the free pool for a slot
> request, it marks the slot as occupied and then tells the taskExecutor to give
> the slot to the specified JobMaster.
> When a slot request is sent to the taskExecutor, it should contain the
> following parameters: AllocationID, JobID, slotID, resourceManagerLeaderSessionID.
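The request described above can be modeled as a small immutable message. This is a sketch under assumptions: the class name is hypothetical, and java.util.UUID plus a String slot ID stand in for Flink's own ID types. The leader-session check shows one plausible use of resourceManagerLeaderSessionID (ignoring requests from a ResourceManager that is no longer the leader); the issue itself only lists the field.

```java
import java.util.Objects;
import java.util.UUID;

/**
 * Hypothetical slot request carrying the four parameters listed in the issue.
 * UUIDs and a String slot ID stand in for Flink's AllocationID, JobID and SlotID.
 */
final class SlotRequestMessage {
    final UUID allocationId;                   // identifies this allocation
    final UUID jobId;                          // job whose JobMaster should receive the slot
    final String slotId;                       // which slot on the TaskExecutor
    final UUID resourceManagerLeaderSessionId; // leader session of the sending ResourceManager

    SlotRequestMessage(UUID allocationId, UUID jobId, String slotId,
                       UUID resourceManagerLeaderSessionId) {
        this.allocationId = Objects.requireNonNull(allocationId);
        this.jobId = Objects.requireNonNull(jobId);
        this.slotId = Objects.requireNonNull(slotId);
        this.resourceManagerLeaderSessionId = Objects.requireNonNull(resourceManagerLeaderSessionId);
    }

    /**
     * A TaskExecutor could use this check to ignore requests sent by a
     * ResourceManager whose leader session is no longer the current one.
     */
    boolean isFromCurrentLeader(UUID currentLeaderSessionId) {
        return resourceManagerLeaderSessionId.equals(currentLeaderSessionId);
    }
}
```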
> There are three possible responses from the taskExecutor. Below we discuss
> when each one happens and how to handle it:
> 1. Ack: the taskExecutor gives the slot to the specified jobMaster as
> expected.
> 2. Decline: the slot is already occupied by another AllocationID.
> 3. Timeout: the request or response message was lost, or the network transfer
> was slow.
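The three outcomes can be sketched by waiting on a reply future with a timeout. Assumptions, all hypothetical: the TaskExecutor's reply is modeled as a CompletableFuture<Boolean> (true for Ack, false for Decline), and a failed RPC is folded into DECLINE. Note that all three causes of a timeout look the same to the caller: no reply arrives in time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** The three possible responses to a slot request. */
enum SlotRequestOutcome { ACK, DECLINE, TIMEOUT }

final class SlotReplyClassifier {
    private SlotReplyClassifier() {}

    /**
     * Maps a reply future to one of the three outcomes. Assumption: the reply is
     * true when the TaskExecutor gave the slot to the JobMaster and false when the
     * slot is already occupied by another AllocationID.
     */
    static SlotRequestOutcome classify(CompletableFuture<Boolean> reply, long timeout, TimeUnit unit) {
        try {
            return reply.get(timeout, unit) ? SlotRequestOutcome.ACK : SlotRequestOutcome.DECLINE;
        } catch (TimeoutException e) {
            // A lost request, a lost response and a slow network are
            // indistinguishable from here: no reply arrived in time.
            return SlotRequestOutcome.TIMEOUT;
        } catch (ExecutionException e) {
            // Assumption: a failed RPC is handled like a decline (both lead to a retry).
            return SlotRequestOutcome.DECLINE;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return SlotRequestOutcome.TIMEOUT;
        }
    }
}
```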
> In the first case, the ResourceManager does not need to do anything. In the
> second and third cases, the ResourceManager needs to notify the slotManager,
> which first verifies and clears all previous allocation information for this
> slot request and then tries to find a suitable slot for the request again.
> This may cause duplicate allocation: for example, if the slot request to a
> TaskManager succeeds but the response is lost, we may request a slot from
> another TaskManager, so two slots end up assigned to one request. This can be
> handled by having the JobMaster reject the extra registration.
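A sketch of the Decline/Timeout path and of the JobMaster-side duplicate check, under stated assumptions: both classes are hypothetical, IDs are plain Strings, the retry reuses the same AllocationID, and a slot whose request was declined or timed out is not put back into the free pool (its real state is left to later reports from the TaskExecutor).

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical SlotManager bookkeeping for the Decline / Timeout path. */
final class RetryingSlotManager {
    private final Set<String> freeSlots = new HashSet<>();
    private final Map<String, String> slotByAllocation = new HashMap<>();
    final Queue<String> pendingAllocations = new ArrayDeque<>();

    RetryingSlotManager(Set<String> initialFreeSlots) {
        freeSlots.addAll(initialFreeSlots);
    }

    /** Marks any free slot as occupied for the allocation; returns null if none is free. */
    String allocate(String allocationId) {
        Iterator<String> it = freeSlots.iterator();
        if (!it.hasNext()) {
            return null;
        }
        String slotId = it.next();
        it.remove();
        slotByAllocation.put(allocationId, slotId);
        return slotId;
    }

    String slotOf(String allocationId) {
        return slotByAllocation.get(allocationId);
    }

    /** Called by the ResourceManager when the TaskExecutor declines or times out. */
    void onDeclineOrTimeout(String allocationId) {
        // Verify: ignore replies for allocations we no longer track (stale or duplicate).
        String previousSlot = slotByAllocation.remove(allocationId);
        if (previousSlot == null) {
            return;
        }
        // Assumption: the previous slot is not returned to the free pool, since after
        // a decline it belongs to another allocation and after a timeout its state is unknown.
        if (allocate(allocationId) == null) {
            pendingAllocations.add(allocationId); // retry once a slot becomes free
        }
    }
}

/** Hypothetical JobMaster side: accepts at most one slot per AllocationID. */
final class JobMasterSlotOffers {
    private final Map<String, String> acceptedSlots = new HashMap<>();

    /** Returns false for a second slot offered for the same allocation. */
    boolean offerSlot(String allocationId, String slotId) {
        return acceptedSlots.putIfAbsent(allocationId, slotId) == null;
    }
}
```

If the lost-response case happens, both TaskExecutors offer a slot for the same AllocationID; the JobMaster keeps the first and rejects the second, which is the "rejecting registration at JobMaster" remedy from the text.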
> Some questions still need further discussion:
> 1. Who sends the slotRequest to the taskExecutor, the SlotManager or the
> ResourceManager? I think it is better for the SlotManager to delegate the RPC
> call to the ResourceManager whenever it needs to communicate with the outside
> world. The ResourceManager knows which taskExecutor to send the request to,
> based on the ResourceID. Besides, the method used to request a slot from the
> taskExecutor should not be an RpcMethod, because only the SlotManager should
> have permission to call it; other components, for example the JobMaster and
> the TaskExecutor, should not be able to call it directly.
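One way to realize question 1 is to hand the SlotManager a plain Java callback interface that the ResourceManager implements, so the call never becomes a remotely reachable RPC method. This is a sketch only: every interface and class name below is hypothetical, and Strings stand in for ResourceID and the other ID types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the RPC gateway of a single TaskExecutor. */
interface TaskExecutorSlotGateway {
    CompletableFuture<Boolean> requestSlot(String allocationId, String jobId, String slotId);
}

/**
 * Hypothetical callback through which the SlotManager reaches the outside world.
 * It is a plain Java interface handed to the SlotManager, not an RPC method,
 * so remote components such as the JobMaster cannot call it.
 */
interface SlotManagerActions {
    CompletableFuture<Boolean> sendSlotRequest(
            String resourceId, String allocationId, String jobId, String slotId);
}

/** Hypothetical ResourceManager side: resolves the TaskExecutor by ResourceID. */
final class ResourceManagerSlotActions implements SlotManagerActions {
    private final Map<String, TaskExecutorSlotGateway> gateways = new HashMap<>();

    void registerTaskExecutor(String resourceId, TaskExecutorSlotGateway gateway) {
        gateways.put(resourceId, gateway);
    }

    @Override
    public CompletableFuture<Boolean> sendSlotRequest(
            String resourceId, String allocationId, String jobId, String slotId) {
        TaskExecutorSlotGateway gateway = gateways.get(resourceId);
        if (gateway == null) {
            // Unknown ResourceID: fail the future instead of throwing.
            CompletableFuture<Boolean> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("Unknown TaskExecutor " + resourceId));
            return failed;
        }
        return gateway.requestSlot(allocationId, jobId, slotId);
    }
}
```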
> 2. If the JobMaster rejects a slot offer from a TaskExecutor, should the
> TaskExecutor report the free slot to the ResourceManager immediately, or wait
> for the next heartbeat sync? The advantage of the first approach is that the
> ResourceManager's view is updated faster; the advantage of the second is that
> it saves an RPC method in the ResourceManager.
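The two options in question 2 differ only in whether the TaskExecutor makes an extra call when a slot comes back. A minimal sketch, assuming a hypothetical notifySlotAvailable callback for the eager option and a hypothetical heartbeat slot report for the lazy one; neither name comes from Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical TaskExecutor-side handling of a slot the JobMaster rejected. */
final class RejectedSlotReporter {
    /** Hypothetical ResourceManager RPC that only the eager option needs. */
    interface SlotAvailabilityListener {
        void notifySlotAvailable(String slotId);
    }

    private final boolean reportImmediately;
    private final SlotAvailabilityListener resourceManager;
    private final List<String> freeSlots = new ArrayList<>();

    RejectedSlotReporter(boolean reportImmediately, SlotAvailabilityListener resourceManager) {
        this.reportImmediately = reportImmediately;
        this.resourceManager = resourceManager;
    }

    void onSlotRejected(String slotId) {
        freeSlots.add(slotId);
        if (reportImmediately) {
            // Option 1: the ResourceManager's view is updated right away,
            // at the cost of an extra RPC method on the ResourceManager.
            resourceManager.notifySlotAvailable(slotId);
        }
        // Option 2: do nothing now; the slot appears in the next heartbeat report.
    }

    /** Free slots reported with the periodic heartbeat (used by both options). */
    List<String> heartbeatSlotReport() {
        return new ArrayList<>(freeSlots);
    }
}
```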
> 3. There are two communication styles. First, the slot request could be sent
> as an ask operation whose response is returned as a future. Second, the
> ResourceManager could send the slot request in a fire-and-forget way, with the
> response returned through a separate RPC call. I prefer the first because it
> is simpler and saves an RPC method in the ResourceManager (the callback needed
> in the second approach).
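The two styles in question 3 can be contrasted in a small sketch. Plain java.util.function types stand in for the RPC gateways, and all class and method names are hypothetical. It illustrates the argument above: in the ask style the future carries the reply, while the fire-and-forget style needs an extra callback method plus explicit bookkeeping of outstanding requests.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Style 1 (ask): the TaskExecutor's reply is the result of the returned future. */
final class AskStyleSender {
    CompletableFuture<Boolean> requestSlot(
            Function<String, CompletableFuture<Boolean>> taskExecutor, String allocationId) {
        // The future ties the reply to this request; no callback method on the RM.
        return taskExecutor.apply(allocationId);
    }
}

/** Style 2 (fire and forget): the reply arrives later through a callback RPC. */
final class TellStyleSender {
    private final Set<String> outstanding = new HashSet<>();
    int acknowledged = 0;

    void requestSlot(BiConsumer<String, TellStyleSender> taskExecutor, String allocationId) {
        outstanding.add(allocationId); // the RM must track open requests itself
        taskExecutor.accept(allocationId, this);
    }

    /** The extra method the ResourceManager must expose in this style. */
    void slotRequestReply(String allocationId, boolean accepted) {
        if (!outstanding.remove(allocationId)) {
            return; // reply for an unknown or already answered request
        }
        if (accepted) {
            acknowledged++;
        }
    }
}
```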
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)