[ https://issues.apache.org/jira/browse/FLINK-25441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17466466#comment-17466466 ]
Piotr Nowojski edited comment on FLINK-25441 at 12/29/21, 2:03 PM:
-------------------------------------------------------------------

[~wanglijie95], do you mean that [exactly this change|https://github.com/apache/flink/pull/17253/commits/43f4db7ade41eb0d5e052dcc81748d71740d540f#diff-b84174e55cb1999d99ad60cdeded7be20ff4978472bfc785c5a77b6270f47b56R799-R801] is causing the problems? It sounds to me like the logic there is correct. `ProducerFailedException` should be handled as `CancelTaskException`, as the downstream task is not the primary reason behind this failure. In principle I think that `ProducerFailedException` switching the downstream task status from RUNNING to CANCELED is the correct thing to do. A better question is why this error was not propagated on the upstream task. It should be the upstream task's switch from `RUNNING` to `FAILED` that triggers the job failover by the JM. In other words, I think the bug here is that the "java.util.concurrent.TimeoutException: Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase 'taskmanager.memory.framework.off-heap.batch-shuffle.size'." was not propagated on the upstream task. WDYT [~kevin.cyj]?

> ProducerFailedException will cause task status switch from RUNNING to CANCELED, which will cause the job to hang.
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25441
>                 URL: https://issues.apache.org/jira/browse/FLINK-25441
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.15.0
>            Reporter: Lijie Wang
>            Priority: Major
>
> The {{ProducerFailedException}} extends {{CancelTaskException}}, which will cause the task status to switch from RUNNING to CANCELED. As described in FLINK-17726, if a task is directly CANCELED by the TaskManager due to its own runtime issue, the task will not be recovered by the JM and thus the job will hang.
>
> Note that this did not cause problems before FLINK-24182 (which unified the failureCause handling and changed the check for CancelTaskException from "{{instanceof CancelTaskException}}" to "{{ExceptionUtils.findThrowable}}"), because the {{ProducerFailedException}} was always wrapped in a {{RemoteTransportException}}.
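The effect of the check described above can be sketched with plain Java. This is a minimal illustration, not the real Flink code: the exception classes below are simplified stand-ins for Flink's `CancelTaskException`, `ProducerFailedException`, and `RemoteTransportException`, and `findThrowable` is a simplified version of what `ExceptionUtils.findThrowable` does (walk the cause chain). It shows why an `instanceof` check on the top-level exception does not see the wrapped `ProducerFailedException`, while a cause-chain search does:

```java
import java.util.concurrent.TimeoutException;

// Simplified stand-ins for the Flink exception types (not the real classes).
class CancelTaskException extends RuntimeException {
    CancelTaskException(String msg) { super(msg); }
    CancelTaskException(String msg, Throwable cause) { super(msg, cause); }
}
class ProducerFailedException extends CancelTaskException {
    ProducerFailedException(Throwable cause) { super("producer failed", cause); }
}
class RemoteTransportException extends RuntimeException {
    RemoteTransportException(String msg, Throwable cause) { super(msg, cause); }
}

public class CancelCheckDemo {
    // Simplified sketch of ExceptionUtils.findThrowable: walk the cause chain
    // and report whether any link is an instance of the given type.
    static boolean findThrowable(Throwable t, Class<? extends Throwable> type) {
        while (t != null) {
            if (type.isInstance(t)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        // The failure as it arrives on the downstream side: the
        // ProducerFailedException is wrapped in a RemoteTransportException.
        Throwable failure = new RemoteTransportException(
                "Error at remote task manager",
                new ProducerFailedException(
                        new TimeoutException("Buffer request timeout")));

        // Pre-FLINK-24182 style: only the top-level exception is inspected,
        // so the wrapper hides the CancelTaskException.
        System.out.println(failure instanceof CancelTaskException);            // false
        // Post-FLINK-24182 style: the cause chain is searched, so the wrapped
        // ProducerFailedException is now treated as a task cancellation.
        System.out.println(findThrowable(failure, CancelTaskException.class)); // true
    }
}
```

Under these assumptions, the unification in FLINK-24182 changes the outcome for the same wrapped exception: the downstream task starts classifying the failure as a cancellation and switches to CANCELED instead of FAILED.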
> The example log is as follows:
> {code:java}
> 2021-12-23 21:20:14,965 DEBUG org.apache.flink.runtime.taskmanager.Task [] - MultipleInput[945] [Source: HiveSource-tpcds_bin_orc_10000.catalog_sales, Source: HiveSource-tpcds_bin_orc_10000.store_sales, Source: HiveSource-tpcds_bin_orc_10000.catalog_sales, Source: HiveSource-tpcds_bin_orc_10000.store_sales, Source: HiveSource-tpcds_bin_orc_10000.store_sales, Source: HiveSource-tpcds_bin_orc_10000.item, Source: HiveSource-tpcds_bin_orc_10000.web_sales, Source: HiveSource-tpcds_bin_orc_10000.web_sales] -> Calc[885] (143/1024)#0 (8a883116ab601dd5b9ad5d2717d18918) switched from RUNNING to CANCELED due to CancelTaskException:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'k28b09250.eu95sqa.tbsite.net/100.69.96.154:47459'.
>     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:301)
>     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:190)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException: java.util.concurrent.TimeoutException: Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase 'taskmanager.memory.framework.off-heap.batch-shuffle.size'.
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:285)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:123)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:234)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
>     at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1428)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:913)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:91)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.lang.Thread.run(Thread.java:756)
> Caused by: java.util.concurrent.TimeoutException: Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase 'taskmanager.memory.framework.off-heap.batch-shuffle.size'.
>     at org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler.allocateBuffers(SortMergeResultPartitionReadScheduler.java:168)
>     at org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler.run(SortMergeResultPartitionReadScheduler.java:139)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     ... 1 more
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)