Hi Marco,

In general, sending a compressed log to ML is totally fine. You can further
minimize the log by disabling restarts.
I looked into the logs that you provided.

2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>                [] - Attempting to cancel task forward fill -> (Sink: tag db
> sink, Sink: back fill db sink, Sink: min max step db sink) (2/2)
> (8c1f256176fb89f112c27883350a02bc).
> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>                  [] - forward fill -> (Sink: tag db sink, Sink: back fill
> db sink, Sink: min max step db sink) (2/2)
> (8c1f256176fb89f112c27883350a02bc) switched from RUNNING to CANCELING.
> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>                  [] - Triggering cancellation of task code forward fill ->
> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink)
> (2/2) (8c1f256176fb89f112c27883350a02bc).
> 2021-01-26 04:37:43,282 ERROR
> xxxxxx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction []
> - Error in timer.
> java.lang.RuntimeException: Buffer pool is destroyed.
>

I can see that my suspicion is most likely correct: It first tries to
cancel the task for some reason and then a later timer will show you the
respective error. I created the ticket to resolve the issue [1]. There may
also be an issue of swalled interruption exceptions, which we are looking
into in parallel.

However, there is a reason why the task is canceling in the first place and
we need to find that. I recommend to not have a try-catch block around
*collector.collect* in *ForwardFillKeyedProcessFunction*. Just have it
around your user code but not around system calls. This may swallow the
real cause.

Are you executing the code in IDE? You may be able to set some breakpoints
to quickly figure out what's going wrong (I can help then).

[1] https://issues.apache.org/jira/browse/FLINK-21181

On Wed, Jan 27, 2021 at 8:54 AM Arvid Heise <[email protected]> wrote:

> Hi Marco,
>
> could you share your full task manager and job manager log? We
> double-checked and saw that the buffer pool is only released on
> cancellation or shutdown.
>
> So I'm assuming that there is another issue (e.g., Kafka cluster not
> reachable) and there is a race condition while shutting down. It seems like
> the buffer pool exception is shadowing the actual cause then for yet
> unknown reasons (this is an issue on its own, but you should be able to see
> the actual issue in task manager log).
>
> Best,
>
> Arvid
>
> On Tue, Jan 26, 2021 at 5:13 PM Marco Villalobos <
> [email protected]> wrote:
>
>> Actually, the log I sent in my previous message, shows the only error
>> that occurred before the buffer pool was destroyed. That
>> intermittent warning:
>>
>> 2021-01-26 04:14:33,140 WARN
>>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
>> Committing offsets to Kafka takes longer than the checkpoint interval.
>> Skipping commit of previous offsets because newer complete checkpoint
>> offsets are available. This does not compromise Flink's checkpoint
>> integrity.
>> 2021-01-26 04:14:33,143 INFO
>>  org.apache.kafka.clients.FetchSessionHandler                 [] -
>> [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch
>> request (sessionId=936633685, epoch=1) to node 2: {}.
>> org.apache.kafka.common.errors.DisconnectException: null
>>
>> I know that probably doesn't help much. Sorry.
>>
>> On Mon, Jan 25, 2021 at 11:44 PM Arvid Heise <[email protected]> wrote:
>>
>>> Hi Marco,
>>>
>>> the network buffer pool is destroyed when the task manager is shutdown.
>>> Could you check if you have an error before that in your log?
>>>
>>> It seems like the timer is triggered at a point where it shouldn't. I'll
>>> check if there is a known issue that has been fixed in later versions. Do
>>> you have the option to upgrade to 1.11.3?
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <
>>> [email protected]> wrote:
>>>
>>>> Hi.  What causes a buffer pool exception? How can I mitigate it? It is
>>>> causing us plenty of problems right now.
>>>>
>>>> 2021-01-26 04:14:33,041 INFO
>>>>  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
>>>> Subtask 1 received completion notification for checkpoint with id=4.
>>>> 2021-01-26 04:14:33,140 WARN
>>>>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
>>>> Committing offsets to Kafka takes longer than the checkpoint interval.
>>>> Skipping commit of previous offsets because newer complete checkpoint
>>>> offsets are available. This does not compromise Flink's checkpoint
>>>> integrity.
>>>> 2021-01-26 04:14:33,143 INFO
>>>>  org.apache.kafka.clients.FetchSessionHandler                 [] -
>>>> [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch
>>>> request (sessionId=936633685, epoch=1) to node 2: {}.
>>>> org.apache.kafka.common.errors.DisconnectException: null
>>>> 2021-01-26 04:14:33,146 INFO
>>>>  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
>>>> Subtask 1 checkpointing for checkpoint with id=5 (max part counter=1).
>>>>
>>>> THEN FINALLY
>>>>
>>>> ERROR
>>>> ai.beyond.luminai.sensor.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction
>>>> [] - Error in timer.
>>>> java.lang.RuntimeException: Buffer pool is destroyed.
>>>> at
>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> mypackage.MyOperator.collect(MyOperator.java:452)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at mypackage.MyOperator.onTimer(MyOperator.java:277)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1181)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>> [develop-17e9fd0e.jar:?]
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>> [develop-17e9fd0e.jar:?]
>>>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
>>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>>> at
>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:290)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>>>> ~[develop-17e9fd0e.jar:?]
>>>> at
>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>
>>>

Reply via email to