I think you're right, Till; this is the problem.
In fact, I opened a duplicate Jira ticket in parallel :)
I hope we can fix it in the next 1.12.x release.
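
For anyone following along, here is a minimal, self-contained sketch of the failure mode Till describes in the quoted message: a follow-up action is submitted to a timer executor that has already been shut down. The class and method names (PostCleanupActionSketch, Coordinator, scheduleTriggerGuarded, ...) are made up for illustration; this is not the Flink code and not necessarily how the fix will look. It only assumes standard java.util.concurrent behaviour.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

public class PostCleanupActionSketch {

    /** Hypothetical stand-in for a coordinator that owns a timer executor; not Flink's class. */
    static final class Coordinator {
        private final ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor();
        private volatile boolean shutdown = false;

        /** Stops the coordinator, e.g. because the job reached a terminal state. */
        void shutdown() {
            shutdown = true;
            timer.shutdownNow();
        }

        /** Unguarded submission: throws RejectedExecutionException once the timer is shut down. */
        void scheduleTriggerUnguarded(Runnable trigger) {
            timer.execute(trigger);
        }

        /** Guarded submission: returns false instead of throwing when the coordinator is stopped. */
        boolean scheduleTriggerGuarded(Runnable trigger) {
            if (shutdown) {
                return false; // nothing left to trigger
            }
            try {
                timer.execute(trigger);
                return true;
            } catch (RejectedExecutionException e) {
                // shutdown() raced with us between the check and execute(); treat as a no-op
                return false;
            }
        }
    }

    /** Returns true if the unguarded path throws after shutdown. */
    static boolean unguardedRejectsAfterShutdown() {
        Coordinator coordinator = new Coordinator();
        coordinator.shutdown();
        try {
            coordinator.scheduleTriggerUnguarded(() -> {});
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    /** Returns true if the guarded path accepted the task after shutdown. */
    static boolean guardedAcceptsAfterShutdown() {
        Coordinator coordinator = new Coordinator();
        coordinator.shutdown();
        return coordinator.scheduleTriggerGuarded(() -> {});
    }

    public static void main(String[] args) {
        System.out.println("unguarded rejected: " + unguardedRejectsAfterShutdown());
        System.out.println("guarded accepted:   " + guardedAcceptsAfterShutdown());
    }
}
```

In the unguarded variant the exception escapes on whichever thread ran the cleanup callback, which matches the shape of the reported stack trace. The guarded variant checks a flag and also catches the rejection, since a flag check alone would still race with a concurrent shutdown.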

Regards,
Roman


On Fri, Jan 15, 2021 at 2:09 PM Till Rohrmann <[email protected]> wrote:

> Thanks for reporting and analyzing this issue, Kelly. I think you are
> indeed running into a Flink bug. I think the problem is the following: with
> Flink 1.12.0 [1] we introduced a throttling mechanism for discarding
> checkpoints. It is implemented so that once a checkpoint has been
> discarded, it can trigger a follow-up action; here, that action is triggering
> another checkpoint in the CheckpointCoordinator. The problem is that we don't
> properly handle the case where the CheckpointCoordinator has been stopped in
> the meantime (e.g. if the job has reached a terminal state). That's why we
> see this RejectedExecutionException, which fails the job. This is definitely
> a bug and I have created this issue [2] for fixing it. I am also pulling in
> Roman, who worked on this feature.
>
> [1] https://issues.apache.org/jira/browse/FLINK-17073
> [2] https://issues.apache.org/jira/browse/FLINK-20992
>
> Cheers,
> Till
>
> On Wed, Jan 13, 2021 at 7:30 PM Kelly Smith <[email protected]>
> wrote:
>
>> Hi folks,
>>
>>
>>
>> I recently upgraded to Flink 1.12.0 and I’m hitting an issue where my JM
>> crashes while cancelling a job. This causes Kubernetes readiness
>> probes to fail and the JM to be restarted; it then gets into a bad state while
>> it tries to recover itself using ZK + a checkpoint which no longer exists.
>>
>>
>>
>> This is the only information being logged before the process exits:
>>
>>
>>
>>
>>
>>  *method*: uncaughtException
>>    *msg*: FATAL: Thread 'cluster-io-thread-4' produced an uncaught exception. Stopping the process...
>>    *pod*: dev-dsp-flink-canary-test-9fa6d3e7-jm-59884f579-w8r6x
>>    *stack*: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@41554407 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@5d0ec6f7[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 25977]
>>        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
>>        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
>>        at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
>>        at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
>>        at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
>>        at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
>>        at org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
>>        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
>>        at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
>>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>        at java.lang.Thread.run(Thread.java:748)
>>
>>
>>
>>
>>
>>
>> https://github.com/apache/flink/blob/release-1.12.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java#L58
>>
>>
>>
>>
>>
>> I’m not sure how to debug this further, but it seems like an internal
>> Flink bug?
>>
>>
>>
>> More info:
>>
>>
>>    - Checkpoints are stored in S3 and I’m using the S3 connector
>>    - Identical code has been running on Flink 1.11.x for months with no
>>    issues
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Kelly
>>
>
