[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878692#comment-16878692
 ] 

zhijiang commented on FLINK-12852:
----------------------------------

Thanks for looking into this issue, [~aitozi].

We already have the check 
`numTotalRequiredBuffers + numberOfSegmentsToRequest > totalNumberOfMemorySegments` 
in `NetworkBufferPool#requestMemorySegments`. This check mainly verifies 
whether the global pool size can satisfy the total core size. If we replace 
`numberOfSegmentsToRequest` with `numRequiredBuffers` as you suggested above, 
what would the value of `numRequiredBuffers` be here?
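
To illustrate why this check does not catch the reported case, here is a 
minimal sketch of it. The class `GlobalPoolCheckSketch` and its methods are 
hypothetical simplifications, not the actual `NetworkBufferPool` code, and the 
sketch assumes the only reservations are the required sizes of the 9 upstream 
local pools plus the exclusive buffers, using the numbers from the issue 
description.

```java
import java.io.IOException;

// Hypothetical, simplified model of the global pool's bookkeeping.
// It only tracks reserved (required) counts, not which buffers are actually free.
final class GlobalPoolCheckSketch {
    private final int totalNumberOfMemorySegments;
    private int numTotalRequiredBuffers;

    GlobalPoolCheckSketch(int totalNumberOfMemorySegments) {
        this.totalNumberOfMemorySegments = totalNumberOfMemorySegments;
    }

    // Same comparison as the check quoted above: reject the request only if the
    // sum of all reservations would exceed the global pool size.
    void reserve(int numberOfSegmentsToRequest) throws IOException {
        if (numTotalRequiredBuffers + numberOfSegmentsToRequest > totalNumberOfMemorySegments) {
            throw new IOException("Insufficient number of network buffers: requested "
                    + numberOfSegmentsToRequest + ", already reserved "
                    + numTotalRequiredBuffers + ", total " + totalNumberOfMemorySegments);
        }
        numTotalRequiredBuffers += numberOfSegmentsToRequest;
    }

    int getNumTotalRequiredBuffers() {
        return numTotalRequiredBuffers;
    }
}
```

With the numbers from the description, 9 * 500 + 2981 = 7481 is well below 
10696, so the check passes, even though only about 1786 buffers are actually 
free at that moment because the upstream pools hold more than their required 
size.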

In `NetworkBufferPool#createBufferPool`, the `numRequiredBuffers` in the check 
is the core size of the local pool. I guess your suggestion is to also throw 
the IOException("Insufficient buffers...") when allocating the exclusive 
buffers fails. I have two concerns about this solution:
 * How long should we wait before throwing the exception? After 
redistribution, it might take some time for the extra buffers to be recycled 
from other local pools, and we cannot properly estimate how long to wait 
before throwing this exception.
 * Reusing the check here seems a bit tricky/hacky. If the exception message 
reports insufficient buffers, users might be confused because the total 
number of buffers is actually sufficient. If the message reports a deadlock, 
users still might not know what to do. The only benefit is that the deadlock 
can be identified easily, without resorting to jstack.
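
The first concern can be seen in a minimal sketch of a timed request. The 
helper `requestWithTimeout` over an `ArrayBlockingQueue` is hypothetical 
(illustrative names, not the actual Flink implementation); its 
`timeoutMillis` parameter is exactly the value we have no good way to choose.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: request a batch of segments, failing after an overall timeout.
final class TimedSegmentRequestSketch {

    static <T> List<T> requestWithTimeout(
            ArrayBlockingQueue<T> availableSegments,
            int numberOfSegmentsToRequest,
            long timeoutMillis) throws IOException, InterruptedException {

        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        final List<T> segments = new ArrayList<>(numberOfSegmentsToRequest);
        try {
            while (segments.size() < numberOfSegmentsToRequest) {
                long remainingNanos = deadlineNanos - System.nanoTime();
                // Wait only for the time left until the overall deadline.
                T segment = remainingNanos > 0
                        ? availableSegments.poll(remainingNanos, TimeUnit.NANOSECONDS)
                        : availableSegments.poll();
                if (segment == null) {
                    throw new IOException("Timed out after " + timeoutMillis
                            + " ms waiting for " + numberOfSegmentsToRequest
                            + " segments; got only " + segments.size());
                }
                segments.add(segment);
            }
        } catch (IOException | InterruptedException e) {
            // Give back whatever was taken so a failed request does not hold buffers.
            availableSegments.addAll(segments);
            throw e;
        }
        return segments;
    }
}
```

A short timeout risks failing a job that would have proceeded once other 
local pools recycled their extra buffers, while a long one only delays the 
report of a real deadlock.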

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
>                 Key: FLINK-12852
>                 URL: https://issues.apache.org/jira/browse/FLINK-12852
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.7.2, 1.8.1, 1.9.0
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x00007f2cca81b000 nid=0x38845 waiting on condition [0x00007f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000073ed6b6f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x000000073fbc81f0> (a java.lang.Object)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This happens because the required and max sizes of the local buffer pool 
> differ, so buffers may be over-allocated; when assignExclusiveSegments is 
> called, no memory is available.
>  
> The details of the scenario are as follows: The parallelism of the upstream 
> and downstream vertices is 1500 and 500, respectively. There are 200 TMs, 
> and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> local buffer pool {required = 500, max = 2 * 500 + 8 = 1008}. They produce 
> data quickly and each occupies about 990 buffers. Then the downstream task 
> starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job is eventually blocked, no buffers can 
> be released, and the deadlock occurs.
>  
> I think that although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> the network memory in the ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
