[ https://issues.apache.org/jira/browse/FLINK-22946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guokuai Huang updated FLINK-22946:
----------------------------------
    Description: 
We recently encountered a deadlock when using unaligned checkpoints. Below are the 
two thread stacks involved in the deadlock:


{code:java}
"Channel state writer Join(xxxxxx) (34/256)#1":
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
    - waiting to lock <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
    - locked <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown Source)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown Source)
    at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

"Join(xxxxxx) (34/256)#1":
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
    - waiting to lock <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
    - locked <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$226/1801850008.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$577/1653738667.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:745)
{code}
The root cause of this problem is that unaligned checkpoints make it possible that, 
*under the same input gate, multiple input channels may recycle network buffers at 
the same time.*

Previously, network buffer recycling could only happen serially across the input 
channels of the same input gate, because each sub-task processes its input data 
serially and an input gate belongs to exactly one sub-task. With unaligned 
checkpoints enabled, each input channel is snapshotted when it receives the 
checkpoint barrier, and network buffers may be recycled during that snapshot (on 
the channel state writer thread, as the first stack above shows).

Unfortunately, *the current network buffer recycling mechanism does not take into 
account the situation where multiple input channels recycle network buffers at the 
same time.* The code shown in the screenshot below is the path that deadlocks when 
multiple input channels under the same input gate recycle network buffers 
concurrently.

!Screen Shot 2021-06-09 at 7.02.04 PM.png!
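
For readers who cannot open the attachment, the sketch below is a minimal, self-contained Java model of the locking cycle. It is illustration only, not Flink code, and the class and method names are invented. It reproduces the pattern visible in the two stacks above: each channel recycles a buffer under its own bufferQueue lock and, while still holding that lock, notifies the peer channel, which requires the peer's lock.
{code:java}
// Minimal standalone model of the locking cycle (illustration only, NOT Flink code).
// Each "channel" recycles a buffer under its own lock and, while still holding
// that lock, notifies the peer channel, which requires the peer's lock.
public class BufferRecycleDeadlockDemo {

    static class Channel {
        final Object bufferQueueLock = new Object();
        final String name;
        Channel peer;

        Channel(String name) {
            this.name = name;
        }

        // Buggy pattern: the peer notification happens inside our own lock.
        void recycle() {
            synchronized (bufferQueueLock) {
                sleepQuietly(100); // widen the race window so the cycle is easy to hit
                peer.notifyBufferAvailable(); // needs peer.bufferQueueLock while we hold ours
            }
        }

        void notifyBufferAvailable() {
            synchronized (bufferQueueLock) {
                System.out.println(name + ": buffer notification handled");
            }
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Channel a = new Channel("channel A");
        Channel b = new Channel("channel B");
        a.peer = b;
        b.peer = a;

        // Thread 1 plays the task thread recycling into channel A's queue,
        // thread 2 plays the channel state writer recycling into channel B's queue.
        new Thread(a::recycle, "task-thread").start();
        new Thread(b::recycle, "channel-state-writer").start();
        // Both threads grab their own lock first and then block forever waiting
        // for the other's lock, mirroring the two stacks above.
    }
}
{code}
In the real stacks, the "notification" is the Buffer::recycleBuffer call made from AvailableBufferQueue.addExclusiveBuffer while BufferManager.recycle still holds its own bufferQueue lock, which ends up in BufferManager.notifyBufferAvailable of another channel.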

The fix for this problem is fairly straightforward. Here are two possible 
solutions:
 *1. Case-by-case solution.* Note that input channel A (holding lock A) hands the 
released network buffer to input channel B (while waiting for lock B), and input 
channel B (holding lock B) hands the released network buffer to input channel A 
(while waiting for lock A). So when an input channel releases a network buffer, it 
can first check whether it is itself waiting for a network buffer and, if so, 
allocate the buffer to itself directly. This avoids the situation where different 
input channels exchange network buffers.
 2. *A straightforward solution.* The input channel only needs the bufferQueue lock 
during recycle in order to add or remove the network buffer to or from the 
bufferQueue; the subsequent operations do not need to hold this lock. Therefore, we 
only need to move Buffer::recycleBuffer outside the bufferQueue lock to avoid the 
deadlock (see the sketch after this list).
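
To make solution 2 concrete, below is the corresponding change applied to the toy model above. Again, this is a sketch rather than the actual patch to BufferManager; it only shows the pattern of doing the bookkeeping under the lock and performing the hand-off after the lock is released.
{code:java}
// Drop-in replacement for Channel.recycle() in the model above: decide what to
// hand back while holding our own lock, but perform the peer notification
// (the Buffer::recycleBuffer call in the real code) only after the lock has
// been released, so no thread ever holds one bufferQueue lock while waiting
// for another.
void recycle() {
    boolean handBufferToPeer;
    synchronized (bufferQueueLock) {
        // Bookkeeping only: add the buffer to our own queue and decide whether
        // a surplus buffer must be given back. No calls into the peer here.
        handBufferToPeer = true;
    }
    if (handBufferToPeer) {
        peer.notifyBufferAvailable(); // outside the lock: no nested locking, no cycle
    }
}
{code}
With this ordering the toy program above terminates instead of hanging, which mirrors moving Buffer::recycleBuffer out of the synchronized block in the real code.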



> Network buffer deadlock introduced by unaligned checkpoint
> ----------------------------------------------------------
>
>                 Key: FLINK-22946
>                 URL: https://issues.apache.org/jira/browse/FLINK-22946
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0, 1.13.1
>            Reporter: Guokuai Huang
>            Priority: Critical
>         Attachments: Screen Shot 2021-06-09 at 6.39.47 PM.png, Screen Shot 2021-06-09 at 7.02.04 PM.png
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
