Also, could you please provide the job manager log? The underlying failure might also be somewhere else.
On Thu, Jan 28, 2021 at 10:17 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Marco,
>
> In general, sending a compressed log to the ML is totally fine. You can
> further minimize the log by disabling restarts.
> I looked into the logs that you provided.
>
>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task forward fill -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2) (8c1f256176fb89f112c27883350a02bc).
>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task [] - forward fill -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2) (8c1f256176fb89f112c27883350a02bc) switched from RUNNING to CANCELING.
>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code forward fill -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2) (8c1f256176fb89f112c27883350a02bc).
>> 2021-01-26 04:37:43,282 ERROR xxxxxx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction [] - Error in timer.
>> java.lang.RuntimeException: Buffer pool is destroyed.
>
> I can see that my suspicion is most likely correct: the task is first
> canceled for some reason, and a timer that fires afterwards then surfaces
> the error above. I created a ticket to resolve this issue [1]. There may
> also be an issue with swallowed interruption exceptions, which we are
> looking into in parallel.
>
> However, there is a reason why the task is canceling in the first place,
> and we need to find it. I recommend not having a try-catch block around
> *collector.collect* in *ForwardFillKeyedProcessFunction*. Keep it around
> your user code only, not around system calls; otherwise it may swallow the
> real cause.
>
> Are you executing the code in an IDE? You may be able to set some
> breakpoints to quickly figure out what's going wrong (I can help then).
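The try-catch advice above can be sketched in plain Java. This is a minimal, framework-free illustration, not Flink code and not Marco's actual function: the nested `Collector` interface is a hypothetical stand-in that only mirrors the shape of Flink's `org.apache.flink.util.Collector`, and `computeForwardFill` is a hypothetical placeholder for the user logic. The try-catch guards only the user computation; `out.collect(...)` sits outside it, so a runtime failure such as "Buffer pool is destroyed." propagates to the task instead of being logged as "Error in timer." and swallowed. In a real `KeyedProcessFunction`, the same structure would live inside `onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)`.

```java
import java.util.ArrayList;
import java.util.List;

public class TimerScopingSketch {

    /** Hypothetical stand-in mirroring the shape of Flink's Collector<T>. */
    public interface Collector<T> {
        void collect(T record);
    }

    /** Counts user-code failures that were caught and skipped. */
    public static int userErrors = 0;

    /** Hypothetical user logic; throws for input it cannot handle. */
    public static double computeForwardFill(Double lastValue) {
        if (lastValue == null) {
            throw new IllegalArgumentException("no previous value to forward fill");
        }
        return lastValue;
    }

    /** Sketch of an onTimer body: guard the user code only, never the collect call. */
    public static void onTimer(Double lastValue, Collector<Double> out) {
        double filled;
        try {
            filled = computeForwardFill(lastValue); // user code: safe to catch
        } catch (IllegalArgumentException e) {
            userErrors++; // handle or log the user-level problem, then skip
            return;
        }
        // System call: deliberately outside the try-catch, so runtime failures
        // (e.g. a destroyed buffer pool) reach the task with their real cause.
        out.collect(filled);
    }

    public static void main(String[] args) {
        List<Double> sink = new ArrayList<>();
        onTimer(4.2, sink::add);  // emits 4.2
        onTimer(null, sink::add); // user error: caught, nothing emitted
        System.out.println(sink + " userErrors=" + userErrors); // prints "[4.2] userErrors=1"
    }
}
```

Catching only the exception type the user code is expected to throw keeps the handler narrow; anything raised by the runtime during `collect` still fails the task visibly.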
>
> [1] https://issues.apache.org/jira/browse/FLINK-21181
>
> On Wed, Jan 27, 2021 at 8:54 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Marco,
>>
>> could you share your full task manager and job manager logs? We
>> double-checked and saw that the buffer pool is only released on
>> cancellation or shutdown.
>>
>> So I'm assuming that there is another issue (e.g., the Kafka cluster is
>> not reachable) and that there is a race condition while shutting down. It
>> seems the buffer pool exception is then shadowing the actual cause for as
>> yet unknown reasons (this is an issue of its own, but you should be able
>> to see the actual issue in the task manager log).
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Jan 26, 2021 at 5:13 PM Marco Villalobos <mvillalo...@kineteque.com> wrote:
>>
>>> Actually, the log I sent in my previous message shows the only error
>>> that occurred before the buffer pool was destroyed, namely this
>>> intermittent warning:
>>>
>>> 2021-01-26 04:14:33,140 WARN  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
>>> 2021-01-26 04:14:33,143 INFO  org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch request (sessionId=936633685, epoch=1) to node 2: {}.
>>> org.apache.kafka.common.errors.DisconnectException: null
>>>
>>> I know that probably doesn't help much. Sorry.
>>>
>>> On Mon, Jan 25, 2021 at 11:44 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Marco,
>>>>
>>>> the network buffer pool is destroyed when the task manager is shut down.
>>>> Could you check if you have an error before that in your log?
>>>>
>>>> It seems like the timer is triggered at a point where it shouldn't be.
>>>> I'll check if there is a known issue that has been fixed in later versions.
>>>> Do you have the option to upgrade to 1.11.3?
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:
>>>>
>>>>> Hi. What causes a buffer pool exception? How can I mitigate it? It is
>>>>> causing us plenty of problems right now.
>>>>>
>>>>> 2021-01-26 04:14:33,041 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 received completion notification for checkpoint with id=4.
>>>>> 2021-01-26 04:14:33,140 WARN  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
>>>>> 2021-01-26 04:14:33,143 INFO  org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch request (sessionId=936633685, epoch=1) to node 2: {}.
>>>>> org.apache.kafka.common.errors.DisconnectException: null
>>>>> 2021-01-26 04:14:33,146 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 checkpointing for checkpoint with id=5 (max part counter=1).
>>>>>
>>>>> THEN FINALLY
>>>>>
>>>>> ERROR ai.beyond.luminai.sensor.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction [] - Error in timer.
>>>>> java.lang.RuntimeException: Buffer pool is destroyed.
>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at mypackage.MyOperator.collect(MyOperator.java:452) ~[develop-17e9fd0e.jar:?]
>>>>>     at mypackage.MyOperator.onTimer(MyOperator.java:277) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1181) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [develop-17e9fd0e.jar:?]
>>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
>>>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>>>>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:290) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) ~[develop-17e9fd0e.jar:?]
>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
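The sequence described in the Jan 25 and Jan 27 replies (the buffer pool is only released on cancellation or shutdown, and a timer that fires afterwards surfaces the error) can be illustrated with a toy model. Everything below is hypothetical and greatly simplified: `ToyBufferPool` and `ToyOutput` are not Flink classes and do not reproduce `LocalBufferPool` or `RecordWriterOutput`; they only mimic the pattern visible in the trace above, where an `IllegalStateException` from the pool is rethrown as a `RuntimeException` with the same message.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class CancelThenTimerModel {

    /** Toy pool (not Flink's LocalBufferPool): refuses all requests once destroyed. */
    static final class ToyBufferPool {
        private boolean destroyed = false;

        void destroy() {
            destroyed = true;
        }

        byte[] requestSegment() {
            if (destroyed) {
                throw new IllegalStateException("Buffer pool is destroyed.");
            }
            return new byte[32];
        }
    }

    /** Toy output: wraps pool failures in a RuntimeException, as the trace shows. */
    static final class ToyOutput {
        private final ToyBufferPool pool;

        ToyOutput(ToyBufferPool pool) {
            this.pool = pool;
        }

        void collect(String record) {
            try {
                pool.requestSegment(); // a real writer would serialize the record here
            } catch (IllegalStateException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }

    /** Runs the sequence: cancellation first, then a timer that was already queued. */
    public static String simulate() {
        ToyBufferPool pool = new ToyBufferPool();
        ToyOutput output = new ToyOutput(pool);
        Queue<Runnable> pendingTimers = new ArrayDeque<>();
        pendingTimers.add(() -> output.collect("forward-filled value"));

        pool.destroy(); // 1. the task is canceled for some other (root) reason

        try {
            pendingTimers.poll().run(); // 2. the late timer tries to emit
            return "emitted";
        } catch (RuntimeException e) {
            return e.getMessage() + " caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // prints "Buffer pool is destroyed. caused by IllegalStateException"
        System.out.println(simulate());
    }
}
```

In this model the timer error carries no information about why `destroy()` was called, which matches the advice in the thread to look for earlier entries in the task manager and job manager logs rather than at the timer error itself.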