>
> Regarding the try catch block

Sorry, I meant the try-catch in SensorMessageToSensorTimeSeriesFunction.

> Also, just to be clear, does disabling restart make it easier for you to
> debug?
>
Yes, the log will be much smaller then. Currently, it just repeats the
same things a couple of times.

Btw, if you also have a second task manager, that log would be even more
interesting. So best to attach all logs (JM + TMs).
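
To make the try-catch advice quoted further down concrete, here is a minimal
sketch in plain Java, without any Flink dependency. Everything in it is a
hypothetical stand-in: the Collector interface only imitates the shape of
Flink's collector, and transform and TimerTryCatchSketch are made-up names,
not your code. It only shows the idea of wrapping the user logic but leaving
the collect call outside the try-catch.

```java
import java.util.ArrayList;
import java.util.List;

class TimerTryCatchSketch {

    /** Hypothetical stand-in for a Flink-style collector (assumption, not the real interface). */
    interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical user logic: parse a raw reading and double it. */
    static int transform(String raw) {
        return Integer.parseInt(raw.trim()) * 2;
    }

    /**
     * Imitates the body of a timer callback. Only the user code is wrapped;
     * the collect call stays outside, so a failure raised by the framework
     * is neither logged as a user error nor hidden.
     */
    static void onTimer(String raw, Collector<Integer> out) {
        int value;
        try {
            value = transform(raw);
        } catch (RuntimeException e) {
            System.err.println("Error in user code: " + e);
            throw e; // rethrow so the failure is not swallowed
        }
        out.collect(value); // system call, deliberately not wrapped
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        onTimer("21", sink::add);
        System.out.println(sink);

        // A collector that fails the way a destroyed buffer pool would.
        Collector<Integer> broken = r -> {
            throw new IllegalStateException("Buffer pool is destroyed.");
        };
        try {
            onTimer("21", broken);
        } catch (IllegalStateException e) {
            System.out.println("Framework error propagated: " + e.getMessage());
        }
    }
}
```

With this layout, an "Error in user code" line in the log can only come from
the user logic, while an exception thrown by the collector surfaces with its
own stack trace.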


On Thu, Jan 28, 2021 at 4:24 PM Marco Villalobos <mvillalo...@kineteque.com>
wrote:

> Regarding the try catch block, it rethrows the exception.  Here is the
> code:
>
> catch (RuntimeException e) {
> logger.error("Error in timer.", e);
> throw e;
> }
>
> That would be okay, right?
>
> Also, just to be clear, does disabling restart make it easier for you to
> debug?
>
> On Thu, Jan 28, 2021 at 1:17 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Marco,
>>
>> In general, sending a compressed log to ML is totally fine. You can
>> further minimize the log by disabling restarts.
>> I looked into the logs that you provided.
>>
>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                  [] - Attempting to cancel task forward fill -> (Sink: tag
>>> db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2)
>>> (8c1f256176fb89f112c27883350a02bc).
>>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                    [] - forward fill -> (Sink: tag db sink, Sink: back fill
>>> db sink, Sink: min max step db sink) (2/2)
>>> (8c1f256176fb89f112c27883350a02bc) switched from RUNNING to CANCELING.
>>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                    [] - Triggering cancellation of task code forward fill
>>> -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink)
>>> (2/2) (8c1f256176fb89f112c27883350a02bc).
>>> 2021-01-26 04:37:43,282 ERROR
>>> xxxxxx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction []
>>> - Error in timer.
>>> java.lang.RuntimeException: Buffer pool is destroyed.
>>>
>>
>> I can see that my suspicion is most likely correct: It first tries to
>> cancel the task for some reason and then a later timer will show you the
>> respective error. I created the ticket to resolve the issue [1]. There may
>> also be an issue of swallowed interruption exceptions, which we are looking
>> into in parallel.
>>
>> However, there is a reason why the task is canceling in the first place
>> and we need to find that. I recommend not having a try-catch block around
>> *collector.collect* in *ForwardFillKeyedProcessFunction*. Just have it
>> around your user code but not around system calls. Otherwise, the
>> try-catch may swallow the real cause.
>>
>> Are you executing the code in an IDE? You may be able to set some
>> breakpoints to quickly figure out what's going wrong (I can help then).
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21181
>>
>> On Wed, Jan 27, 2021 at 8:54 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Marco,
>>>
>>> could you share your full task manager and job manager log? We
>>> double-checked and saw that the buffer pool is only released on
>>> cancellation or shutdown.
>>>
>>> So I'm assuming that there is another issue (e.g., Kafka cluster not
>>> reachable) and there is a race condition while shutting down. It seems like
>>> the buffer pool exception is shadowing the actual cause then for yet
>>> unknown reasons (this is an issue on its own, but you should be able to see
>>> the actual issue in task manager log).
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Tue, Jan 26, 2021 at 5:13 PM Marco Villalobos <
>>> mvillalo...@kineteque.com> wrote:
>>>
>>>> Actually, the log I sent in my previous message shows the only error
>>>> that occurred before the buffer pool was destroyed. That
>>>> intermittent warning:
>>>>
>>>> 2021-01-26 04:14:33,140 WARN
>>>>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
>>>> Committing offsets to Kafka takes longer than the checkpoint interval.
>>>> Skipping commit of previous offsets because newer complete checkpoint
>>>> offsets are available. This does not compromise Flink's checkpoint
>>>> integrity.
>>>> 2021-01-26 04:14:33,143 INFO
>>>>  org.apache.kafka.clients.FetchSessionHandler                 [] -
>>>> [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch
>>>> request (sessionId=936633685, epoch=1) to node 2: {}.
>>>> org.apache.kafka.common.errors.DisconnectException: null
>>>>
>>>> I know that probably doesn't help much. Sorry.
>>>>
>>>> On Mon, Jan 25, 2021 at 11:44 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> the network buffer pool is destroyed when the task manager is
>>>>> shut down. Could you check if you have an error before that in your log?
>>>>>
>>>>> It seems like the timer is triggered at a point where it shouldn't be.
>>>>> I'll check if there is a known issue that has been fixed in later
>>>>> versions.
>>>>> Do you have the option to upgrade to 1.11.3?
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <
>>>>> mvillalo...@kineteque.com> wrote:
>>>>>
>>>>>> Hi.  What causes a buffer pool exception? How can I mitigate it? It
>>>>>> is causing us plenty of problems right now.
>>>>>>
>>>>>> 2021-01-26 04:14:33,041 INFO
>>>>>>  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
>>>>>> Subtask 1 received completion notification for checkpoint with id=4.
>>>>>> 2021-01-26 04:14:33,140 WARN
>>>>>>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
>>>>>> Committing offsets to Kafka takes longer than the checkpoint interval.
>>>>>> Skipping commit of previous offsets because newer complete checkpoint
>>>>>> offsets are available. This does not compromise Flink's checkpoint
>>>>>> integrity.
>>>>>> 2021-01-26 04:14:33,143 INFO
>>>>>>  org.apache.kafka.clients.FetchSessionHandler                 [] -
>>>>>> [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch
>>>>>> request (sessionId=936633685, epoch=1) to node 2: {}.
>>>>>> org.apache.kafka.common.errors.DisconnectException: null
>>>>>> 2021-01-26 04:14:33,146 INFO
>>>>>>  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
>>>>>> Subtask 1 checkpointing for checkpoint with id=5 (max part counter=1).
>>>>>>
>>>>>> THEN FINALLY
>>>>>>
>>>>>> ERROR
>>>>>> ai.beyond.luminai.sensor.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction
>>>>>> [] - Error in timer.
>>>>>> java.lang.RuntimeException: Buffer pool is destroyed.
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> mypackage.MyOperator.collect(MyOperator.java:452)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at mypackage.MyOperator.onTimer(MyOperator.java:277)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1181)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
>>>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
>>>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
>>>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>>>>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>>> [develop-17e9fd0e.jar:?]
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>>> [develop-17e9fd0e.jar:?]
>>>>>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
>>>>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:290)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>>>>>> ~[develop-17e9fd0e.jar:?]
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>>>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>
>>>>>
