[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877556#comment-16877556 ]
Yun Gao edited comment on FLINK-12852 at 7/3/19 7:22 AM: --------------------------------------------------------- Hi all, very thanks for the discussion and all the valuable suggestions. I'd like to have a short summarization of the possible solutions to this issue based on the above discussion: For the short term, we may also # Use blocking shuffle instead of the the pipeline shuffle for the affected partitioner in batch jobs. # Decrease the exclusive buffers per channel to 1, and postpone the request of exclusive buffers till data is received. They decreases the probability of this issue. For the long run, this issue may be also related with the fine-grained resource management, we may # Make memory buffers a resource per slot and disallow slots to eat into each others' network buffers. # Keep the required/max mechanism, but to fix the deadlock problem, the local buffer pool need be able to return the overused buffers when there are not enough required buffers for the other channels/local buffer pool, like spill to disk. If there are misunderstands and missing points, let me know and I would update this comment. :) was (Author: gaoyunhaii): Hi all, very thanks for the discussion and all the valuable suggestions. I'd like to have a short summarization of the possible solutions to this issue based on the above discussion: For the short term, we may also # Use blocking shuffle instead of the the pipeline shuffle for the affected partitioner in batch jobs. # Decrease the exclusive buffers per channel to 1, and postpone the request of exclusive buffers till data is received. They decreases the probability of this issue. For the long run, this issue may be also related with the fine-grained resource management, we may # Make memory buffers a resource per slot and disallow slots to eat into each others' network buffers. # Keep the required/max mechanism, but to fix the deadlock problem, the local buffer pool need be able to return the overused buffers when there are not enough required buffers for the other channels/local buffer pool, like spill to disk. > Deadlock occurs when requiring exclusive buffer for RemoteInputChannel > ---------------------------------------------------------------------- > > Key: FLINK-12852 > URL: https://issues.apache.org/jira/browse/FLINK-12852 > Project: Flink > Issue Type: Bug > Components: Runtime / Network > Affects Versions: 1.7.2, 1.8.1, 1.9.0 > Reporter: Yun Gao > Assignee: Yun Gao > Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > When running tests with an upstream vertex and downstream vertex, deadlock > occurs when submitting the job: > {code:java} > "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x00007f2cca81b000 > nid=0x38845 waiting on condition [0x00007f2cbe9fe000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x000000073ed6b6f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312) > - locked <0x000000073fbc81f0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220) > at > org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598) > at java.lang.Thread.run(Thread.java:834) > {code} > This is due to the required and max of local buffer pool is not the same and > there may be over-allocation, when assignExclusiveSegments there are no > available memory. > > The detail of the scenarios is as follows: The parallelism of both upstream > vertex and downstream vertex are 1000 and 500 respectively. There are 200 TM > and each TM has 10696 buffers( in total and has 10 slots. For a TM that runs > 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with > local buffer pool \{required = 500, max = 2 * 500 + 8 = 1008}, it produces > data quickly and each occupy about 990 buffers. Then the DownStream task > starts and try to assigning exclusive buffers for 1500 -9 = 1491 > InputChannels. It requires 2981 buffers but only 1786 left. Since not all > downstream tasks can start, the job will be blocked finally and no buffer can > be released, and the deadlock finally occurred. > > I think although increasing the network memory solves the problem, the > deadlock may not be acceptable. Fined grained resource management > Flink-12761 can solve this problem, but AFAIK in 1.9 it will not include the > network memory into the ResourceProfile. -- This message was sent by Atlassian JIRA (v7.6.3#76005)