[ 
https://issues.apache.org/jira/browse/FLINK-34964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zoucao updated FLINK-34964:
---------------------------
    Description: 
I have come across a problem regarding a leak in the 'ScheduledTask' while 
registering the processing timer. Upon further investigation, I have identified 
two factors that are responsible for the leak.


*1. The ScheduledTask associated with the timer has not been synchronized for 
deletion*


see 
`org.apache.flink.streaming.api.operators.InternalTimerServiceImpl#deleteProcessingTimeTimer`,
 when a registered timer want be deleted, flink only removes it from the 
'processingTimeTimersQueue'. However, it's possible that this timer is the 
earliest one that will be triggered in the future and has been scheduled as a 
task submitted to the ScheduledThreadPoolExecutor.

When deleting a registered timer, flink should check whether this timer is the 
next triggered time, if true, the current 'ScheduledTask' should be canceled.

 

*2. Re-submit a timer earlier than the System.currentTimeMillis*

Considering a case, the current time-millis is 100, and there exist 100、101、102 
in the processingQueue, timer-100 has been submitted to ScheduledThreadPool. At 
this moment, the user registers a timer-99. 99 is less than 100(the peek timer 
in queue), then Flink will cancel timer 100‘s task, and re-register using timer 
99. However, before canceling timer-100, the thread pool has submitted it to 
mailbox.
Then, the mail in mailbox is as follows:
{code:java}
 ->  * register timer-99
 ->    trigger timer-100
->     trigger timer-99
{code}
 - when executing 'trigger timer 100', Flink will flush records whose timer 
belongs to 99 and 100, then submit timer-101 to the scheduled thread pool.
 - when executing 'trigger timer-99', no records need to flush, then it also 
submits timer-101 to the scheduled thread pool, because timer-101 is the next 
timer needs to trigger.
Obviously, Two tasks are registered to Flink's scheduled thread pool with the 
same timer.

In our online job, the number of these leaked Scheduled Tasks could be in the 
thousands, see the following figure.

 

Here an example is posted, convenient for reproducing the case-2.
{code:java}
    @Test
    public void testTimerTaskLeak() {
        TaskMailboxImpl mailbox = new TaskMailboxImpl();
        MailboxExecutor mailboxExecutor =
                new MailboxExecutorImpl(
                        mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
        SystemProcessingTimeService processingTimeService =
                new SystemProcessingTimeService(ex -> handleException(ex));

        ProcessingTimeServiceImpl timeService = new ProcessingTimeServiceImpl(
                processingTimeService,
                callback -> deferCallbackToMailbox(mailboxExecutor, callback));

        TestKeyContext keyContext = new TestKeyContext();

        Queue<String> mailQueue = new LinkedBlockingDeque<>();
        long curr = System.currentTimeMillis();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        mock(Triggerable.class),
                        keyContext,
                        timeService,
                        testKeyGroupRange,
                        createQueueFactory());

        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.execute(
                () -> {
                    try {
                            keyContext.setCurrentKey(1);
                            mailboxExecutor.execute(
                                    () -> 
timerService.registerProcessingTimeTimer("void", curr + 6 * 1000L), "6");

                            Thread.sleep(2L);

                            mailboxExecutor.execute(
                                    () -> 
timerService.registerProcessingTimeTimer("void", curr + 7 * 1000L), "7");
                        Thread.sleep(2L);

                            mailboxExecutor.execute(
                                    () -> 
timerService.registerProcessingTimeTimer("void", curr + 8 * 1000L), "8");
                        Thread.sleep(2L);
                            mailboxExecutor.execute(
                                    () -> {
                                        
timerService.registerProcessingTimeTimer("void", curr + 1);
                                    }, "1");

                            mailboxExecutor.execute(
                                    () -> {
                                        Thread.sleep(3); // wait timer +1 
submitted to mailbox
                                        
timerService.registerProcessingTimeTimer("void", curr - 5);
                                    }, "-5");
                            Thread.sleep(5L);
                            mailboxExecutor.execute(
                                    () -> 
timerService.registerProcessingTimeTimer("void", curr + 4), "4");
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

        );

        while (mailQueue.size() < 14) {
            if (mailbox.mailQueue().size() > 0) {
                String mail = mailbox.mailQueue().peek().toString();
                if (mail.length() > 5) {
                    mailQueue.add("trigger " + (Long.parseLong(mail.split("@ 
")[1]) - curr));
                } else {
                    mailQueue.add("register " + mail);
                }
            }
            mailboxExecutor.tryYield();
        }

        System.out.println(mailQueue);
        executorService.shutdownNow();
    }
{code}

Print Result:

{code:java}
[register 6, register 7, register 8, register 1, register -5, trigger 1, 
trigger -5, register 4, trigger 4, trigger 6000, trigger 6000, trigger 7000, 
trigger 7000, trigger 8000]
{code}



!image-2024-03-29-16-40-11-928.png!

  was:
I have come across a problem regarding a leak in the 'ScheduledTask' while 
registering the processing timer. Upon further investigation, I have identified 
two factors that are responsible for the leak.


*1. The ScheduledTask associated with the timer has not been synchronized for 
deletion *


see 
`org.apache.flink.streaming.api.operators.InternalTimerServiceImpl#deleteProcessingTimeTimer`,
 when a registered timer want be deleted, flink only removes it from the 
'processingTimeTimersQueue'. However, it's possible that this timer is the 
earliest one that will be triggered in the future and has been scheduled as a 
task submitted to the ScheduledThreadPoolExecutor.

When deleting a registered timer, flink should check whether this timer is the 
next triggered time, if true, the current 'ScheduledTask' should be canceled.

 

*2. Re-submit a timer earlier than the System.currentTimeMillis*

Considering a case, the current time-millis is 100, and there exist 100、101、102 
in the processingQueue, timer-100 has been submitted to ScheduledThreadPool. At 
this moment, the user registers a timer-99. 99 is less than 100(the peek timer 
in queue), then Flink will cancel timer 100‘s task, and re-register using timer 
99. However, before canceling timer-100, the thread pool has submitted it to 
mailbox.
Then, the mail in mailbox is as follows:
{code:java}
 ->  * register timer-99
 ->    trigger timer-100
->     trigger timer-99
{code}
 - when executing 'trigger timer 100', Flink will flush records whose timer 
belongs to 99 and 100, then submit timer-101 to the scheduled thread pool.
 - when executing 'trigger timer-99', no records need to flush, then it also 
submits timer-101 to the scheduled thread pool, because timer-101 is the next 
timer needs to trigger.
Obviously, Two tasks are registered to Flink's scheduled thread pool with the 
same timer.

In our online job, the number of these leaked Scheduled Tasks could be in the 
thousands, see the following figure.

 

Here an example is posted, convenient for reproducing the case-2.
{code:java}
    @Test
    public void testTimerTaskLeak() {
        TaskMailboxImpl mailbox = new TaskMailboxImpl();
        MailboxExecutor mailboxExecutor =
                new MailboxExecutorImpl(
                        mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
        SystemProcessingTimeService processingTimeService =
                new SystemProcessingTimeService(ex -> handleException(ex));

        ProcessingTimeServiceImpl timeService = new ProcessingTimeServiceImpl(
                processingTimeService,
                callback -> deferCallbackToMailbox(mailboxExecutor, callback));

        TestKeyContext keyContext = new TestKeyContext();

        Queue<String> mailQueue = new LinkedBlockingDeque<>();
        long curr = System.currentTimeMillis();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        mock(Triggerable.class),
                        keyContext,
                        timeService,
                        testKeyGroupRange,
                        createQueueFactory());

        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.execute(
                () -> {
                    try {
                            keyContext.setCurrentKey(1);
                            mailboxExecutor.execute(
                                    () -> 
timerService.registerProcessingTimeTimer("void", curr + 6 * 1000L), "6");

                            Thread.sleep(2L);

                            mailboxExecutor.execute(
                                    () -> 
timerService.registerProcessingTimeTimer("void", curr + 7 * 1000L), "7");
                        Thread.sleep(2L);

                            mailboxExecutor.execute(
                                    () -> 
timerService.registerProcessingTimeTimer("void", curr + 8 * 1000L), "8");
                        Thread.sleep(2L);
                            mailboxExecutor.execute(
                                    () -> {
                                        
timerService.registerProcessingTimeTimer("void", curr + 1);
                                    }, "1");

                            mailboxExecutor.execute(
                                    () -> {
                                        Thread.sleep(3); // wait timer +1 
submitted to mailbox
                                        
timerService.registerProcessingTimeTimer("void", curr - 5);
                                    }, "-5");
                            Thread.sleep(5L);
                            mailboxExecutor.execute(
                                    () -> 
timerService.registerProcessingTimeTimer("void", curr + 4), "4");
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

        );

        while (mailQueue.size() < 14) {
            if (mailbox.mailQueue().size() > 0) {
                String mail = mailbox.mailQueue().peek().toString();
                if (mail.length() > 5) {
                    mailQueue.add("trigger " + (Long.parseLong(mail.split("@ 
")[1]) - curr));
                } else {
                    mailQueue.add("register " + mail);
                }
            }
            mailboxExecutor.tryYield();
        }

        System.out.println(mailQueue);
        executorService.shutdownNow();
    }
{code}

Print Result:

{code:java}
[register 6, register 7, register 8, register 1, register -5, trigger 1, 
trigger -5, register 4, trigger 4, trigger 6000, trigger 6000, trigger 7000, 
trigger 7000, trigger 8000]
{code}



!image-2024-03-29-16-40-11-928.png!


> ScheduledTask leak in registering the processing timer
> ------------------------------------------------------
>
>                 Key: FLINK-34964
>                 URL: https://issues.apache.org/jira/browse/FLINK-34964
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.20.0
>            Reporter: zoucao
>            Priority: Major
>         Attachments: image-2024-03-29-16-40-11-928.png
>
>
> I have come across a problem regarding a leak in the 'ScheduledTask' while 
> registering the processing timer. Upon further investigation, I have 
> identified two factors that are responsible for the leak.
> *1. The ScheduledTask associated with the timer has not been synchronized for 
> deletion*
> see 
> `org.apache.flink.streaming.api.operators.InternalTimerServiceImpl#deleteProcessingTimeTimer`,
>  when a registered timer want be deleted, flink only removes it from the 
> 'processingTimeTimersQueue'. However, it's possible that this timer is the 
> earliest one that will be triggered in the future and has been scheduled as a 
> task submitted to the ScheduledThreadPoolExecutor.
> When deleting a registered timer, flink should check whether this timer is 
> the next triggered time, if true, the current 'ScheduledTask' should be 
> canceled.
>  
> *2. Re-submit a timer earlier than the System.currentTimeMillis*
> Considering a case, the current time-millis is 100, and there exist 
> 100、101、102 in the processingQueue, timer-100 has been submitted to 
> ScheduledThreadPool. At this moment, the user registers a timer-99. 99 is 
> less than 100(the peek timer in queue), then Flink will cancel timer 100‘s 
> task, and re-register using timer 99. However, before canceling timer-100, 
> the thread pool has submitted it to mailbox.
> Then, the mail in mailbox is as follows:
> {code:java}
>  ->  * register timer-99
>  ->    trigger timer-100
> ->     trigger timer-99
> {code}
>  - when executing 'trigger timer 100', Flink will flush records whose timer 
> belongs to 99 and 100, then submit timer-101 to the scheduled thread pool.
>  - when executing 'trigger timer-99', no records need to flush, then it also 
> submits timer-101 to the scheduled thread pool, because timer-101 is the next 
> timer needs to trigger.
> Obviously, Two tasks are registered to Flink's scheduled thread pool with the 
> same timer.
> In our online job, the number of these leaked Scheduled Tasks could be in the 
> thousands, see the following figure.
>  
> Here an example is posted, convenient for reproducing the case-2.
> {code:java}
>     @Test
>     public void testTimerTaskLeak() {
>         TaskMailboxImpl mailbox = new TaskMailboxImpl();
>         MailboxExecutor mailboxExecutor =
>                 new MailboxExecutorImpl(
>                         mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
>         SystemProcessingTimeService processingTimeService =
>                 new SystemProcessingTimeService(ex -> handleException(ex));
>         ProcessingTimeServiceImpl timeService = new ProcessingTimeServiceImpl(
>                 processingTimeService,
>                 callback -> deferCallbackToMailbox(mailboxExecutor, 
> callback));
>         TestKeyContext keyContext = new TestKeyContext();
>         Queue<String> mailQueue = new LinkedBlockingDeque<>();
>         long curr = System.currentTimeMillis();
>         InternalTimerServiceImpl<Integer, String> timerService =
>                 createAndStartInternalTimerService(
>                         mock(Triggerable.class),
>                         keyContext,
>                         timeService,
>                         testKeyGroupRange,
>                         createQueueFactory());
>         ExecutorService executorService = Executors.newFixedThreadPool(1);
>         executorService.execute(
>                 () -> {
>                     try {
>                             keyContext.setCurrentKey(1);
>                             mailboxExecutor.execute(
>                                     () -> 
> timerService.registerProcessingTimeTimer("void", curr + 6 * 1000L), "6");
>                             Thread.sleep(2L);
>                             mailboxExecutor.execute(
>                                     () -> 
> timerService.registerProcessingTimeTimer("void", curr + 7 * 1000L), "7");
>                         Thread.sleep(2L);
>                             mailboxExecutor.execute(
>                                     () -> 
> timerService.registerProcessingTimeTimer("void", curr + 8 * 1000L), "8");
>                         Thread.sleep(2L);
>                             mailboxExecutor.execute(
>                                     () -> {
>                                         
> timerService.registerProcessingTimeTimer("void", curr + 1);
>                                     }, "1");
>                             mailboxExecutor.execute(
>                                     () -> {
>                                         Thread.sleep(3); // wait timer +1 
> submitted to mailbox
>                                         
> timerService.registerProcessingTimeTimer("void", curr - 5);
>                                     }, "-5");
>                             Thread.sleep(5L);
>                             mailboxExecutor.execute(
>                                     () -> 
> timerService.registerProcessingTimeTimer("void", curr + 4), "4");
>                     } catch (InterruptedException e) {
>                         throw new RuntimeException(e);
>                     }
>                 }
>         );
>         while (mailQueue.size() < 14) {
>             if (mailbox.mailQueue().size() > 0) {
>                 String mail = mailbox.mailQueue().peek().toString();
>                 if (mail.length() > 5) {
>                     mailQueue.add("trigger " + (Long.parseLong(mail.split("@ 
> ")[1]) - curr));
>                 } else {
>                     mailQueue.add("register " + mail);
>                 }
>             }
>             mailboxExecutor.tryYield();
>         }
>         System.out.println(mailQueue);
>         executorService.shutdownNow();
>     }
> {code}
> Print Result:
> {code:java}
> [register 6, register 7, register 8, register 1, register -5, trigger 1, 
> trigger -5, register 4, trigger 4, trigger 6000, trigger 6000, trigger 7000, 
> trigger 7000, trigger 8000]
> {code}
> !image-2024-03-29-16-40-11-928.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to