Hi Matthias,

Do you have any suggestions for handling timeout issues when fetching data
from a Kafka topic?
I am thinking of adding retry logic to the Flink job, but I'm not sure if
this is the right direction.
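
The retry idea can be sketched in Java as follows. The sketch assumes the timeout surfaces as an exception from a call you control. `retryWithBackoff` is a hypothetical helper, not a Flink or Kafka API. The `isRetryable` predicate is where a Kafka timeout check would go, for example `e instanceof org.apache.kafka.common.errors.TimeoutException`. The demo uses `java.util.concurrent.TimeoutException` as a stand-in so it runs without Kafka on the classpath. Flink also restarts failed jobs according to its configured restart strategy. A bounded retry like this therefore mainly helps with transient failures you want to absorb without a full job restart.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

public final class RetryWithBackoff {

    private RetryWithBackoff() {}

    /**
     * Hypothetical helper: calls {@code call} up to {@code maxAttempts} times,
     * sleeping between attempts and doubling the delay after each retryable failure.
     */
    public static <T> T retryWithBackoff(Callable<T> call,
                                         Predicate<Exception> isRetryable,
                                         int maxAttempts,
                                         long initialDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMs = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                // Non-retryable errors, or the last failed attempt, propagate to the caller
                // (and, inside a Flink task, to Flink's own failure handling).
                if (!isRetryable.test(e) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(delayMs);
                delayMs *= 2; // exponential backoff
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Simulated metadata fetch that times out twice before succeeding.
        String result = retryWithBackoff(() -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new TimeoutException("Timeout expired while fetching topic metadata");
            }
            return "metadata";
        }, e -> e instanceof TimeoutException, 5, 100L);
        System.out.println(result + " after " + attempts[0] + " attempts"); // metadata after 3 attempts
    }
}
```

The helper rethrows once `maxAttempts` is reached. A persistent timeout therefore still fails the task instead of being retried forever.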

Thanks again
Best regards
Rainie

On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Rainie,
> in general, a destroyed buffer pool usually means that some other
> exception occurred that caused the task to fail; during failure handling,
> the operator's network buffer pool is destroyed. That is what causes the
> "java.lang.RuntimeException: Buffer pool is destroyed." in your case. It
> looks like you had a timeout problem while fetching data from a Kafka
> topic.
>
> Matthias
>
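Matthias's explanation suggests a triage rule. Treat "Buffer pool is destroyed." as a side effect, and look for the root cause of the other exceptions logged around the same time. Below is a minimal Java sketch of that rule, assuming the logged exceptions are available as `Throwable` objects. `FailureTriage` and its methods are hypothetical names. Matching on the message string is an assumption, not a Flink API. The demo uses plain `RuntimeException` stand-ins for the Flink and Kafka exception classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public final class FailureTriage {

    private FailureTriage() {}

    /** Collects t, t.getCause(), ... and stops if the chain loops back on itself. */
    public static List<Throwable> causalChain(Throwable t) {
        List<Throwable> chain = new ArrayList<>();
        Map<Throwable, Boolean> seen = new IdentityHashMap<>();
        for (Throwable c = t; c != null && !seen.containsKey(c); c = c.getCause()) {
            seen.put(c, Boolean.TRUE);
            chain.add(c);
        }
        return chain;
    }

    /** The innermost cause in the chain. */
    public static Throwable rootCause(Throwable t) {
        List<Throwable> chain = causalChain(t);
        return chain.get(chain.size() - 1);
    }

    /** Heuristic (assumption): any link carries the "Buffer pool is destroyed." message. */
    public static boolean isBufferPoolSymptom(Throwable t) {
        for (Throwable c : causalChain(t)) {
            if ("Buffer pool is destroyed.".equals(c.getMessage())) {
                return true;
            }
        }
        return false;
    }

    /** Drops secondary buffer-pool failures and returns the root causes of the rest. */
    public static List<Throwable> primaryRootCauses(List<Throwable> logged) {
        List<Throwable> result = new ArrayList<>();
        for (Throwable t : logged) {
            if (!isBufferPoolSymptom(t)) {
                result.add(rootCause(t));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-ins for exceptions found in the TaskManager log.
        Throwable symptom = new RuntimeException("Buffer pool is destroyed.",
                new IllegalStateException("Buffer pool is destroyed."));
        Throwable primary = new RuntimeException("ProducerFailedException",
                new RuntimeException("Timeout expired while fetching topic metadata"));
        for (Throwable root : primaryRootCauses(Arrays.asList(symptom, primary))) {
            System.out.println(root.getMessage()); // Timeout expired while fetching topic metadata
        }
    }
}
```

With the two stand-ins, only the second chain survives the filter, and its root cause is the metadata timeout.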
> On Tue, Mar 2, 2021 at 10:39 AM Rainie Li <raini...@pinterest.com> wrote:
>
>> Thanks for checking, Matthias.
>>
>> I have another Flink job that failed last weekend with the same "Buffer
>> pool is destroyed" error. This job is also running version 1.9.
>> Here is the error I found in the TaskManager log. Any suggestions on what
>> the root cause is and how to fix it?
>>
>> 2021-02-28 00:54:45,943 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error while canceling task.
>> java.lang.RuntimeException: Buffer pool is destroyed.
>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>> at com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
>> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> --
>> at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
>> at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>> at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
>> at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:107)
>> at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:170)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
>> at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
>> at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>> at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
>> at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1452)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
>> at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:959)
>> at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:86)
>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
>> ... 2 more
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>>
>>
>> Thanks again!
>> Best regards
>> Rainie
>>
>> On Mon, Mar 1, 2021 at 4:38 AM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> Another question: the timeout of 48 hours sounds strange. Some other
>>> component should have noticed the connection problem earlier, assuming
>>> that you have configured a reasonably low heartbeat interval.
>>>
>>> Matthias
>>>
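A toy heartbeat monitor in Java shows why a 48-hour silence threshold is surprising. In this model, a peer counts as failed once it has been silent longer than the timeout. `HeartbeatMonitor` is hypothetical and only models the idea. In Flink itself, the corresponding settings are `heartbeat.interval` and `heartbeat.timeout` in `flink-conf.yaml`. The 50 s timeout in the demo is an illustrative value.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model: a peer is considered failed once it has been silent longer than the timeout. */
public final class HeartbeatMonitor {
    private final long timeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    public HeartbeatMonitor(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Records a heartbeat from the given peer at time nowMs. */
    public void receiveHeartbeat(String peer, long nowMs) {
        lastHeartbeatMs.put(peer, nowMs);
    }

    /** True if the peer never sent a heartbeat or has been silent longer than the timeout. */
    public boolean isFailed(String peer, long nowMs) {
        Long last = lastHeartbeatMs.get(peer);
        return last == null || nowMs - last > timeoutMs;
    }

    public static void main(String[] args) {
        HeartbeatMonitor shortTimeout = new HeartbeatMonitor(50_000L);
        shortTimeout.receiveHeartbeat("taskmanager-1", 0L);
        System.out.println(shortTimeout.isFailed("taskmanager-1", 40_000L)); // false
        System.out.println(shortTimeout.isFailed("taskmanager-1", 60_000L)); // true

        // With a 48 h threshold, the same 60 s of silence goes unnoticed.
        HeartbeatMonitor longTimeout = new HeartbeatMonitor(48L * 60 * 60 * 1000);
        longTimeout.receiveHeartbeat("taskmanager-1", 0L);
        System.out.println(longTimeout.isFailed("taskmanager-1", 60_000L)); // false
    }
}
```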
>>> On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl <matth...@ververica.com>
>>> wrote:
>>>
>>>> Thanks for providing this information, Rainie. Are there other issues
>>>> documented in the logs besides the TimeoutException in the JM log that
>>>> you already shared? For now, it looks like there was a connection
>>>> problem between the TaskManager and the JobManager that caused the
>>>> operator to shut down, which in turn destroyed the NetworkBufferPool. In
>>>> this scenario, I would expect other failures to occur besides the ones you
>>>> shared.
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <raini...@pinterest.com>
>>>> wrote:
>>>>
>>>>> Thank you, Matthias.
>>>>> It’s version 1.9.
>>>>>
>>>>> Best regards
>>>>> Rainie
>>>>>
>>>>> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <matth...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Rainie,
>>>>>> the network buffer pool was destroyed for some reason. This happens
>>>>>> when the NettyShuffleEnvironment is closed, which is triggered, for
>>>>>> instance, when an operator is cleaned up. Maybe the timeout in the
>>>>>> metrics system caused this, but I'm not sure how the two are connected.
>>>>>> I'm going to add Chesnay to this conversation in the hope that he can
>>>>>> give more insight.
>>>>>>
>>>>>> If I may ask: what Flink version are you using?
>>>>>>
>>>>>> Thanks,
>>>>>> Matthias
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <raini...@pinterest.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> Our Flink application kept restarting, and it made lots of RPC calls
>>>>>>> to a dependency service.
>>>>>>>
>>>>>>> *We saw this exception in the failed TaskManager's log:*
>>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>> at com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>>>>>>> at com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
>>>>>>> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>>>>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>>>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>>>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>> at com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
>>>>>>> at com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
>>>>>>> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>> ... 23 more
>>>>>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>>>>>> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
>>>>>>> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
>>>>>>> at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
>>>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
>>>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
>>>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
>>>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
>>>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>>>>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>>>>>> ... 32 more
>>>>>>>
>>>>>>> *We also saw this exception in the JobManager log:*
>>>>>>> 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting - Association to [akka.tcp://flink-metrics@host:38593] with UID [-1261564990] irrecoverably failed. Quarantining address.
>>>>>>> java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
>>>>>>> at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>> at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>
>>>>>>> This app had been running fine for a month.
>>>>>>> Any suggestions on what could cause the issue, and on how to debug it?
>>>>>>> All advice is appreciated.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best regards
>>>>>>> Rainie
>>>>>>>
>>>>>>
