[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878289#comment-16878289
 ] 

zhijiang edited comment on FLINK-12852 at 7/4/19 3:27 AM:
----------------------------------------------------------

Network resource matching within the slot still has many unsettled issues that need 
to be discussed further in the future, so we cannot make it effective in the short term.

Lazily allocating buffers on the producer side seems a feasible way at the moment. It 
could still retain the current core/maximum mechanism in the local pool. But it 
brings two other effects:
 * Longer time to ramp up to full throughput, as Stephan mentioned, especially 
for some very short jobs (finishing in several seconds); I remember there were such 
cases in Kurt's benchmark before. We would change the previous concurrent production 
and consumption into a sequential process. For a short job, all the data might 
already be emitted and cached in the partition pool on the producer side before the 
consumer requests the partition.
 * We rely on another assumption: produced buffers can eventually be recycled 
once the subpartition view is established. This assumption might limit new 
features/improvements in the future. At the moment we need to adjust the action that 
triggers the partition request, meaning RemoteInputChannel could only send the 
partition request if the corresponding task has no result partition or the 
partition's view has already been established (as sketched below). In the future, 
InputSelection might also break this assumption: although the partition was 
requested, the operator could choose not to consume that partition for a long time.
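
A rough sketch of the adjusted trigger condition mentioned in the second point (the 
class and method names below are only illustrative, not the actual Flink code):
{code:java}
// Illustrative only: when may a RemoteInputChannel send its partition request?
final class PartitionRequestTrigger {

    /** True if the task owning this input channel produces no result partition. */
    private final boolean taskHasNoResultPartition;

    PartitionRequestTrigger(boolean taskHasNoResultPartition) {
        this.taskHasNoResultPartition = taskHasNoResultPartition;
    }

    /**
     * The request may be sent only if the task produces nothing, or if all of its
     * own result partitions are already being consumed (their subpartition views
     * are established), so that its produced buffers can eventually be recycled.
     */
    boolean maySendPartitionRequest(boolean allLocalViewsEstablished) {
        return taskHasNoResultPartition || allLocalViewsEstablished;
    }
}
{code}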



> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
>                 Key: FLINK-12852
>                 URL: https://issues.apache.org/jira/browse/FLINK-12852
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.7.2, 1.8.1, 1.9.0
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x00007f2cca81b000 nid=0x38845 waiting on condition [0x00007f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000073ed6b6f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x000000073fbc81f0> (a java.lang.Object)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required size and the maximum size of the local buffer pool are 
> not the same, so the local pools may over-allocate; when assignExclusiveSegments is 
> called, there is no memory left.
>  
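> A minimal sketch of the blocking behaviour shown in the stack trace above (the 
> class below is only illustrative, not the real NetworkBufferPool; it just mimics 
> exclusive-segment requests polling a bounded shared queue):
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.TimeUnit;
>
> // Illustrative only: a bounded pool of segments shared by all local pools of a TM.
> final class BoundedSegmentPool {
>     private final ArrayBlockingQueue<byte[]> availableSegments;
>
>     BoundedSegmentPool(int totalSegments, int segmentSize) {
>         this.availableSegments = new ArrayBlockingQueue<>(totalSegments);
>         for (int i = 0; i < totalSegments; i++) {
>             availableSegments.add(new byte[segmentSize]);
>         }
>     }
>
>     // Blocks (like the TIMED_WAITING frames above) until enough segments are free.
>     List<byte[]> requestSegments(int count) throws InterruptedException {
>         List<byte[]> segments = new ArrayList<>(count);
>         while (segments.size() < count) {
>             byte[] segment = availableSegments.poll(2, TimeUnit.SECONDS);
>             if (segment != null) {
>                 segments.add(segment);
>             }
>             // If the producers never return their segments, this loop never
>             // completes and the requesting task stays blocked -> deadlock.
>         }
>         return segments;
>     }
> }
> {code}
>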
> The detail of the scenario is as follows: the parallelism of the upstream vertex 
> and the downstream vertex is 1000 and 500 respectively. There are 200 TMs, and each 
> TM has 10696 buffers in total and 10 slots. For a TM that runs 9 upstream tasks and 
> 1 downstream task, the 9 upstream tasks start first with local buffer pool 
> {required = 500, max = 2 * 500 + 8 = 1008}; they produce data quickly and each 
> occupies about 990 buffers. Then the downstream task starts and tries to assign 
> exclusive buffers for 1500 - 9 = 1491 InputChannels. It requires 2981 buffers but 
> only 1786 are left. Since not all downstream tasks can start, the job is blocked, 
> no buffer can be released, and the deadlock finally occurs.
>  
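> To make the arithmetic explicit, a small snippet that simply reproduces the figures 
> given above (all numbers are taken from this description):
> {code:java}
> // Buffer accounting for one TM, using the figures from this report.
> public class BufferAccounting {
>     public static void main(String[] args) {
>         int totalBuffersPerTm = 10696;        // buffers available in one TM
>         int upstreamTasksOnTm = 9;            // upstream tasks running on this TM
>         int occupiedPerUpstreamTask = 990;    // each upstream pool grows towards its max of 1008
>
>         int occupied = upstreamTasksOnTm * occupiedPerUpstreamTask;   // 8910
>         int remaining = totalBuffersPerTm - occupied;                 // 1786
>
>         int exclusiveBuffersNeeded = 2981;    // for the 1491 InputChannels of the downstream task
>
>         System.out.println("remaining = " + remaining);
>         System.out.println("shortfall = " + (exclusiveBuffersNeeded - remaining)); // 1195 buffers missing
>         // The downstream task blocks in requestMemorySegments while the upstream
>         // tasks cannot release their buffers because nothing consumes them.
>     }
> }
> {code}
>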
> I think that although increasing the network memory works around the problem, the 
> deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) can 
> solve this problem, but AFAIK in 1.9 it will not include the network memory in the 
> ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
