[ https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski closed FLINK-31414.
----------------------------------
    Resolution: Fixed

merged commit cbfeef6 to master
0dfb5abf038 to release-1.17
f2f9104dc46 to release-1.16

> exceptions in the alignment timer are ignored
> ---------------------------------------------
>
>                 Key: FLINK-31414
>                 URL: https://issues.apache.org/jira/browse/FLINK-31414
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.6, 1.14.6, 1.15.3, 1.16.1
>            Reporter: Feifan Wang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.16.2, 1.17.1
>
>
> The alignment timer task in alternating aligned checkpoints runs as a future
> task in the mailbox thread, so exceptions thrown in it
> ([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
> are ignored. These exceptions should have failed the task. Instead, in my
> test, the same checkpoint called initInputsCheckpoint twice.
>
> {code:java}
> switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: unable to send request to worker
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:247)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:161)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.prepareSnapshot(StreamTaskNetworkInput.java:103)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.prepareSnapshot(StreamOneInputProcessor.java:83)
>     at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.prepareSnapshot(StreamMultipleInputProcessor.java:122)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.prepareInputSnapshot(StreamTask.java:518)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.prepareInflightDataSnapshot(SubtaskCheckpointCoordinatorImpl.java:655)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint(SubtaskCheckpointCoordinatorImpl.java:515)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.initInputsCheckpoint(SingleCheckpointBarrierHandler.java:516)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:46)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:54)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:748)
>     Suppressed: java.io.IOException: java.lang.IllegalStateException: writer not found for request start 17
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.close(ChannelStateWriteRequestExecutorImpl.java:175)
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.close(ChannelStateWriterImpl.java:235)
>         at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cancel(SubtaskCheckpointCoordinatorImpl.java:564)
>         at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.close(SubtaskCheckpointCoordinatorImpl.java:551)
>         at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
>         at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
>         at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
>         at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917)
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>         ... 3 more
>     Caused by: java.lang.IllegalStateException: writer not found for request start 17
>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:75)
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:62)
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
>         ... 1 more
> Caused by: java.lang.IllegalStateException: not running
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:128)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:244)
>     ... 27 more
>     [CIRCULAR REFERENCE:java.lang.IllegalStateException: writer not found for request start 17]
> {code}
>
>
> see : [BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
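
For illustration only: the issue says the alignment timer runs "as a future task", and that its exceptions are ignored. The sketch below is plain JDK code, not Flink code. It assumes the wrapper behaves like java.util.concurrent.FutureTask, and the class and method names are hypothetical. It shows that an exception thrown inside a FutureTask is stored in the future instead of reaching the thread that runs it, so nothing fails unless someone inspects the future.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class SwallowedTimerException {

    /** Hypothetical stand-in for a timer callback that always fails. */
    public static Void failingTimerAction() {
        throw new IllegalStateException("simulated failure in timer action");
    }

    /** Wraps the action in a FutureTask, runs it on the calling thread, returns the task. */
    public static FutureTask<Void> runWrapped() {
        FutureTask<Void> task = new FutureTask<>(SwallowedTimerException::failingTimerAction);
        // run() does not rethrow: FutureTask catches the Throwable and stores it.
        task.run();
        return task;
    }

    /** Returns the failure stored in the task, or null if it completed normally. */
    public static Throwable storedFailure(FutureTask<Void> task) throws InterruptedException {
        try {
            task.get();
            return null;
        } catch (ExecutionException e) {
            // The original exception is only reachable as the cause.
            return e.getCause();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FutureTask<Void> task = runWrapped();
        System.out.println("run() returned normally, isDone=" + task.isDone());
        System.out.println("failure visible only via get(): " + storedFailure(task));
    }
}
```

In this sketch the caller of run() continues as if nothing happened. A fix along these lines would have to inspect the future's result or run the action unwrapped; how the merged commits actually do this is not stated in this message.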