[ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533439#comment-16533439
 ] 

ASF GitHub Bot commented on FLINK-9676:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200287796
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    @@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean 
isBlocking) throws Interrupte
     
        @Override
        public void recycle(MemorySegment segment) {
    +           BufferListener listener;
                synchronized (availableMemorySegments) {
                        if (isDestroyed || numberOfRequestedMemorySegments > 
currentPoolSize) {
                                returnMemorySegment(segment);
    +                           return;
                        }
                        else {
    -                           BufferListener listener = 
registeredListeners.poll();
    +                           listener = registeredListeners.poll();
     
                                if (listener == null) {
                                        availableMemorySegments.add(segment);
                                        availableMemorySegments.notify();
    +                                   return;
                                }
    -                           else {
    -                                   try {
    -                                           boolean needMoreBuffers = 
listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
    -                                           if (needMoreBuffers) {
    -                                                   
registeredListeners.add(listener);
    -                                           }
    +                   }
    +           }
    +
    +           // We do not know which locks have been acquired before the 
recycle() or are needed in the
    +           // notification and which other threads also access them.
    +           // -> call notifyBufferAvailable() outside of the synchronized 
block to avoid a deadlock (FLINK-9676)
    +           boolean success = false;
    +           boolean needMoreBuffers = false;
    +           try {
    +                   needMoreBuffers = listener.notifyBufferAvailable(new 
NetworkBuffer(segment, this));
    +                   success = true;
    +           } catch (Throwable ignored) {
    +                   // handled below, under the lock
    +           }
    +
    +           if (!success || needMoreBuffers) {
    +                   synchronized (availableMemorySegments) {
    +                           if (isDestroyed) {
    +                                   // cleanup tasks how they would have 
been done if we only had one synchronized block
    +                                   if (needMoreBuffers) {
    +                                           
listener.notifyBufferDestroyed();
    --- End diff --
    
    👍


> Deadlock during canceling task and recycling exclusive buffer
> -------------------------------------------------------------
>
>                 Key: FLINK-9676
>                 URL: https://issues.apache.org/jira/browse/FLINK-9676
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: zhijiang
>            Assignee: Nico Kruber
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0, 1.5.1
>
>
> A deadlock can occur between the task canceler thread and the task thread.
> The details are as follows:
> {{Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers 
> -> {color:#d04437}lock{color}(LocalBufferPool#availableMemorySegments) -> 
> IC2#notifyBufferAvailable}} -> {color:#d04437}try to 
> lock{color}(IC2#bufferQueue)
> {{Task thread -> IC2#recycle -> {color:#d04437}lock{color}(IC2#bufferQueue) 
> -> bufferQueue#addExclusiveBuffer}} -> {{floatingBuffer#recycleBuffer}} -> 
> {color:#d04437}try to lock{color}(LocalBufferPool#availableMemorySegments)
> One solution is to call {{listener#notifyBufferAvailable}} outside the 
> {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle 
> can cover this case, but the probability of hitting the deadlock is very low, 
> so this unit test does not reproduce it reliably.
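
For illustration only, here is a minimal sketch of the proposed approach: take the listener from the queue while holding the pool lock, but invoke it only after the lock has been released. Every name in it (NotifyOutsideLock, Pool, Channel, Listener, onBufferAvailable, releaseOne, demo) is hypothetical and heavily simplified; it is not the LocalBufferPool or RemoteInputChannel code, and it leaves out the destroyed-pool and error handling that the actual diff deals with.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class NotifyOutsideLock {

    /** Hypothetical stand-in for a buffer listener. */
    interface Listener {
        /** Returns true if the listener wants to be notified again. */
        boolean onBufferAvailable(Object buffer);
    }

    /** Hypothetical, simplified stand-in for a buffer pool. */
    static final class Pool {
        private final Queue<Object> available = new ArrayDeque<>();
        private final Queue<Listener> listeners = new ArrayDeque<>();

        void register(Listener listener) {
            synchronized (available) {
                listeners.add(listener);
            }
        }

        int availableCount() {
            synchronized (available) {
                return available.size();
            }
        }

        void recycle(Object buffer) {
            Listener listener;
            synchronized (available) {
                listener = listeners.poll();
                if (listener == null) {
                    available.add(buffer);
                    available.notify();
                    return;
                }
            }
            // The pool lock is released here, so the callback may take its own
            // lock without creating a pool-lock -> channel-lock ordering.
            boolean needMore = listener.onBufferAvailable(buffer);
            if (needMore) {
                synchronized (available) {
                    listeners.add(listener);
                }
            }
        }
    }

    /** Hypothetical stand-in for an input channel guarded by its own lock. */
    static final class Channel implements Listener {
        private final Object queueLock = new Object();
        private final Pool pool;
        private int received;

        Channel(Pool pool) {
            this.pool = pool;
        }

        @Override
        public boolean onBufferAvailable(Object buffer) {
            synchronized (queueLock) {
                received++;
                return false;
            }
        }

        /** Task-thread path: holds the channel lock, then calls into the pool. */
        void releaseOne() {
            synchronized (queueLock) {
                pool.recycle(new Object());
            }
        }

        int received() {
            synchronized (queueLock) {
                return received;
            }
        }
    }

    /** Runs both paths concurrently; returns the number of buffers accounted for. */
    static int demo() {
        Pool pool = new Pool();
        Channel channel = new Channel(pool);
        Thread canceler = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                pool.register(channel);
                pool.recycle(new Object());
            }
        });
        Thread task = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                channel.releaseOne();
            }
        });
        canceler.setDaemon(true);
        task.setDaemon(true);
        canceler.start();
        task.start();
        try {
            canceler.join(10_000);
            task.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (canceler.isAlive() || task.isAlive()) {
            throw new IllegalStateException("threads did not finish, possible deadlock");
        }
        return channel.received() + pool.availableCount();
    }
}
```

In this sketch the only nested acquisition is channel lock followed by pool lock (in releaseOne), so the two threads can no longer wait on each other in a cycle. If onBufferAvailable were instead called inside the synchronized block of recycle, the canceler-like thread would hold the pool lock while waiting for the channel lock, which is the cycle described in the issue.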



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
