[ https://issues.apache.org/jira/browse/FLINK-19983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228333#comment-17228333 ]
Yingjie Cao commented on FLINK-19983:
-------------------------------------

After some investigation, I found that the state check is not valid, so we just need to remove it. The callers, including CreditBasedSequenceNumberingViewReader and LocalInputChannel, can handle this case correctly. BoundedBlockingSubpartitionReader does the same thing. The PR is available for review now.

> ShuffleCompressionITCase.testDataCompressionForSortMergeBlockingShuffle unstable
> --------------------------------------------------------------------------------
>
> Key: FLINK-19983
> URL: https://issues.apache.org/jira/browse/FLINK-19983
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.12.0
> Reporter: Robert Metzger
> Assignee: Yingjie Cao
> Priority: Critical
> Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8997&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=f508e270-48d6-5f1e-3138-42a17e0714f0
> {code}
> 2020-11-04T14:32:19.7227316Z [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.882 s <<< FAILURE! - in org.apache.flink.test.runtime.ShuffleCompressionITCase
> 2020-11-04T14:32:19.7228708Z [ERROR] testDataCompressionForSortMergeBlockingShuffle[useBroadcastPartitioner = true](org.apache.flink.test.runtime.ShuffleCompressionITCase) Time elapsed: 5.058 s <<< FAILURE!
> 2020-11-04T14:32:19.7230032Z java.lang.AssertionError: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> 2020-11-04T14:32:19.7230580Z at org.apache.flink.test.runtime.JobGraphRunningUtil.execute(JobGraphRunningUtil.java:58)
> 2020-11-04T14:32:19.7231173Z at org.apache.flink.test.runtime.ShuffleCompressionITCase.testDataCompressionForSortMergeBlockingShuffle(ShuffleCompressionITCase.java:98)
> 2020-11-04T14:32:19.7232076Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-11-04T14:32:19.7232624Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-11-04T14:32:19.7233242Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-04T14:32:19.7233741Z at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-04T14:32:19.7234353Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-11-04T14:32:19.7235141Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-11-04T14:32:19.7238521Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-11-04T14:32:19.7239371Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-11-04T14:32:19.7240010Z at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-11-04T14:32:19.7240688Z at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-11-04T14:32:19.7241396Z at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-11-04T14:32:19.7242019Z at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-11-04T14:32:19.7242623Z at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-11-04T14:32:19.7243379Z at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-11-04T14:32:19.7244051Z at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-11-04T14:32:19.7244631Z at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-11-04T14:32:19.7245313Z at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-11-04T14:32:19.7245844Z at org.junit.runners.Suite.runChild(Suite.java:128)
> 2020-11-04T14:32:19.7246341Z at org.junit.runners.Suite.runChild(Suite.java:27)
> 2020-11-04T14:32:19.7246868Z at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-11-04T14:32:19.7247616Z at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-11-04T14:32:19.7248223Z at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-11-04T14:32:19.7248826Z at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-11-04T14:32:19.7249393Z at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-11-04T14:32:19.7249963Z at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-11-04T14:32:19.7250586Z at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> 2020-11-04T14:32:19.7251277Z at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> 2020-11-04T14:32:19.7252024Z at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> 2020-11-04T14:32:19.7252839Z at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> 2020-11-04T14:32:19.7253584Z at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> 2020-11-04T14:32:19.7254365Z at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> 2020-11-04T14:32:19.7255165Z at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> 2020-11-04T14:32:19.7255806Z at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> 2020-11-04T14:32:19.7256609Z Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> 2020-11-04T14:32:19.7257950Z at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
> 2020-11-04T14:32:19.7258918Z at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
> 2020-11-04T14:32:19.7259799Z at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
> 2020-11-04T14:32:19.7260745Z at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
> 2020-11-04T14:32:19.7261565Z at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
> 2020-11-04T14:32:19.7262363Z at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534)
> 2020-11-04T14:32:19.7263192Z at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> 2020-11-04T14:32:19.7263936Z at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
> 2020-11-04T14:32:19.7264543Z at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> 2020-11-04T14:32:19.7265215Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-04T14:32:19.7265827Z at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-04T14:32:19.7266473Z at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
> 2020-11-04T14:32:19.7267334Z at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
> 2020-11-04T14:32:19.7268117Z at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> 2020-11-04T14:32:19.7268898Z at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> 2020-11-04T14:32:19.7269511Z at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> 2020-11-04T14:32:19.7270106Z at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> 2020-11-04T14:32:19.7270708Z at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 2020-11-04T14:32:19.7271257Z at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> 2020-11-04T14:32:19.7271812Z at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> 2020-11-04T14:32:19.7272376Z at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2020-11-04T14:32:19.7273048Z at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2020-11-04T14:32:19.7273583Z at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> 2020-11-04T14:32:19.7274140Z at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> 2020-11-04T14:32:19.7274679Z at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> 2020-11-04T14:32:19.7275310Z at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> 2020-11-04T14:32:19.7275792Z at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> 2020-11-04T14:32:19.7276343Z at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> 2020-11-04T14:32:19.7276864Z at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> 2020-11-04T14:32:19.7277531Z at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 2020-11-04T14:32:19.7278195Z at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 2020-11-04T14:32:19.7278856Z at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 2020-11-04T14:32:19.7279542Z at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-11-04T14:32:19.7281428Z Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'localhost/127.0.0.1:33368'.
> 2020-11-04T14:32:19.7282544Z at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:283)
> 2020-11-04T14:32:19.7283723Z at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:183)
> 2020-11-04T14:32:19.7285215Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> 2020-11-04T14:32:19.7286265Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> 2020-11-04T14:32:19.7287520Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> 2020-11-04T14:32:19.7288686Z at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:115)
> 2020-11-04T14:32:19.7289722Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> 2020-11-04T14:32:19.7290742Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> 2020-11-04T14:32:19.7291776Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> 2020-11-04T14:32:19.7292884Z at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
> 2020-11-04T14:32:19.7293919Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> 2020-11-04T14:32:19.7295079Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> 2020-11-04T14:32:19.7296076Z at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> 2020-11-04T14:32:19.7297214Z at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
> 2020-11-04T14:32:19.7298245Z at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
> 2020-11-04T14:32:19.7299068Z at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> 2020-11-04T14:32:19.7299963Z at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> 2020-11-04T14:32:19.7300879Z at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> 2020-11-04T14:32:19.7301516Z at java.lang.Thread.run(Thread.java:748)
> 2020-11-04T14:32:19.7302033Z Caused by: java.io.IOException: Reader is already released.
> 2020-11-04T14:32:19.7302880Z at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:255)
> 2020-11-04T14:32:19.7303825Z at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:108)
> 2020-11-04T14:32:19.7304871Z at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:173)
> 2020-11-04T14:32:19.7305873Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 2020-11-04T14:32:19.7306932Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 2020-11-04T14:32:19.7308179Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> 2020-11-04T14:32:19.7309255Z at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
> 2020-11-04T14:32:19.7310249Z at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
> 2020-11-04T14:32:19.7311499Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 2020-11-04T14:32:19.7312573Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 2020-11-04T14:32:19.7313711Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> 2020-11-04T14:32:19.7314967Z at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1428)
> 2020-11-04T14:32:19.7316038Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 2020-11-04T14:32:19.7317216Z at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 2020-11-04T14:32:19.7318253Z at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:913)
> 2020-11-04T14:32:19.7319209Z at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:87)
> 2020-11-04T14:32:19.7320177Z at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> 2020-11-04T14:32:19.7321167Z at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> 2020-11-04T14:32:19.7322060Z at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> 2020-11-04T14:32:19.7322615Z ... 3 more
> 2020-11-04T14:32:19.7323125Z Caused by: java.lang.IllegalStateException: Reader is already released.
> 2020-11-04T14:32:19.7323760Z at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
> 2020-11-04T14:32:19.7324559Z at org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader.getNextBuffer(SortMergeSubpartitionReader.java:98)
> 2020-11-04T14:32:19.7325691Z at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.getNextBuffer(CreditBasedSequenceNumberingViewReader.java:170)
> 2020-11-04T14:32:19.7326735Z at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:216)
> 2020-11-04T14:32:19.7327496Z ... 21 more
> {code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)