Hi Felipe,

"Buffer pool is destroyed" is mainly caused by task cancellation. It means that 
some other task failed first, which made the job master cancel all tasks in the 
topology.
So to find the root cause, check the job master log for the first failure, the 
one that triggered the subsequent cancel operations.
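
As a rough sketch of that check, the Python snippet below scans a job master 
log and returns the first line that records a task switching to FAILED. The 
log line shape it matches ("... switched from RUNNING to FAILED.") and the 
names first_failure and first_failure_in_file are assumptions for illustration, 
so adjust the pattern to whatever your log actually prints.

```python
import re
from typing import Iterable, Optional

# Assumed shape of a task state transition line (hypothetical example):
#   ... ExecutionGraph - Reducer (2/4) (abc123) switched from RUNNING to FAILED.
# Transitions to CANCELING / CANCELED are consequences, so they are not matched.
FAILED_TRANSITION = re.compile(r"switched from \w+ to FAILED\b")


def first_failure(lines: Iterable[str]) -> Optional[str]:
    """Return the first line recording a transition to FAILED, or None."""
    for line in lines:
        if FAILED_TRANSITION.search(line):
            return line.rstrip("\n")
    return None


def first_failure_in_file(path: str) -> Optional[str]:
    """Same scan, reading the log from a file path."""
    with open(path, encoding="utf-8", errors="replace") as log:
        return first_failure(log)
```

Calling first_failure_in_file with the path to the job master log gives the 
earliest failing task; the exception logged right after that line is usually 
the place to start reading.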

In addition, which Flink version are you using?

Best,
Zhijiang


------------------------------------------------------------------
From:Felipe Gutierrez <felipe.o.gutier...@gmail.com>
Send Time:2019 Nov. 6 (Wed.) 19:12
To:user <user@flink.apache.org>
Subject:What metrics can I see the root cause of "Buffer pool is destroyed" 
message?

Hi community,

Looking at the code [1], it seems to be related to having no 
availableMemorySegments left. I am looking at several metrics, but they have not 
helped me understand where I can measure the root cause of this error 
message.

- flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments does not seem 
to point to a related cause.
- flink_taskmanager_job_task_Shuffle_Netty_Output_Buffers_inputQueueLength: I 
see my reducer operator always with a queue length of 4. The pre-aggregate task 
sometimes goes to 3, but only a few times.
- flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outPoolUsage and 
flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outputQueueLength show 
my source task at 100% several times. But my error message comes from the 
pre-aggregate task.
- flink_taskmanager_job_task_Shuffle_Netty_Output_numBuffersInLocalPerSecond 
DOES show that the pre-aggregate task is consuming a lot. But which metric can 
I relate this to, so that I know in advance how much is a lot?

[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java#L265

Thanks for your suggestions and here is my stack trace:

java.lang.RuntimeException: Buffer pool is destroyed.
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at org.apache.flink.streaming.examples.aggregate.WordCountPreAggregate$WordCountPreAggregateFunction.collect(WordCountPreAggregate.java:251)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamPreAggregateOperator.collect(AbstractUdfStreamPreAggregateOperator.java:84)
    at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.collect(PreAggregateTriggerFunction.java:49)
    at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.run(PreAggregateTriggerFunction.java:63)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:264)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:240)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:215)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:182)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:131)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com
