Stefan Richter created FLINK-8385:
-------------------------------------

             Summary: Fix exceptions in AbstractEventTimeWindowCheckpointingITCase
                 Key: FLINK-8385
                 URL: https://issues.apache.org/jira/browse/FLINK-8385
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
    Affects Versions: 1.4.0
            Reporter: Stefan Richter
            Assignee: Stefan Richter
             Fix For: 1.5.0

When running all `AbstractEventTimeWindowCheckpointingITCase` tests with debug logging, I noticed two types of logged exceptions that should not happen:

1)
{code}
java.util.concurrent.RejectedExecutionException: Task org.apache.flink.runtime.state.SharedStateRegistry$AsyncDisposalRunnable@49258e6c rejected from java.util.concurrent.ThreadPoolExecutor@47e27938[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 24]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.flink.runtime.state.SharedStateRegistry.scheduleAsyncDelete(SharedStateRegistry.java:197)
    at org.apache.flink.runtime.state.SharedStateRegistry.unregisterReference(SharedStateRegistry.java:162)
    at org.apache.flink.runtime.state.IncrementalKeyedStateHandle.discardState(IncrementalKeyedStateHandle.java:180)
    at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
    at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:54)
    at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:187)
    at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:108)
    at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
    at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:54)
    at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:229)
    at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnShutdown(CompletedCheckpoint.java:198)
    at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore$2.apply(ZooKeeperCompletedCheckpointStore.java:351)
    at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore$RemoveBackgroundCallback.processResult(ZooKeeperStateHandleStore.java:640)
    at org.apache.curator.framework.imps.Backgrounding$1$1.run(Backgrounding.java:150)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}

The cause of this exception is that disposal of an incremental state handle is triggered from another asynchronous thread (the ZooKeeper background callback visible in the trace) while the shutdown proceeds to close the I/O executor pool. Once the asynchronous deletes are then triggered inside the `SharedStateRegistry`, the already terminated pool rejects them with a `RejectedExecutionException`. We need to wait for all pending ZK deletes before closing the I/O executor pool.

2)
{code}
java.lang.Exception: Could not properly cancel managed keyed state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1090)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:685)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:621)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:574)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:230)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:225)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:278)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:724)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:121)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:69)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
    ... 12 more
{code}

This exception is expected when the snapshot task is cancelled by closing the streams it is using for blocking I/O. Because it is expected and harmless, it should not be logged: it can easily confuse users.
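
Both behaviours can be reproduced with plain `java.util.concurrent` classes. The following is only a minimal sketch, not the Flink code and not the actual fix: the class `ShutdownOrderingSketch`, its method names, and the two executors (`callbackExecutor` standing in for the ZooKeeper background callbacks, `ioExecutor` for the I/O executor pool) are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownOrderingSketch {

    /** Exception 1: an executor that was already shut down rejects new tasks. */
    public static boolean submitAfterShutdownIsRejected() {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        ioExecutor.shutdown();
        try {
            ioExecutor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    /**
     * Proposed ordering: drain the callbacks that schedule deletes first,
     * and only then close the executor that runs the deletes.
     * Returns the number of deletes that actually ran.
     */
    public static int drainCallbacksBeforeClosingIoExecutor(int pendingDeletes) {
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < pendingDeletes; i++) {
            // Each callback schedules one asynchronous delete on the I/O executor.
            callbackExecutor.execute(() -> ioExecutor.execute(completed::incrementAndGet));
        }
        try {
            callbackExecutor.shutdown(); // already queued callbacks still run
            if (!callbackExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
            ioExecutor.shutdown(); // safe now: nothing can schedule further deletes
            if (!ioExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("deletes did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    /** Exception 2: get() on a cancelled FutureTask throws CancellationException. */
    public static boolean cancelledSnapshotIsExpected() {
        FutureTask<String> snapshot = new FutureTask<>(() -> "state-handle");
        snapshot.cancel(true);
        try {
            if (!snapshot.isDone()) {
                snapshot.run();
            }
            snapshot.get();
            return false;
        } catch (CancellationException expected) {
            // Expected after a cancel: nothing to discard, nothing worth logging.
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + submitAfterShutdownIsRejected());
        System.out.println("deletes completed: " + drainCallbacksBeforeClosingIoExecutor(24));
        System.out.println("cancellation treated as expected: " + cancelledSnapshotIsExpected());
    }
}
```

The sketch relies on `shutdown()` still running tasks that were queued before the call, so awaiting the callback executor first guarantees that every delete has been handed to the I/O executor before that executor is closed.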