[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879173#comment-16879173 ]

Stephan Ewen commented on FLINK-12852:
--------------------------------------

So, in summary we have these options.

  1. Only allocate the minimum per producer, *which is one buffer per channel*. 
This would keep the requirement similar to what we have at the moment, but it 
is much less than what we recommend for the credit-based network data exchange 
(2 * numChannels + floating buffers).
    ==> *Not a good option in my opinion*

  2. Coordinate the deployment sink-to-source such that receivers always have 
their buffers first. This would be complex to implement and coordinate, and it 
would break many assumptions about tasks being independent (coordination-wise) 
on the TaskManagers. Giving up that assumption would be a pretty big step and 
cause lots of complexity in the future.
      It would also increase deployment delays. Low deployment delays should be 
a design goal in my opinion, as they enable other features more easily, like 
low-disruption upgrades, etc.
   ==> *Not a good option either, in my opinion*

  3. Make buffers always revocable, by spilling.
      This is tricky to implement very efficiently, especially because
        - there is the logic that slices buffers for early sends in the 
low-latency streaming path
        - the spilling request comes from an asynchronous call. That will 
probably stay that way even with the mailbox, because the main thread will 
frequently be blocked on buffer allocation when this request comes.
   ==> I think this would explode in complexity and bugs, unless we rewrite 
other parts significantly. *Not a good option either*

  4. We allocate the recommended number for good throughput (2 * numChannels + 
floating) per consumer and per producer.
      No more dynamic rebalancing. This would increase the number of required 
network buffers quite a bit in certain high-parallelism scenarios with the 
default config. Users can reduce this by configuring fewer per-channel buffers, 
but it would break existing user setups and require them to adjust the config 
when upgrading.
    ==> *Not a great option, in my opinion*

  5. We make the network resource per slot and ask the scheduler to attach 
information about how many producers and how many consumers will be in the 
slot, worst case. We use that to pre-compute how many excess buffers the 
producers may take.
     This would also break some assumptions and lead us to the point where we 
have to pre-compute network buffers in the same way as managed memory. Seeing 
how much pain that is with managed memory, this does not seem so great.
   ==> *Not a great option either*

This is tricky. At this point it looks like option (4) is the only one that is 
feasible without severe performance issues or an explosion of complexity.
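
To make the cost of option (4) concrete, here is a rough back-of-the-envelope 
sketch of the per-task buffer demand (not Flink code, class name just for 
illustration), assuming the defaults of 2 exclusive buffers per channel 
(taskmanager.network.memory.buffers-per-channel), 8 floating buffers per gate 
(taskmanager.network.memory.floating-buffers-per-gate), and 32 KB segments:

{code:java}
// Back-of-the-envelope arithmetic only, not Flink code.
public class BufferDemandSketch {

    static final int BUFFERS_PER_CHANNEL = 2;   // assumed default: buffers-per-channel
    static final int FLOATING_PER_GATE   = 8;   // assumed default: floating-buffers-per-gate
    static final int SEGMENT_SIZE_BYTES  = 32 * 1024;

    /** Recommended buffers for one input gate (consumer) or one result partition (producer). */
    static int recommendedBuffers(int numChannels) {
        return BUFFERS_PER_CHANNEL * numChannels + FLOATING_PER_GATE;
    }

    public static void main(String[] args) {
        // Example: a task with 500 input channels and 500 output channels.
        int perTask = recommendedBuffers(500) + recommendedBuffers(500);  // 1008 + 1008 = 2016
        System.out.println("buffers per task: " + perTask);

        // 10 such tasks in one TaskManager:
        long bytes = 10L * perTask * SEGMENT_SIZE_BYTES;
        System.out.println("network memory: " + (bytes >> 20) + " MiB");  // 630 MiB
    }
}
{code}

The point of option (4) is that this requirement would be fixed up front per 
producer and per consumer, instead of relying on dynamic redistribution between 
the local buffer pools.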

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
>                 Key: FLINK-12852
>                 URL: https://issues.apache.org/jira/browse/FLINK-12852
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.7.2, 1.8.1, 1.9.0
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and a downstream vertex, a deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x00007f2cca81b000 
> nid=0x38845 waiting on condition [0x00007f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x000000073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not 
> the same, so the pool may over-allocate, and when assignExclusiveSegments is 
> called there is no available memory left.
>  
> The detailed scenario is as follows: the parallelism of the upstream vertex 
> is 1500 and that of the downstream vertex is 500. There are 200 TMs, and each 
> TM has 10696 buffers in total and 10 slots. Consider a TM that runs 9 
> upstream tasks and 1 downstream task: the 9 upstream tasks start first, each 
> with a local buffer pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they 
> produce data quickly and each occupies about 990 buffers. Then the downstream 
> task starts and tries to assign exclusive buffers for its 1500 - 9 = 1491 
> remote input channels. It requires 2981 buffers, but only 1786 are left. 
> Since not all downstream tasks can start, the job eventually blocks, no 
> buffer can ever be released, and the deadlock occurs.
>  
> Although increasing the network memory works around the problem, I think the 
> deadlock itself is not acceptable. Fine-grained resource management 
> (FLINK-12761) could solve this, but AFAIK in 1.9 it will not include the 
> network memory in the ResourceProfile.
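>  
> A rough sketch of the arithmetic behind this scenario (assuming the default 
> of 2 exclusive buffers per remote input channel; the class name is just for 
> illustration):
> {code:java}
> // Rough sketch of the reported numbers, not Flink code.
> public class DeadlockArithmetic {
>     public static void main(String[] args) {
>         int totalBuffersPerTM = 10696;
>         int upstreamTasksOnTM = 9;
>         int buffersPerUpstreamTask = 990;  // observed, close to the pool max of 1008
>         int remoteChannels = 1500 - 9;     // the downstream task's 1491 remote input channels
>         int exclusivePerChannel = 2;       // default exclusive buffers per channel
>
>         int held = upstreamTasksOnTM * buffersPerUpstreamTask;  // 8910
>         int left = totalBuffersPerTM - held;                    // 1786
>         int needed = remoteChannels * exclusivePerChannel;      // 2982 (reported as ~2981 above)
>
>         // needed > left, and the upstream tasks cannot release their buffers
>         // until downstream tasks consume the data, so requestMemorySegments()
>         // blocks forever.
>         System.out.println("needed=" + needed + ", left=" + left);
>     }
> }
> {code}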



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
