[ 
https://issues.apache.org/jira/browse/FLINK-22946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guokuai Huang updated FLINK-22946:
----------------------------------
    Labels: pull-request-available  (was: stale-blocker)

> Network buffer deadlock introduced by unaligned checkpoint
> ----------------------------------------------------------
>
>                 Key: FLINK-22946
>                 URL: https://issues.apache.org/jira/browse/FLINK-22946
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0, 1.13.1
>            Reporter: Guokuai Huang
>            Assignee: Guokuai Huang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.14.0, 1.13.2
>
>         Attachments: Screen Shot 2021-06-09 at 6.39.47 PM.png, Screen Shot 2021-06-09 at 7.02.04 PM.png
>
>
> We recently encountered a deadlock when using unaligned checkpoints. Below are the two thread stacks that cause the deadlock:
> {code:java}
> "Channel state writer Join(xxxxxx) (34/256)#1":
>  at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
>  - waiting to lock <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
>  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
>  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
>  at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>  at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>  at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>  at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>  at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
>  at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
>  - locked <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>  at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>  at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>  at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>  at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown Source)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown Source)
>  at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
>  at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown Source)
>  at java.lang.Thread.run(Thread.java:745)
> {code}
> {code:java}
> "Join(xxxxxx) (34/256)#1":
>  at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
>  - waiting to lock <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
>  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
>  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
>  at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>  at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>  at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>  at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>  at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
>  at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
>  - locked <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>  at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>  at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>  at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>  at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>  at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
>  at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
>  at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>  at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$226/1801850008.runDefaultAction(Unknown Source)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$577/1653738667.run(Unknown Source)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>  at java.lang.Thread.run(Thread.java:745)
> {code}
> The root cause of this problem is that unaligned checkpoints make it possible that, *under the same input gate,* *multiple input channels may recycle network buffers at the same time.*
> Previously, network buffer recycling could only happen serially between input channels under the same input gate, because each sub-task processes input data serially and an input gate belongs to only one sub-task. When unaligned checkpoints are enabled, each input channel snapshots its in-flight buffers when it receives the checkpoint barrier (on the channel state writer thread), and network buffers may be recycled in that process, concurrently with the task thread.
> Unfortunately, *the current network buffer recycling mechanism does not take into account the situation where multiple input channels recycle network buffers at the same time.* The following code, from org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue, causes a deadlock when multiple input channels under the same input gate recycle network buffers at the same time.
> !Screen Shot 2021-06-09 at 7.02.04 PM.png!
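> To make the interaction concrete, here is a minimal, self-contained sketch of the locking pattern (the classes and method names are simplified stand-ins, not the actual Flink code): each channel recycles a buffer while holding its own bufferQueue lock, and recycling may hand a floating buffer back to a shared pool, which synchronously notifies another channel and tries to take that channel's lock. When the task thread and the channel state writer thread do this for two different channels at the same time, each ends up waiting on the lock the other holds, mirroring the "locked" / "waiting to lock" pairs in the stacks above.
> {code:java}
> // Hypothetical sketch of the lock-order inversion; not the actual Flink code.
> public class DeadlockSketch {
>
>     static class Channel {
>         final Object bufferQueueLock = new Object();
>         Channel peer;        // the other channel under the same input gate
>         SharedPool pool;
>
>         // Called concurrently by the task thread and the channel state writer.
>         void recycle() {
>             synchronized (bufferQueueLock) {     // lock own buffer queue
>                 // Returning an exclusive buffer may push a floating buffer
>                 // back to the pool while this lock is still held.
>                 pool.returnFloatingBuffer(this);
>             }
>         }
>
>         // Invoked by the pool when a buffer becomes available.
>         void notifyBufferAvailable() {
>             synchronized (bufferQueueLock) {     // needs own buffer queue lock
>                 // enqueue the floating buffer ...
>             }
>         }
>     }
>
>     static class SharedPool {
>         // Simplification: the pool synchronously notifies a waiting channel;
>         // here it always notifies the peer to force the A->B / B->A order.
>         void returnFloatingBuffer(Channel from) {
>             from.peer.notifyBufferAvailable();
>         }
>     }
>
>     public static void main(String[] args) {
>         Channel a = new Channel();
>         Channel b = new Channel();
>         SharedPool pool = new SharedPool();
>         a.peer = b; b.peer = a; a.pool = pool; b.pool = pool;
>
>         // With unlucky timing both threads block forever:
>         new Thread(a::recycle, "task-thread").start();           // locks A, waits for B
>         new Thread(b::recycle, "channel-state-writer").start();  // locks B, waits for A
>     }
> }
> {code}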
> The solution to this problem is quite straightforward. Here are two possible solutions:
>  *1. Case-by-case solution.* Note that input channel A (holding lock A) handed the released network buffer to input channel B (and is waiting for lock B), while input channel B (holding lock B) handed the released network buffer to input channel A (and is waiting for lock A). So, when an input channel releases a network buffer, it can first check whether it is itself waiting for a network buffer; if it is, it assigns the buffer directly to itself. This avoids the situation where different input channels exchange network buffers.
>  2. *A straightforward solution.* The input channel only needs to hold the lock while removing the network buffer from the bufferQueue during recycling; the subsequent operations do not need it. Therefore, we only need to move Buffer::recycleBuffer outside the bufferQueue lock to avoid the deadlock, as shown in the sketch below.
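> As an illustration of option 2, here is a hedged before/after sketch (Buffer and BufferQueue are illustrative stand-ins, not the exact Flink classes): the queue mutation stays inside the synchronized block, while the replaced floating buffer is recycled only after the lock is released, so no other channel's lock is ever acquired while this channel's lock is held.
> {code:java}
> // Hypothetical sketch of option 2; types and method names are illustrative.
> class RecycleOutsideLockSketch {
>
>     interface Buffer { void recycleBuffer(); }
>
>     static class BufferQueue {
>         // Adds an exclusive buffer; may return a floating buffer to release.
>         Buffer addExclusiveBuffer(Buffer exclusive) { return null; }
>     }
>
>     private final BufferQueue bufferQueue = new BufferQueue();
>
>     // Deadlock-prone variant: the floating buffer is recycled while the queue
>     // lock is held, so the recycle can re-enter another channel's BufferManager.
>     void recycleUnderLock(Buffer exclusive) {
>         synchronized (bufferQueue) {
>             Buffer floating = bufferQueue.addExclusiveBuffer(exclusive);
>             if (floating != null) {
>                 floating.recycleBuffer();   // recycled under the lock
>             }
>         }
>     }
>
>     // Fixed variant: only the queue mutation needs the lock; the recycle of
>     // the floating buffer happens after the lock is released.
>     void recycleOutsideLock(Buffer exclusive) {
>         Buffer floating;
>         synchronized (bufferQueue) {
>             floating = bufferQueue.addExclusiveBuffer(exclusive);
>         }
>         if (floating != null) {
>             floating.recycleBuffer();       // no lock held here
>         }
>     }
> }
> {code}
> This keeps the critical section limited to the queue update and removes the cross-channel lock ordering, which is what made the deadlock possible.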



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
