[ https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837807#comment-15837807 ]
ASF GitHub Bot commented on FLINK-5638:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3210

    Agreed, the problem is a little bit that you don't control from where the `close` method is called...

    Thanks for the review @StephanEwen. Merging once Travis has passed.

> Deadlock when closing two chained async I/O operators
> -----------------------------------------------------
>
>                 Key: FLINK-5638
>                 URL: https://issues.apache.org/jira/browse/FLINK-5638
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.2.0, 1.3.0
>
>
> The {{AsyncWaitOperator}} can deadlock in a special case when closing two
> chained {{AsyncWaitOperators}} while there is still one element in flight
> between these two operators.
> The deadlock scenario is the following: Given two chained
> {{AsyncWaitOperators}} {{a1}} and {{a2}}. {{a1}} has its last element
> completed. This notifies {{a1}}'s {{Emitter}}, {{e1}}, to remove the element
> from the queue and output it to {{a2}}. This poll and output operation
> happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the
> {{e1}} thread will directly call {{a2}}'s {{processElement}} function. In
> this function, we try to add the new element to the {{StreamElementQueue}}.
> Now assume that this queue is full. Then the operation will release the
> checkpoint lock and wait until it is notified again.
> In the meantime, {{a1.close()}} is called by the {{StreamTask}}, because we
> have consumed all input. The close operation also happens under the
> checkpoint lock. First, the close method waits until all elements from the
> {{StreamElementQueue}} have been processed (== empty). This happens by
> waiting on the checkpoint lock. Next, {{e1}} is interrupted and we join on
> {{e1}}. At the moment it is interrupted, {{e1}} is waiting on the checkpoint
> lock. Since the closing operation does not release the checkpoint lock,
> {{e1}} cannot regain the synchronization lock and voila, we have a deadlock.
> Two problems together cause the deadlock:
> 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if
> the queue is empty. This is usually the case if the output operation is
> atomic. However, in the chained case it can happen that the emitter thread
> has to wait to insert the element into the queue of the next
> {{AsyncWaitOperator}}. Under these circumstances, we release the checkpoint
> lock and, thus, the output operation is no longer atomic. We can solve this
> problem by polling the last queue element after we have output it instead
> of before.
> 2. We interrupt the emitter thread while holding the checkpoint lock and not
> freeing it again. Under these circumstances, the interrupt signal is
> meaningless because the emitter thread also needs control over the checkpoint
> lock. We should solve the problem by waiting on the checkpoint lock and
> periodically checking whether the thread has already stopped or not.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
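
The second fix proposed in the issue description (wait on the checkpoint lock and periodically check whether the emitter thread has stopped, instead of joining while holding the lock) could look roughly like the sketch below. This is only an illustration under assumptions, not the code from the pull request: the class `EmitterShutdownSketch`, the methods `stopEmitter` and `run`, and the 20 ms poll interval are all hypothetical, and a plain `Object` stands in for the checkpoint lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Flink code: every name here is made up for illustration.
final class EmitterShutdownSketch {

    /**
     * Interrupts the emitter and waits for it to terminate.
     * The caller must hold the monitor of {@code lock}.
     */
    static void stopEmitter(Object lock, Thread emitter, long pollMillis)
            throws InterruptedException {
        emitter.interrupt();
        // A plain emitter.join() here would keep the lock held, so an emitter
        // that needs the lock to react to the interrupt could never finish.
        while (emitter.isAlive()) {
            // wait(timeout) releases the lock for up to pollMillis, then we re-check.
            lock.wait(pollMillis);
        }
        emitter.join();
    }

    /** Returns true if the emitter observed the interrupt and terminated. */
    static boolean run() {
        final Object lock = new Object(); // stands in for the checkpoint lock
        final AtomicBoolean waiting = new AtomicBoolean(false);
        final AtomicBoolean interrupted = new AtomicBoolean(false);

        Thread emitter = new Thread(() -> {
            synchronized (lock) {
                waiting.set(true);
                lock.notifyAll();
                try {
                    while (true) {
                        lock.wait(); // like waiting for space in a full queue
                    }
                } catch (InterruptedException e) {
                    // Reached only after this thread has re-acquired the lock.
                    interrupted.set(true);
                }
            }
        });

        try {
            synchronized (lock) {
                emitter.start();
                while (!waiting.get()) {
                    lock.wait(); // let the emitter enter its wait first
                }
                stopEmitter(lock, emitter, 20L);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return interrupted.get() && !emitter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("emitter stopped cleanly: " + run());
    }
}
```

Replacing the loop in `stopEmitter` with a bare `emitter.join()` would reproduce the hang described in the issue, because the interrupted thread has to re-acquire the lock before `wait()` can throw `InterruptedException`.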