I think you're right, Till, this is the problem. In fact, I opened a duplicate Jira ticket in parallel :) I hope we can fix it in the next 1.12 release.

Regards,
Roman

On Fri, Jan 15, 2021 at 2:09 PM Till Rohrmann <[email protected]> wrote:

> Thanks for reporting and analyzing this issue Kelly. I think you are
> indeed running into a Flink bug. I think the problem is the following: With
> Flink 1.12.0 [1] we introduced a throttling mechanism for discarding
> checkpoints. The way it is implemented is that once a checkpoint is
> discarded it can trigger another action. This is triggering another
> checkpoint in the CheckpointCoordinator. The problem is now that we don't
> properly handle the case when the CheckpointCoordinator has been stopped in
> the meantime (e.g. if the job has reached a terminal state). That's why we
> see this RejectedExecutionException which fails the job. This is definitely
> a bug and I have created this issue [2] for fixing it. I am also pulling in
> Roman who worked on this feature.
>
> [1] https://issues.apache.org/jira/browse/FLINK-17073
> [2] https://issues.apache.org/jira/browse/FLINK-20992
>
> Cheers,
> Till
>
> On Wed, Jan 13, 2021 at 7:30 PM Kelly Smith <[email protected]>
> wrote:
>
>> Hi folks,
>>
>> I recently upgraded to Flink 1.12.0 and I’m hitting an issue where my JM
>> is crashing while cancelling a job. This is causing Kubernetes readiness
>> probes to fail, the JM to be restarted, and then get in a bad state while
>> it tries to recover itself using ZK + a checkpoint which no longer exists.
>>
>> This is the only information being logged before the process exits:
>>
>> *method*: uncaughtException
>> *msg*: FATAL: Thread 'cluster-io-thread-4' produced an uncaught
>> exception. Stopping the process...
>> *pod*: dev-dsp-flink-canary-test-9fa6d3e7-jm-59884f579-w8r6x
>> *stack*: java.util.concurrent.RejectedExecutionException: Task
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@41554407
>> rejected from
>> java.util.concurrent.ScheduledThreadPoolExecutor@5d0ec6f7[Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>> 25977]
>>     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
>>     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
>>     at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
>>     at org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
>>     at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> https://github.com/apache/flink/blob/release-1.12.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java#L58
>>
>> I’m not sure how to debug this further, but it seems like an internal
>> Flink bug?
>>
>> More info:
>>
>> - Checkpoints are stored in S3 and I’m using the S3 connector
>> - Identical code has been running on Flink 1.11.x for months with no
>>   issues
>>
>> Thanks,
>>
>> Kelly
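
For anyone who wants to see the failure mode from the stack trace in isolation: the sketch below is not Flink code and not the fix tracked in FLINK-20992. It only uses plain `java.util.concurrent` to show that handing a task to a scheduled executor that has already been shut down throws `RejectedExecutionException`, and one possible way a caller could guard against it. The `Coordinator` class and its `scheduleUnguarded` / `scheduleGuarded` methods are hypothetical names made up for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

final class RejectedAfterShutdownSketch {

    /** Hypothetical stand-in for a component that schedules follow-up work on a timer. */
    static final class Coordinator {
        private final ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor();
        private final AtomicBoolean stopped = new AtomicBoolean(false);

        /** Marks the coordinator as stopped and shuts the timer down. */
        void stop() {
            stopped.set(true);
            timer.shutdownNow();
        }

        /** Unguarded: throws RejectedExecutionException once the timer is shut down. */
        void scheduleUnguarded(Runnable action) {
            timer.execute(action);
        }

        /** Guarded: returns false instead of throwing when the coordinator is stopped. */
        boolean scheduleGuarded(Runnable action) {
            if (stopped.get()) {
                return false;
            }
            try {
                timer.execute(action);
                return true;
            } catch (RejectedExecutionException e) {
                // stop() raced with the check above; treat it as "already stopped".
                return false;
            }
        }
    }

    /** Returns true if scheduling on a stopped coordinator is rejected with an exception. */
    static boolean unguardedSubmitIsRejected() {
        Coordinator coordinator = new Coordinator();
        coordinator.stop();
        try {
            coordinator.scheduleUnguarded(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    /** Returns true if the guarded path skips the task without throwing. */
    static boolean guardedSubmitIsSkipped() {
        Coordinator coordinator = new Coordinator();
        coordinator.stop();
        return !coordinator.scheduleGuarded(() -> { });
    }

    public static void main(String[] args) {
        System.out.println("unguarded rejected: " + unguardedSubmitIsRejected());
        System.out.println("guarded skipped: " + guardedSubmitIsSkipped());
    }
}
```

In the unguarded case the exception propagates to whichever thread ran the callback. If nothing on that thread catches it, it reaches the uncaught-exception handler, which matches the `cluster-io-thread-4` log line above.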
