[ https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837974#comment-15837974 ]

ASF GitHub Bot commented on FLINK-5638:
---------------------------------------

Github user tillrohrmann closed the pull request at:

    https://github.com/apache/flink/pull/3210


> Deadlock when closing two chained async I/O operators
> -----------------------------------------------------
>
>                 Key: FLINK-5638
>                 URL: https://issues.apache.org/jira/browse/FLINK-5638
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.2.0, 1.3.0
>
>
> The {{AsyncWaitOperator}} can deadlock in a special case: when two chained
> {{AsyncWaitOperators}} are closed while one element is still in flight
> between them.
> The deadlock scenario is the following. Consider two chained
> {{AsyncWaitOperators}}, {{a1}} and {{a2}}. The last element of {{a1}} has
> completed. This notifies {{a1's}} {{Emitter}}, {{e1}}, which removes the
> element from the queue and outputs it to {{a2}}. This poll-and-output
> operation happens under the checkpoint lock. Since {{a1}} and {{a2}} are
> chained, the {{e1}} thread directly calls {{a2's}} {{processElement}}
> function, which tries to add the new element to {{a2's}}
> {{StreamElementQueue}}. Now assume that this queue is full. The operation
> then releases the checkpoint lock and waits until it is notified again.
> In the meantime, the {{StreamTask}} calls {{a1.close()}} because all input
> has been consumed. The close operation also happens under the checkpoint
> lock. First, the close method waits until all elements of {{a1's}}
> {{StreamElementQueue}} have been processed (i.e. the queue is empty), which
> it does by waiting on the checkpoint lock. Next, {{e1}} is interrupted and
> the closing thread joins on it. At this point {{e1}} is waiting on the
> checkpoint lock, so it must re-acquire that lock before it can react to the
> interrupt. Since the close operation keeps holding the checkpoint lock while
> joining, {{e1}} can never re-acquire it, and the two threads deadlock.
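This lock interplay can be reproduced with plain Java monitors. The following is a minimal, hypothetical sketch, and none of its names are Flink API. A single {{Object}} stands in for the checkpoint lock. The thread {{e1}} stands in for the emitter blocked inside {{a2's}} full queue. {{closeWhileHoldingLock}} mimics {{a1.close()}} interrupting and joining {{e1}} while holding the lock. An unbounded {{join}} would hang forever, so the sketch uses a bounded one and reports whether {{e1}} is still alive afterwards.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical reproduction of the close() deadlock; not Flink code.
final class CloseDeadlockDemo {

    /**
     * Mimics a1.close(): interrupts the emitter and joins on it while holding
     * the checkpoint lock. Returns true if the emitter is still alive after the
     * bounded join, i.e. the interrupt could not take effect.
     */
    static boolean closeWhileHoldingLock(long joinTimeoutMillis) throws InterruptedException {
        final Object checkpointLock = new Object();
        final CountDownLatch emitterWaiting = new CountDownLatch(1);

        // Stand-in for e1 blocked in a2.processElement() on a full queue:
        // wait() releases the checkpoint lock until the thread is woken up.
        Thread e1 = new Thread(() -> {
            synchronized (checkpointLock) {
                emitterWaiting.countDown();
                try {
                    while (true) {
                        checkpointLock.wait(); // loop guards against spurious wake-ups
                    }
                } catch (InterruptedException e) {
                    // Only reached after wait() has re-acquired the checkpoint lock.
                    Thread.currentThread().interrupt();
                }
            }
        }, "e1");
        e1.setDaemon(true); // a stuck emitter can never keep the JVM alive
        e1.start();
        emitterWaiting.await();

        synchronized (checkpointLock) {
            // We only get here once e1 has released the lock by entering wait().
            e1.interrupt();
            // join() waits on the Thread object, not on checkpointLock, so e1
            // cannot re-acquire the lock it needs to handle the interrupt.
            e1.join(joinTimeoutMillis);
            return e1.isAlive();
        }
    }
}
```

Calling {{CloseDeadlockDemo.closeWhileHoldingLock(200)}} returns {{true}}: after the 200 ms join, {{e1}} is still blocked on the lock. It terminates only once the closing thread leaves the {{synchronized}} block.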
> Two problems cause the deadlock:
> 1. We assume that the {{AsyncWaitOperator}} has processed all its elements
> once its queue is empty. This holds as long as the output operation is
> atomic. However, in the chained case the emitter thread may have to wait to
> insert the element into the queue of the next {{AsyncWaitOperator}}. In that
> case it releases the checkpoint lock, so the output operation is no longer
> atomic. We can solve this problem by polling the last queue element after it
> has been output rather than before.
> 2. We interrupt the emitter thread while holding the checkpoint lock and never
> release it. The interrupt signal is then meaningless, because the emitter
> thread also needs the checkpoint lock to make progress. We can solve this by
> waiting on the checkpoint lock (which releases it) and periodically checking
> whether the emitter thread has stopped.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt
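Both proposed fixes can be sketched in the same simplified setting. This class is hypothetical. It uses an {{ArrayDeque}} guarded by the checkpoint lock instead of Flink's {{StreamElementQueue}}, and {{emitHead}}, {{waitUntilEmpty}} and {{stopEmitter}} are illustrative names, not Flink's {{Emitter}} API.

* {{emitHead}} applies fix 1. It peeks at the head, outputs it, and only then removes it. A {{close()}} that observes an empty queue therefore knows the output has finished.
* {{stopEmitter}} applies fix 2. After interrupting the emitter, it uses a timed {{wait}} on the checkpoint lock instead of {{join}}. This releases the lock so the emitter can re-acquire it, observe the interrupt and exit.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;

// Hypothetical sketch of both fixes; names are illustrative, not Flink's API.
final class FixedAsyncShutdown<T> {
    private final Object checkpointLock;
    private final ArrayDeque<T> queue = new ArrayDeque<>(); // guarded by checkpointLock
    private final Consumer<T> output;

    FixedAsyncShutdown(Object checkpointLock, Consumer<T> output) {
        this.checkpointLock = checkpointLock;
        this.output = output;
    }

    void add(T element) {
        synchronized (checkpointLock) {
            queue.addLast(element);
        }
    }

    boolean isEmpty() {
        synchronized (checkpointLock) {
            return queue.isEmpty();
        }
    }

    /** Fix 1: output the head element first and remove it only afterwards. */
    boolean emitHead() {
        synchronized (checkpointLock) {
            T head = queue.peekFirst();
            if (head == null) {
                return false;
            }
            // output may release the checkpoint lock (e.g. a chained operator waiting
            // on its own full queue); the element stays queued meanwhile, so an empty
            // queue really means that every element has been output.
            output.accept(head);
            queue.pollFirst();
            checkpointLock.notifyAll(); // wake a close() waiting in waitUntilEmpty()
            return true;
        }
    }

    /** What close() does first: wait on the checkpoint lock until the queue is drained. */
    void waitUntilEmpty() throws InterruptedException {
        synchronized (checkpointLock) {
            while (!queue.isEmpty()) {
                checkpointLock.wait(); // releases the lock so the emitter can progress
            }
        }
    }

    /** Fix 2: interrupt the emitter, then wait on the lock instead of joining while holding it. */
    static void stopEmitter(Object checkpointLock, Thread emitter, long pollMillis)
            throws InterruptedException {
        synchronized (checkpointLock) {
            emitter.interrupt();
            while (emitter.isAlive()) {
                // wait() releases the checkpoint lock (including re-entrant holds), so the
                // emitter can re-acquire it, observe the interrupt and terminate.
                checkpointLock.wait(pollMillis);
            }
        }
    }
}
```

The poll interval only bounds how quickly the closing thread notices that the emitter has terminated. Correctness comes from {{wait}} releasing the checkpoint lock, which {{Thread.join}} does not do.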



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
