Hi Rainie,
the network buffer pool was destroyed for some reason. This happens when
the NettyShuffleEnvironment gets closed which is triggered when an operator
is cleaned up, for instance. Maybe, the timeout in the metric system caused
this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
conversation hoping that he can give more insights.

If I may ask: What Flink version are you using?

Thanks,
Matthias


On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <raini...@pinterest.com> wrote:

> Hi All,
>
> Our flink application kept restarting and it did lots of RPC calls to a
> dependency service.
>
> *We saw this exception from failed task manager log: *
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
> at
> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
> at
> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> ... 23 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
> at
> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> ... 32 more
>
> *We also saw this exception from Job manager log:*
> 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting
>                  - Association to [akka.tcp://flink-metrics@host:38593]
> with UID [-1261564990] irrecoverably failed. Quarantining address.
> java.util.concurrent.TimeoutException: Remote system has been silent for
> too long. (more than 48.0 hours)
> at
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> This app has been running fine for a month.
> Any suggestion what could cause the issue? Any suggestions on how to debug
> it?
> Appreciated all advice.
>
> Thanks
> Best regards
> Rainie
>

Reply via email to