[ https://issues.apache.org/jira/browse/FLINK-34964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zoucao updated FLINK-34964:
---------------------------
    Description: 
I have come across a leak of 'ScheduledTask' instances when registering processing-time timers. After further investigation, I have identified two causes.

*1. The ScheduledTask associated with a timer is not canceled when the timer is deleted*

See `org.apache.flink.streaming.api.operators.InternalTimerServiceImpl#deleteProcessingTimeTimer`: when a registered timer is deleted, Flink only removes it from the 'processingTimeTimersQueue'. However, this timer may be the earliest one due to fire, in which case it has already been scheduled as a task submitted to the ScheduledThreadPoolExecutor.
When deleting a registered timer, Flink should check whether it is the next timer to be triggered; if so, the current 'ScheduledTask' should be canceled.

*2. Re-submitting a timer earlier than System.currentTimeMillis*

Consider a case where the current time in millis is 100, the processing queue holds timers 100, 101, and 102, and timer-100 has been submitted to the ScheduledThreadPool. At this moment, the user registers timer-99. Since 99 is less than 100 (the head of the queue), Flink cancels timer-100's task and re-registers using timer-99. However, before timer-100 was canceled, the thread pool had already submitted it to the mailbox.
The mails in the mailbox are then as follows:
{code:java}
-> * register timer-99
-> trigger timer-100
-> trigger timer-99
{code}
- When executing 'trigger timer-100', Flink flushes the records belonging to timers 99 and 100, then submits timer-101 to the scheduled thread pool.
- When executing 'trigger timer-99', no records need to be flushed, but it also submits timer-101 to the scheduled thread pool, because timer-101 is the next timer to trigger.

As a result, two tasks are registered in Flink's scheduled thread pool for the same timer.
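The mail ordering in case 2 can be traced with a small single-threaded model. This is only a sketch under stated assumptions, not Flink's code: the class `TimerLeakSketch`, its `ScheduledTask` stand-in, and the helpers `schedule`, `advancePoolTo`, and `reproduce` are hypothetical names invented for illustration, and the thread pool and mailbox are simulated with plain collections. The `register` and `onProcessingTime` methods follow the behavior described above (cancel and reschedule when the new timer is earlier than the queue head; schedule the next head after a trigger).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

/** Hypothetical single-threaded model of the case-2 race; not Flink code. */
public class TimerLeakSketch {

    /** Stand-in for a task handed to the scheduled thread pool. */
    static final class ScheduledTask {
        final long timestamp;
        boolean cancelled;
        boolean fired;

        ScheduledTask(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    final PriorityQueue<Long> timerQueue = new PriorityQueue<>();
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    final List<ScheduledTask> pool = new ArrayList<>();
    ScheduledTask nextTimer;

    ScheduledTask schedule(long timestamp) {
        ScheduledTask task = new ScheduledTask(timestamp);
        pool.add(task);
        return task;
    }

    /** The simulated pool fires every due task by enqueueing a trigger mail. */
    void advancePoolTo(long now) {
        for (ScheduledTask task : pool) {
            if (!task.cancelled && !task.fired && task.timestamp <= now) {
                task.fired = true;
                mailbox.add(() -> onProcessingTime(task.timestamp));
            }
        }
    }

    void register(long time) {
        Long oldHead = timerQueue.peek();
        timerQueue.add(time);
        long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
        if (time < nextTriggerTime) {
            if (nextTimer != null) {
                // Too late if the task has already put its trigger mail in the mailbox.
                nextTimer.cancelled = true;
            }
            nextTimer = schedule(time);
        }
    }

    void onProcessingTime(long time) {
        nextTimer = null; // drops the reference without cancelling the task
        while (!timerQueue.isEmpty() && timerQueue.peek() <= time) {
            timerQueue.poll(); // fire the timer (user callback omitted)
        }
        if (!timerQueue.isEmpty() && nextTimer == null) {
            nextTimer = schedule(timerQueue.peek());
        }
    }

    long pendingTasksFor(long timestamp) {
        return pool.stream()
                .filter(t -> !t.cancelled && !t.fired && t.timestamp == timestamp)
                .count();
    }

    /** Replays the scenario from the description and counts live tasks for timer-101. */
    static long reproduce() {
        TimerLeakSketch s = new TimerLeakSketch();
        s.register(100);
        s.register(101);
        s.register(102);
        s.advancePoolTo(100); // pool enqueues "trigger timer-100"
        s.register(99);       // cancel comes too late; task-99 is scheduled
        s.advancePoolTo(100); // task-99 is already due; "trigger timer-99" is enqueued
        while (!s.mailbox.isEmpty()) {
            s.mailbox.poll().run();
        }
        return s.pendingTasksFor(101);
    }

    public static void main(String[] args) {
        System.out.println("pending tasks for timer-101: " + reproduce()); // prints 2
    }
}
```

In this model, each trigger mail finds `nextTimer` reset to null and therefore schedules timer-101 again, so two live tasks exist for the same timestamp and the first one is no longer referenced by anything that could cancel it.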
In our online job, the number of these leaked ScheduledTasks can reach the thousands; see the figure below.

Here is an example that reproduces case 2.
{code:java}
@Test
public void testTimerTaskLeak() {
    TaskMailboxImpl mailbox = new TaskMailboxImpl();
    MailboxExecutor mailboxExecutor =
            new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
    SystemProcessingTimeService processingTimeService =
            new SystemProcessingTimeService(ex -> handleException(ex));
    ProcessingTimeServiceImpl timeService =
            new ProcessingTimeServiceImpl(
                    processingTimeService,
                    callback -> deferCallbackToMailbox(mailboxExecutor, callback));
    TestKeyContext keyContext = new TestKeyContext();
    // Records, in execution order, every mail seen at the head of the mailbox.
    Queue<String> mailQueue = new LinkedBlockingDeque<>();
    long curr = System.currentTimeMillis();
    InternalTimerServiceImpl<Integer, String> timerService =
            createAndStartInternalTimerService(
                    mock(Triggerable.class),
                    keyContext,
                    timeService,
                    testKeyGroupRange,
                    createQueueFactory());

    // Submit the register mails from a separate thread; the mail description
    // is the timer offset relative to curr.
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    executorService.execute(
            () -> {
                try {
                    keyContext.setCurrentKey(1);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 6 * 1000L),
                            "6");
                    Thread.sleep(2L);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 7 * 1000L),
                            "7");
                    Thread.sleep(2L);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 8 * 1000L),
                            "8");
                    Thread.sleep(2L);
                    mailboxExecutor.execute(
                            () -> {
                                timerService.registerProcessingTimeTimer("void", curr + 1);
                            },
                            "1");
                    mailboxExecutor.execute(
                            () -> {
                                // Wait until the trigger for timer +1 has been submitted to the mailbox.
                                Thread.sleep(3);
                                // Register a timer that is earlier than the current time.
                                timerService.registerProcessingTimeTimer("void", curr - 5);
                            },
                            "-5");
                    Thread.sleep(5L);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 4),
                            "4");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });

    // Drain the mailbox on the test thread until 14 mails have been recorded.
    while (mailQueue.size() < 14) {
        if (mailbox.mailQueue().size() > 0) {
            String mail = mailbox.mailQueue().peek().toString();
            if (mail.length() > 5) {
                // Longer descriptions are treated as timer triggers; the timestamp follows "@ ".
                mailQueue.add("trigger " + (Long.parseLong(mail.split("@ ")[1]) - curr));
            } else {
                mailQueue.add("register " + mail);
            }
        }
        mailboxExecutor.tryYield();
    }
    System.out.println(mailQueue);
    executorService.shutdownNow();
}
{code}
Print Result:
{code:java}
[register 6, register 7, register 8, register 1, register -5, trigger 1, trigger -5, register 4, trigger 4, trigger 6000, trigger 6000, trigger 7000, trigger 7000, trigger 8000]
{code}
!image-2024-03-29-16-40-11-928.png!

> ScheduledTask leak in registering the processing timer
> ------------------------------------------------------
>
>                 Key: FLINK-34964
>                 URL: https://issues.apache.org/jira/browse/FLINK-34964
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.20.0
>            Reporter: zoucao
>            Priority: Major
>         Attachments: image-2024-03-29-16-40-11-928.png
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)