[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874667#comment-16874667
 ] 

Yun Gao edited comment on FLINK-12852 at 6/28/19 4:38 AM:
----------------------------------------------------------

Here are all the possible approaches to this problem that I can think of: 
 # Reserve buffers for the exclusive buffers: It is hard to know how many 
buffers need to be reserved, especially since it is hard to know how many 
tasks will be scheduled to this TM.
 # Make the required buffers and the max buffers the same: This may prevent 
previously runnable jobs from running, because the total number of buffers 
may be less than the sum of the updated required buffers.
 # Postpone the acquisition of exclusive buffers: This does not solve the 
deadlock problem, since the downstream task still may not acquire any buffers 
to make progress.
 # Add a timeout for _requestMemorySegments_.

Therefore, I think we currently need to use the last approach. I'd like to 
 # Add an option for the timeout of requestMemorySegments for each channel. The 
default timeout is 30s. This option will be marked as undocumented since it may 
be removed in a future implementation.
 # Pass the timeout to NetworkBufferPool.
 # Have requestMemorySegments throw IOException("Insufficient buffer") if not 
all segments have been acquired when the timeout expires.
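
The timeout behaviour proposed above could look roughly like the following. 
This is only a minimal sketch, not the actual NetworkBufferPool code: the class 
TimedSegmentPool, its methods, and the use of byte[] as a stand-in for a memory 
segment are all hypothetical. It only assumes, as the stack trace in the issue 
suggests, that segments are polled from an ArrayBlockingQueue.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a buffer pool; byte[] plays the role of a memory segment. */
public class TimedSegmentPool {
    private final ArrayBlockingQueue<byte[]> availableSegments;

    public TimedSegmentPool(int numSegments, int segmentSize) {
        availableSegments = new ArrayBlockingQueue<>(numSegments);
        for (int i = 0; i < numSegments; i++) {
            availableSegments.add(new byte[segmentSize]);
        }
    }

    /**
     * Tries to acquire {@code count} segments before the timeout expires.
     * If that fails, every segment taken so far is returned to the pool and
     * an IOException("Insufficient buffer") is thrown.
     */
    public List<byte[]> requestSegments(int count, Duration timeout)
            throws IOException, InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<byte[]> acquired = new ArrayList<>(count);
        try {
            while (acquired.size() < count) {
                // One overall deadline for the whole request, not per segment.
                long remainingNanos = Math.max(0L, deadline - System.nanoTime());
                byte[] segment = availableSegments.poll(remainingNanos, TimeUnit.NANOSECONDS);
                if (segment == null) {
                    throw new IOException("Insufficient buffer");
                }
                acquired.add(segment);
            }
            return acquired;
        } catch (IOException | InterruptedException e) {
            // Give back partially acquired segments so other tasks can progress.
            availableSegments.addAll(acquired);
            throw e;
        }
    }

    public int available() {
        return availableSegments.size();
    }
}
```

Returning the partially acquired segments before throwing is a design choice of 
this sketch: it lets a failing task release what it holds instead of keeping 
buffers that other tasks may be waiting for.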


> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
>                 Key: FLINK-12852
>                 URL: https://issues.apache.org/jira/browse/FLINK-12852
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.9.0
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>
> When running tests with an upstream vertex and a downstream vertex, a 
> deadlock occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x00007f2cca81b000 nid=0x38845 waiting on condition [0x00007f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000073ed6b6f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x000000073fbc81f0> (a java.lang.Object)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not 
> the same, so there may be over-allocation; when assignExclusiveSegments is 
> called, there is no memory available.
>  
> The details of the scenario are as follows: The parallelism of the upstream 
> vertex and the downstream vertex is 1000 and 500, respectively. There are 
> 200 TMs, and each TM has 10696 buffers in total and 10 slots. For a TM that 
> runs 9 upstream tasks and 1 downstream task, the 9 upstream tasks start 
> first, each with a local buffer pool \{required = 500, max = 2 * 500 + 8 = 
> 1008}; they produce data quickly and each occupies about 990 buffers. Then 
> the downstream task starts and tries to assign exclusive buffers for 
> 1500 - 9 = 1491 InputChannels. It requires 2981 buffers, but only 1786 are 
> left. Since not all downstream tasks can start, the job eventually blocks, 
> no buffers can be released, and the deadlock occurs.
>  
> Although increasing the network memory solves the problem, I think the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> the network memory in the ResourceProfile.
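
As a quick check of the buffer figures quoted in the scenario, the arithmetic 
can be reproduced as below. This is only an illustrative sketch: the class and 
method names are made up, and the 2981 requested buffers are taken as given 
from the description rather than derived.

```java
/** Hypothetical helper that redoes the buffer arithmetic from the issue description. */
public class BufferAccounting {

    /** Buffers still free on the TM after the upstream tasks have filled their pools. */
    static int remainingBuffers(int totalBuffers, int upstreamTasks, int buffersHeldPerTask) {
        return totalBuffers - upstreamTasks * buffersHeldPerTask;
    }

    /** Max size of an upstream local buffer pool as stated in the description: 2 * required + 8. */
    static int maxPoolSize(int required) {
        return 2 * required + 8;
    }

    public static void main(String[] args) {
        int remaining = remainingBuffers(10696, 9, 990);
        int requested = 2981; // figure quoted in the description, taken as given

        System.out.println("max pool size per upstream task: " + maxPoolSize(500));
        System.out.println("remaining buffers on the TM: " + remaining);
        System.out.println("request can be satisfied: " + (requested <= remaining));
    }
}
```

With the numbers from the description, the 9 upstream tasks hold 9 * 990 = 8910 
of the 10696 buffers, which leaves the 1786 buffers mentioned there, fewer than 
the downstream task asks for.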



