Hi David,
First, I'd like to clarify two things based on the information you provided below.
1. If all the tasks are running on the same TaskManager, there is no
credit-based flow control. The downstream operator consumes the upstream's data
directly in memory, so no network shuffle is needed.
2. Available buffers on the TaskManager do not imply that each task inside it
has available buffers on its input or output side. E.g. the output side of the
"enrich-events" operator has at most 10 buffers. Once these buffers are
exhausted, the operator blocks regardless of how many buffers are available at
the TaskManager level.
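To illustrate point 2, here is a toy model of a per-task pool that is capped
at a maximum while drawing from a much larger shared pool. This is only a
sketch, not Flink's actual LocalBufferPool: the class and method names
(GlobalPool, CappedLocalPool, tryRequest, recycle) are made up for
illustration, the cap of 10 is the figure from point 2, and the total of
6,553 is the figure from the quoted report.

```java
import java.util.concurrent.Semaphore;

public class LocalPoolCapSketch {

    /** Toy stand-in for the TaskManager-wide pool of memory segments. */
    static final class GlobalPool {
        private final Semaphore segments;

        GlobalPool(int total) {
            this.segments = new Semaphore(total);
        }

        boolean tryTake() {
            return segments.tryAcquire();
        }

        void giveBack() {
            segments.release();
        }

        int available() {
            return segments.availablePermits();
        }
    }

    /** Toy stand-in for one task's output-side pool with a hard upper bound. */
    static final class CappedLocalPool {
        private final GlobalPool global;
        private final int maxBuffers;
        private int inUse;

        CappedLocalPool(GlobalPool global, int maxBuffers) {
            this.global = global;
            this.maxBuffers = maxBuffers;
        }

        /** Returns false where a blocking request would have to wait. */
        synchronized boolean tryRequest() {
            if (inUse >= maxBuffers) {
                return false; // local cap reached, the global pool is not consulted
            }
            if (!global.tryTake()) {
                return false; // global pool really is empty
            }
            inUse++;
            return true;
        }

        /** Hands one buffer back, e.g. after the consumer has read it. */
        synchronized void recycle() {
            if (inUse == 0) {
                throw new IllegalStateException("nothing to recycle");
            }
            inUse--;
            global.giveBack();
        }
    }

    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(6553);
        CappedLocalPool output = new CappedLocalPool(global, 10);

        int granted = 0;
        while (output.tryRequest()) {
            granted++;
        }
        // prints "granted=10, global available=6543"
        System.out.println("granted=" + granted + ", global available=" + global.available());
    }
}
```

In this model the 11th request fails although thousands of segments are still
free globally. It succeeds again only after one of the 10 buffers is recycled,
which is why it matters whether the buffers already queued on the output side
are ever consumed.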
Considering your case, could you double check whether there are buffers
accumulated in the output ("outputQueueLength" metric) of the "enrich-events"
operator, and whether the "numRecordsIn/numBytesIn" metric of the "Test Function"
operator is greater than 0? I want to rule out a buffer leak on the upstream
side and a missing partition request on the downstream side. Then we can
further investigate whether the input availability notification on the
downstream side has a bug that makes it stuck forever.
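If the web UI is inconvenient, these metrics can also be read through the
REST API. Below is a small sketch that only builds the request URL for the
per-vertex metrics endpoint (/jobs/<jobid>/vertices/<vertexid>/metrics). The
host, port, job id, vertex id and the "0." subtask prefix on the metric names
are placeholders and assumptions on my side. Calling the endpoint without the
"get" parameter lists the metric names that are actually available, so please
verify the exact names there first.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MetricQuerySketch {

    /**
     * Builds the URL of the per-vertex metrics endpoint.
     * With an empty list the URL only lists the available metric names.
     */
    static String vertexMetricsUrl(
            String restBase, String jobId, String vertexId, List<String> metricNames) {
        String url = restBase + "/jobs/" + jobId + "/vertices/" + vertexId + "/metrics";
        if (metricNames.isEmpty()) {
            return url;
        }
        return url + "?get=" + String.join(",", metricNames);
    }

    public static void main(String[] args) {
        // All values below are placeholders, not taken from the actual job.
        String restBase = "http://localhost:8081";
        String jobId = "<jobid>";
        String upstreamVertex = "<vertexid of enrich-events>";
        String downstreamVertex = "<vertexid of Test Function>";

        // Step 1: list the metric names the vertex really exposes.
        System.out.println(
                vertexMetricsUrl(restBase, jobId, upstreamVertex, Collections.<String>emptyList()));

        // Step 2: query the metrics of interest (names assumed, check step 1).
        System.out.println(
                vertexMetricsUrl(restBase, jobId, upstreamVertex,
                        Arrays.asList("0.buffers.outputQueueLength")));
        System.out.println(
                vertexMetricsUrl(restBase, jobId, downstreamVertex,
                        Arrays.asList("0.numRecordsIn", "0.numBytesIn")));
    }
}
```

The printed URLs can be fetched with any HTTP client once the placeholders are
filled in.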
Best,
Zhijiang
--
From: David Maddison
Send Time: Tuesday, June 9, 2020 19:28
To: user
Subject: Blocked requesting MemorySegment when Segments are available.
Hi,
I keep seeing the following situation where a task is blocked getting a
MemorySegment from the pool but the TaskManager is reporting that it has lots
of MemorySegments available.
I'm completely stumped as to how to debug or what to look at next so any
hints/help/advice would be greatly appreciated!
/David/
The situation is as follows (Flink 1.10.0):
I have two operators. The first one, "enrich-events", is stuck forever
attempting to get a memory segment to send to the downstream operator "Test
Function":
"read-events -> enriched-events (1/1)" #61 prio=5 os_prio=0 tid=0x7f6424091800 nid=0x13b waiting on condition [0x7f644acf]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0xd2206000> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
All the operator tasks are running on the same TaskManager and the TaskManager
reports that it has 6,517 memory segments available, so it's confusing why the
task would be blocked getting a memory segment.
Memory Segments
Type       Count
Available  6,517
Total      6,553
Even more confusing is that the downstream task appears to be waiting for data
and therefore I would assume that the credit based flow control isn't causing
the back pressure.
"Test Function (1/1)" #62 prio=5 os_prio=0 tid=0x7f6424094000 nid=0x13c waiting on condition [0x7f644abf]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0xc91de1d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.