Guokuai Huang created FLINK-22946:
-------------------------------------
Summary: Network buffer deadlock introduced by unaligned checkpoint
Key: FLINK-22946
URL: https://issues.apache.org/jira/browse/FLINK-22946
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.13.1, 1.13.0
Reporter: Guokuai Huang
Attachments: Screen Shot 2021-06-09 at 6.39.47 PM.png, Screen Shot
2021-06-09 at 7.02.04 PM.png
We recently encountered a deadlock when using unaligned checkpoints. Below are the
two thread stacks involved in the deadlock:
```
"Channel state writer Join(xxxxxx) (34/256)#1":
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
    - waiting to lock <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
    - locked <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown Source)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown Source)
    at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

"Join(xxxxxx) (34/256)#1":
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
    - waiting to lock <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
    - locked <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$226/1801850008.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$577/1653738667.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:745)
```
The root cause of this problem is that unaligned checkpoint makes it possible
that, *under the same input gate, multiple input channels may recycle network
buffers at the same time.*
Previously, network buffer recycling could only occur serially between input
channels under the same input gate, because each sub-task processes input data
serially, and an input gate belongs to only one sub-task. When unaligned
checkpoint is enabled, each input channel takes a snapshot of its in-flight
data when it receives the checkpoint barrier, and network buffers may be
recycled during that process.
Unfortunately, *the current network buffer recycling mechanism does not take
into account the situation where multiple input channels recycle network
buffers at the same time.* The screenshot below shows the code that causes the
deadlock when multiple input channels under the same input gate recycle network
buffers at the same time.
!Screen Shot 2021-06-09 at 7.02.04 PM.png!
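The circular wait in the two stacks above can be reduced to a single-threaded sketch. The class and method names below are simplified stand-ins for BufferManager and its AvailableBufferQueue, not Flink's real code: recycle() notifies the peer channel while still holding its own queue lock, so the path starting at channel A acquires lock A then lock B, while the path starting at channel B acquires them in the opposite order.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the buggy locking shape: recycle() hands a freed
// buffer to the peer channel while still holding its own bufferQueue lock,
// and the peer's notifyBufferAvailable() then takes the peer's lock.
public class DeadlockShape {
    static class Channel {
        final String name;
        final Object bufferQueueLock = new Object();
        Channel peer;

        Channel(String name) { this.name = name; }

        // Peer notification happens inside our own synchronized block --
        // this nesting is the essence of the bug.
        void recycle(List<String> lockTrace) {
            synchronized (bufferQueueLock) {
                lockTrace.add(name);
                peer.notifyBufferAvailable(lockTrace);
            }
        }

        void notifyBufferAvailable(List<String> lockTrace) {
            synchronized (bufferQueueLock) {
                lockTrace.add(name);
            }
        }
    }

    /** Runs each recycle path single-threaded and records its lock order. */
    public static List<List<String>> lockOrders() {
        Channel a = new Channel("A");
        Channel b = new Channel("B");
        a.peer = b;
        b.peer = a;
        List<String> fromA = new ArrayList<>();
        List<String> fromB = new ArrayList<>();
        a.recycle(fromA); // acquires lock A, then lock B
        b.recycle(fromB); // acquires lock B, then lock A
        return List.of(fromA, fromB);
    }

    public static void main(String[] args) {
        // Opposite acquisition orders => circular wait when run concurrently.
        System.out.println(lockOrders()); // [[A, B], [B, A]]
    }
}
```

Run concurrently on two threads, these two inverted acquisition orders produce exactly the "locked/waiting to lock" pair shown in the thread dump.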
The fix for this problem is fairly straightforward. Here are two possible
solutions:
*1. Case-by-case solution.* Note that input channel A (holding lock A) hands
its released network buffer to input channel B (while waiting for lock B), and
input channel B (holding lock B) hands its released network buffer to input
channel A (while waiting for lock A). So, when an input channel releases a
network buffer, it should first check whether it is itself waiting for network
buffers, and if so, allocate the buffer directly to itself. This avoids the
situation where different input channels exchange network buffers with each
other.
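A minimal sketch of this idea, with purely hypothetical names (the real check would live in BufferManager.recycle()): a channel that is still short of buffers keeps the freed buffer for itself, touching only its own lock, so the cross-channel hand-off that closes the deadlock cycle never happens.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of solution 1: on recycle, first check whether this
// channel is itself waiting for buffers; if so, keep the buffer locally
// instead of handing it to another channel. Names are illustrative only.
public class SelfFirstRecycle {
    private final Deque<String> availableBuffers = new ArrayDeque<>();
    private final int numRequiredBuffers; // buffers this channel still needs

    public SelfFirstRecycle(int numRequiredBuffers) {
        this.numRequiredBuffers = numRequiredBuffers;
    }

    /** Returns where the freed buffer went: kept locally or handed away. */
    public synchronized String recycle(String buffer) {
        if (availableBuffers.size() < numRequiredBuffers) {
            // Short of buffers ourselves: keep it, under our own lock only,
            // so no other channel's lock is ever taken from here.
            availableBuffers.add(buffer);
            return "kept";
        }
        // Otherwise hand it back to the pool / another channel (elided).
        return "handed-off";
    }

    public static void main(String[] args) {
        SelfFirstRecycle starving = new SelfFirstRecycle(2);
        SelfFirstRecycle satisfied = new SelfFirstRecycle(0);
        System.out.println(starving.recycle("buf-1"));  // kept
        System.out.println(satisfied.recycle("buf-2")); // handed-off
    }
}
```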
*2. A straightforward solution.* The input channel only needs to hold the lock
during recycle while it removes the network buffer from the bufferQueue; the
subsequent operations do not need this lock. Therefore, we only need to move
Buffer::recycleBuffer outside the bufferQueue lock to avoid the deadlock.
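A sketch of solution 2, again with simplified hypothetical names rather than Flink's actual code: the displaced floating buffer is removed from the queue while holding the lock, but its recycleBuffer() call happens only after the monitor is released, so no other channel's lock can be acquired while ours is held.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of solution 2: update the bufferQueue under the lock,
// but call recycleBuffer() only after leaving the synchronized block.
public class RecycleOutsideLock {
    static class Buffer {
        boolean recycled = false;
        boolean recycledUnderQueueLock = false;

        void recycleBuffer(Object queueLock) {
            recycled = true;
            // Record whether the caller still holds the queue monitor;
            // the fix guarantees this stays false.
            recycledUnderQueueLock = Thread.holdsLock(queueLock);
        }
    }

    private final Deque<Buffer> floatingBuffers = new ArrayDeque<>();
    private final Object bufferQueueLock = new Object();

    public void addFloatingBuffer(Buffer b) {
        synchronized (bufferQueueLock) {
            floatingBuffers.add(b);
        }
    }

    /** Adding an exclusive buffer displaces one floating buffer, if present. */
    public Buffer addExclusiveBuffer(Buffer exclusive) {
        Buffer displaced;
        synchronized (bufferQueueLock) {
            displaced = floatingBuffers.pollFirst(); // queue update under lock
            // (bookkeeping for the exclusive buffer elided)
        }
        if (displaced != null) {
            displaced.recycleBuffer(bufferQueueLock); // lock already released
        }
        return displaced;
    }

    public static void main(String[] args) {
        RecycleOutsideLock queue = new RecycleOutsideLock();
        Buffer floating = new Buffer();
        queue.addFloatingBuffer(floating);
        queue.addExclusiveBuffer(new Buffer());
        System.out.println(floating.recycled);               // true
        System.out.println(floating.recycledUnderQueueLock); // false
    }
}
```

Because the recycle call now runs lock-free, any notifyBufferAvailable() it triggers on another channel acquires that channel's lock without nesting, which breaks the circular wait.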
--
This message was sent by Atlassian Jira
(v8.3.4#803005)