[ 
https://issues.apache.org/jira/browse/FLINK-19983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228333#comment-17228333
 ] 

Yingjie Cao commented on FLINK-19983:
-------------------------------------

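After some investigation, I found that the state check (the checkState in
SortMergeSubpartitionReader#getNextBuffer that fails with "Reader is already
released.") does not always hold, so we just need to remove it. The callers,
including CreditBasedSequenceNumberingViewReader and LocalInputChannel, can
correctly handle a getNextBuffer call on a reader that has already been
released. BoundedBlockingSubpartitionReader already behaves the same way. The PR
is available for review now.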
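A minimal sketch of the contract described above, assuming a heavily simplified reader model: instead of guarding getNextBuffer with a checkState that throws "Reader is already released." (the IllegalStateException raised from SortMergeSubpartitionReader.getNextBuffer in the trace below), a released reader simply returns null, and the caller treats null as "nothing to send right now". SimpleSubpartitionReader, SimpleCaller and the String "buffers" are hypothetical stand-ins, not Flink classes, and this is not the code from the PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical, simplified subpartition reader (NOT Flink's SortMergeSubpartitionReader).
 * Strings stand in for network buffers.
 */
class SimpleSubpartitionReader {
    private final Queue<String> buffers = new ArrayDeque<>();
    private boolean released;

    SimpleSubpartitionReader(String... data) {
        for (String d : data) {
            buffers.add(d);
        }
    }

    /** Returns the next buffer, or null if none is available or the reader was released. */
    synchronized String getNextBuffer() {
        // No checkState(!released, ...) here: a release racing with a pending
        // "data available" notification is treated as a legal state, not a bug.
        if (released) {
            return null;
        }
        return buffers.poll(); // ArrayDeque.poll() returns null when empty
    }

    synchronized void releaseAllResources() {
        released = true;
        buffers.clear();
    }

    synchronized boolean isReleased() {
        return released;
    }
}

/** Hypothetical caller, loosely modelled on a network view reader draining a subpartition. */
class SimpleCaller {
    /** Appends every available buffer to out and returns how many were consumed. */
    static int drain(SimpleSubpartitionReader reader, StringBuilder out) {
        int count = 0;
        String next;
        while ((next = reader.getNextBuffer()) != null) {
            out.append(next);
            count++;
        }
        // null only means "stop polling"; whether the reader was released can be
        // checked separately via isReleased() if the caller needs to react to it.
        return count;
    }
}
```

With this contract, a release that races with a pending "reader is non-empty" notification (the notifyReaderNonEmpty -> enqueueAvailableReader -> writeAndFlushNextMessageIfPossible path in the trace) would make the caller stop polling instead of surfacing a fatal error to the consumer.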

> ShuffleCompressionITCase.testDataCompressionForSortMergeBlockingShuffle unstable
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-19983
>                 URL: https://issues.apache.org/jira/browse/FLINK-19983
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.12.0
>            Reporter: Robert Metzger
>            Assignee: Yingjie Cao
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8997&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=f508e270-48d6-5f1e-3138-42a17e0714f0
> {code}
> 2020-11-04T14:32:19.7227316Z [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.882 s <<< FAILURE! - in org.apache.flink.test.runtime.ShuffleCompressionITCase
> 2020-11-04T14:32:19.7228708Z [ERROR] testDataCompressionForSortMergeBlockingShuffle[useBroadcastPartitioner = true](org.apache.flink.test.runtime.ShuffleCompressionITCase)  Time elapsed: 5.058 s  <<< FAILURE!
> 2020-11-04T14:32:19.7230032Z java.lang.AssertionError: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> 2020-11-04T14:32:19.7230580Z  at org.apache.flink.test.runtime.JobGraphRunningUtil.execute(JobGraphRunningUtil.java:58)
> 2020-11-04T14:32:19.7231173Z  at org.apache.flink.test.runtime.ShuffleCompressionITCase.testDataCompressionForSortMergeBlockingShuffle(ShuffleCompressionITCase.java:98)
> 2020-11-04T14:32:19.7232076Z  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-11-04T14:32:19.7232624Z  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-11-04T14:32:19.7233242Z  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-04T14:32:19.7233741Z  at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-04T14:32:19.7234353Z  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-11-04T14:32:19.7235141Z  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-11-04T14:32:19.7238521Z  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-11-04T14:32:19.7239371Z  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-11-04T14:32:19.7240010Z  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-11-04T14:32:19.7240688Z  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-11-04T14:32:19.7241396Z  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-11-04T14:32:19.7242019Z  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-11-04T14:32:19.7242623Z  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-11-04T14:32:19.7243379Z  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-11-04T14:32:19.7244051Z  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-11-04T14:32:19.7244631Z  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-11-04T14:32:19.7245313Z  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-11-04T14:32:19.7245844Z  at org.junit.runners.Suite.runChild(Suite.java:128)
> 2020-11-04T14:32:19.7246341Z  at org.junit.runners.Suite.runChild(Suite.java:27)
> 2020-11-04T14:32:19.7246868Z  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-11-04T14:32:19.7247616Z  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-11-04T14:32:19.7248223Z  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-11-04T14:32:19.7248826Z  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-11-04T14:32:19.7249393Z  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-11-04T14:32:19.7249963Z  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-11-04T14:32:19.7250586Z  at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> 2020-11-04T14:32:19.7251277Z  at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> 2020-11-04T14:32:19.7252024Z  at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> 2020-11-04T14:32:19.7252839Z  at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> 2020-11-04T14:32:19.7253584Z  at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> 2020-11-04T14:32:19.7254365Z  at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> 2020-11-04T14:32:19.7255165Z  at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> 2020-11-04T14:32:19.7255806Z  at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> 2020-11-04T14:32:19.7256609Z Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> 2020-11-04T14:32:19.7257950Z  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
> 2020-11-04T14:32:19.7258918Z  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
> 2020-11-04T14:32:19.7259799Z  at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
> 2020-11-04T14:32:19.7260745Z  at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
> 2020-11-04T14:32:19.7261565Z  at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
> 2020-11-04T14:32:19.7262363Z  at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534)
> 2020-11-04T14:32:19.7263192Z  at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> 2020-11-04T14:32:19.7263936Z  at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
> 2020-11-04T14:32:19.7264543Z  at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> 2020-11-04T14:32:19.7265215Z  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-04T14:32:19.7265827Z  at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-04T14:32:19.7266473Z  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
> 2020-11-04T14:32:19.7267334Z  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
> 2020-11-04T14:32:19.7268117Z  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> 2020-11-04T14:32:19.7268898Z  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> 2020-11-04T14:32:19.7269511Z  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> 2020-11-04T14:32:19.7270106Z  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> 2020-11-04T14:32:19.7270708Z  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 2020-11-04T14:32:19.7271257Z  at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> 2020-11-04T14:32:19.7271812Z  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> 2020-11-04T14:32:19.7272376Z  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2020-11-04T14:32:19.7273048Z  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 2020-11-04T14:32:19.7273583Z  at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> 2020-11-04T14:32:19.7274140Z  at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> 2020-11-04T14:32:19.7274679Z  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> 2020-11-04T14:32:19.7275310Z  at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> 2020-11-04T14:32:19.7275792Z  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> 2020-11-04T14:32:19.7276343Z  at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> 2020-11-04T14:32:19.7276864Z  at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> 2020-11-04T14:32:19.7277531Z  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 2020-11-04T14:32:19.7278195Z  at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 2020-11-04T14:32:19.7278856Z  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 2020-11-04T14:32:19.7279542Z  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-11-04T14:32:19.7281428Z Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'localhost/127.0.0.1:33368'.
> 2020-11-04T14:32:19.7282544Z  at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:283)
> 2020-11-04T14:32:19.7283723Z  at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:183)
> 2020-11-04T14:32:19.7285215Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> 2020-11-04T14:32:19.7286265Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> 2020-11-04T14:32:19.7287520Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> 2020-11-04T14:32:19.7288686Z  at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:115)
> 2020-11-04T14:32:19.7289722Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> 2020-11-04T14:32:19.7290742Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> 2020-11-04T14:32:19.7291776Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> 2020-11-04T14:32:19.7292884Z  at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
> 2020-11-04T14:32:19.7293919Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> 2020-11-04T14:32:19.7295079Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> 2020-11-04T14:32:19.7296076Z  at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> 2020-11-04T14:32:19.7297214Z  at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
> 2020-11-04T14:32:19.7298245Z  at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
> 2020-11-04T14:32:19.7299068Z  at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> 2020-11-04T14:32:19.7299963Z  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> 2020-11-04T14:32:19.7300879Z  at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> 2020-11-04T14:32:19.7301516Z  at java.lang.Thread.run(Thread.java:748)
> 2020-11-04T14:32:19.7302033Z Caused by: java.io.IOException: Reader is already released.
> 2020-11-04T14:32:19.7302880Z  at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:255)
> 2020-11-04T14:32:19.7303825Z  at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:108)
> 2020-11-04T14:32:19.7304871Z  at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:173)
> 2020-11-04T14:32:19.7305873Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 2020-11-04T14:32:19.7306932Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 2020-11-04T14:32:19.7308179Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> 2020-11-04T14:32:19.7309255Z  at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
> 2020-11-04T14:32:19.7310249Z  at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
> 2020-11-04T14:32:19.7311499Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 2020-11-04T14:32:19.7312573Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 2020-11-04T14:32:19.7313711Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> 2020-11-04T14:32:19.7314967Z  at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1428)
> 2020-11-04T14:32:19.7316038Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 2020-11-04T14:32:19.7317216Z  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 2020-11-04T14:32:19.7318253Z  at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:913)
> 2020-11-04T14:32:19.7319209Z  at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:87)
> 2020-11-04T14:32:19.7320177Z  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> 2020-11-04T14:32:19.7321167Z  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> 2020-11-04T14:32:19.7322060Z  at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> 2020-11-04T14:32:19.7322615Z  ... 3 more
> 2020-11-04T14:32:19.7323125Z Caused by: java.lang.IllegalStateException: Reader is already released.
> 2020-11-04T14:32:19.7323760Z  at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
> 2020-11-04T14:32:19.7324559Z  at org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader.getNextBuffer(SortMergeSubpartitionReader.java:98)
> 2020-11-04T14:32:19.7325691Z  at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.getNextBuffer(CreditBasedSequenceNumberingViewReader.java:170)
> 2020-11-04T14:32:19.7326735Z  at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:216)
> 2020-11-04T14:32:19.7327496Z  ... 21 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
