Hi Marco,

Could you share your full task manager and job manager logs? We
double-checked and saw that the buffer pool is only released on
cancellation or shutdown.

So I'm assuming that there is another issue (e.g., the Kafka cluster not being
reachable) and a race condition while shutting down. It seems like the buffer
pool exception is then shadowing the actual cause for as-yet-unknown reasons
(this is an issue in its own right, but you should still be able to see the
actual failure in the task manager log).

Best,

Arvid

On Tue, Jan 26, 2021 at 5:13 PM Marco Villalobos <mvillalo...@kineteque.com>
wrote:

> Actually, the log I sent in my previous message shows the only error that
> occurred before the buffer pool was destroyed, namely this intermittent warning:
>
> 2021-01-26 04:14:33,140 WARN
>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
> Committing offsets to Kafka takes longer than the checkpoint interval.
> Skipping commit of previous offsets because newer complete checkpoint
> offsets are available. This does not compromise Flink's checkpoint
> integrity.
> 2021-01-26 04:14:33,143 INFO  org.apache.kafka.clients.FetchSessionHandler
>                 [] - [Consumer clientId=consumer-luminai-2,
> groupId=luminai] Error sending fetch request (sessionId=936633685, epoch=1)
> to node 2: {}.
> org.apache.kafka.common.errors.DisconnectException: null
>
> I know that probably doesn't help much. Sorry.
>
> On Mon, Jan 25, 2021 at 11:44 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Marco,
>>
>> The network buffer pool is destroyed when the task manager is shut down.
>> Could you check whether there is an error before that in your log?
>>
>> It seems like the timer is triggered at a point where it shouldn't be. I'll
>> check whether there is a known issue that has been fixed in later versions. Do
>> you have the option to upgrade to 1.11.3?
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <
>> mvillalo...@kineteque.com> wrote:
>>
>>> Hi.  What causes a buffer pool exception? How can I mitigate it? It is
>>> causing us plenty of problems right now.
>>>
>>> 2021-01-26 04:14:33,041 INFO
>>>  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
>>> Subtask 1 received completion notification for checkpoint with id=4.
>>> 2021-01-26 04:14:33,140 WARN
>>>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
>>> Committing offsets to Kafka takes longer than the checkpoint interval.
>>> Skipping commit of previous offsets because newer complete checkpoint
>>> offsets are available. This does not compromise Flink's checkpoint
>>> integrity.
>>> 2021-01-26 04:14:33,143 INFO
>>>  org.apache.kafka.clients.FetchSessionHandler                 [] -
>>> [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch
>>> request (sessionId=936633685, epoch=1) to node 2: {}.
>>> org.apache.kafka.common.errors.DisconnectException: null
>>> 2021-01-26 04:14:33,146 INFO
>>>  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
>>> Subtask 1 checkpointing for checkpoint with id=5 (max part counter=1).
>>>
>>> THEN FINALLY
>>>
>>> ERROR
>>> ai.beyond.luminai.sensor.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction
>>> [] - Error in timer.
>>> java.lang.RuntimeException: Buffer pool is destroyed.
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> mypackage.MyOperator.collect(MyOperator.java:452)
>>> ~[develop-17e9fd0e.jar:?]
>>> at mypackage.MyOperator.onTimer(MyOperator.java:277)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1181)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>> [develop-17e9fd0e.jar:?]
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>> [develop-17e9fd0e.jar:?]
>>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>> at
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:290)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>>> ~[develop-17e9fd0e.jar:?]
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>
>>
