Copilot commented on code in PR #18256:
URL: https://github.com/apache/pinot/pull/18256#discussion_r3108145826

##########
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskCronJob.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.periodictask;
+
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PeriodicTaskCronJob implements Job {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskCronJob.class);
+  public static final String PERIODIC_TASK_KEY = "PeriodicTask";
+
+  public PeriodicTaskCronJob() {
+  }
+
+  @Override
+  public void execute(JobExecutionContext jobExecutionContext)
+      throws JobExecutionException {
+    PeriodicTask periodicTask = (PeriodicTask) jobExecutionContext
+        .getJobDetail()
+        .getJobDataMap()
+        .get(PERIODIC_TASK_KEY);
+
+    if (periodicTask != null) {
+      try {
+        periodicTask.run();
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while running Task: {}", periodicTask.getTaskName(), e);
+      }

Review Comment:
Quartz can execute the same Job concurrently on multiple threads when firings overlap, e.g. when a run takes longer than the cron interval. Even though BasePeriodicTask serializes runs via a lock, overlapping firings still tie up Quartz worker threads. Consider annotating this Job with `@DisallowConcurrentExecution` (and optionally catching Throwable instead of Exception) to prevent concurrent executions from piling up.

##########
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java:
##########
@@ -69,28 +79,59 @@ public synchronized void start() {
     Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
     LOGGER.info("Starting periodic task scheduler with tasks: {}", periodicTasks);
     _executorService = Executors.newScheduledThreadPool(_periodicTasks.size());
+    boolean hasCronTasks = periodicTasks.stream().map(PeriodicTask::getCronExpression)
+        .anyMatch(cronExpression -> cronExpression != null && !cronExpression.trim().isEmpty());
+    if (hasCronTasks) {
+      try {
+        _quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
+        _quartzScheduler.start();
+      } catch (SchedulerException e) {
+        LOGGER.error("Failed to initialize Quartz scheduler. Aborting periodic task scheduling.", e);
+        throw new RuntimeException(e);
+      }

Review Comment:
If Quartz initialization fails, start() throws a RuntimeException after the ScheduledExecutorService has already been created. This (a) aborts scheduling of *all* periodic tasks, including fixed-delay ones, and (b) may leak the already-created executor threads. Consider logging the failure and falling back to fixed-delay scheduling (or at least shutting down _executorService before rethrowing).

##########
pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java:
##########
@@ -205,4 +205,47 @@ protected void runTask(Properties periodicTaskProperties) {
     // Confirm that all threads requested execution, even though only half the threads completed execution.
     assertEquals(attempts.get(), numThreads);
   }
+
+  @Test
+  public void testCronScheduling() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+    //let the frequency be 3600 seconds (1 hour) to prove that the cron job triggered the task.
+    List<PeriodicTask> periodicTasks = List.of(new BasePeriodicTask("CronTask", 3600L, 0L, "0/1 * * * * ?") {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });

Review Comment:
testCronScheduling() doesn't actually prove that the cron path was used: with initialDelay=0, the legacy fixed-delay scheduler would also run the task once immediately, even with a 3600 s interval, so this test would pass either way. Consider setting an initialDelay that exceeds the sleep duration (or otherwise asserting that fixed-delay scheduling was not used) to make the test meaningful.

##########
pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java:
##########
@@ -205,4 +205,47 @@ protected void runTask(Properties periodicTaskProperties) {
     // Confirm that all threads requested execution, even though only half the threads completed execution.
     assertEquals(attempts.get(), numThreads);
   }
+
+  @Test
+  public void testCronScheduling() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+    //let the frequency be 3600 seconds (1 hour) to prove that the cron job triggered the task.
+    List<PeriodicTask> periodicTasks = List.of(new BasePeriodicTask("CronTask", 3600L, 0L, "0/1 * * * * ?") {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
+
+    Thread.sleep(1500L);
+    taskScheduler.stop();
+
+    assertTrue(numTimesRunCalled.get() >= 1, "Task should have been triggered by Quartz CRON scheduler");
+  }

Review Comment:
Both tests call taskScheduler.stop() only on the happy path. If an assertion fails or an exception is thrown before stop() is reached, Quartz/executor threads may leak and make the test suite flaky or hang. Wrap start/sleep/assert in try/finally and always call stop() in the finally block.

##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2181,6 +2181,8 @@ public static class CursorConfigs {
     public static final String RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD =
         "controller.cluster.response.store.cleaner.frequencyPeriod";
+    public static final String RESPONSE_STORE_CLEANER_CRON_EXPRESSION =
+        "controller.cluster.response.store.cleaner.expression";

Review Comment:
The new cursor ResponseStoreCleaner cron config key ends with ".expression", while the other controller cron configs use the ".cronExpression" suffix. To keep config naming consistent (and spare users from guessing the wrong key), consider renaming it to something like "...cleaner.cronExpression".

```suggestion
        "controller.cluster.response.store.cleaner.cronExpression";
```

##########
pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java:
##########
@@ -205,4 +205,47 @@ protected void runTask(Properties periodicTaskProperties) {
     // Confirm that all threads requested execution, even though only half the threads completed execution.
     assertEquals(attempts.get(), numThreads);
   }
+
+  @Test
+  public void testCronScheduling() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+    //let the frequency be 3600 seconds (1 hour) to prove that the cron job triggered the task.
+    List<PeriodicTask> periodicTasks = List.of(new BasePeriodicTask("CronTask", 3600L, 0L, "0/1 * * * * ?") {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
+
+    Thread.sleep(1500L);
+    taskScheduler.stop();
+
+    assertTrue(numTimesRunCalled.get() >= 1, "Task should have been triggered by Quartz CRON scheduler");
+  }
+
+  @Test
+  public void testLegacyFallbackScheduling() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+    //fallback to the default fixed delay method
+    List<PeriodicTask> periodicTasks = List.of(new BasePeriodicTask("LegacyFallbackTask", 1L, 0L, null) {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
+
+    Thread.sleep(1500L);
+    taskScheduler.stop();
+
+    assertTrue(numTimesRunCalled.get() >= 1, "Task should have been triggered by legacy fixed-delay scheduler");
+  }

Review Comment:
Same as above: ensure taskScheduler.stop() runs via try/finally so scheduler threads are always cleaned up, even if the assertion fails.
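The two test comments above can be illustrated without Pinot or Quartz. In this minimal sketch a plain `ScheduledExecutorService` stands in for the legacy fixed-delay path, and the class and method names (`FixedDelayObservation`, `countRuns`) are hypothetical. With initialDelay=0, a fixed-delay task runs once right away even with a 3600 s interval, so a ">= 1 run" assertion cannot tell the cron and fixed-delay paths apart. An initialDelay longer than the observation window rules the fixed-delay path out. The executor is shut down in a `finally` block, which is the same try/finally shape suggested for `taskScheduler.stop()`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Pinot. A plain ScheduledExecutorService
// stands in for the legacy fixed-delay path of PeriodicTaskScheduler.
class FixedDelayObservation {

  /** Counts how many times a fixed-delay task runs during an observation window. */
  static int countRuns(long initialDelaySeconds, long intervalSeconds, long windowMillis) {
    AtomicInteger runs = new AtomicInteger();
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      executor.scheduleWithFixedDelay(runs::incrementAndGet, initialDelaySeconds, intervalSeconds,
          TimeUnit.SECONDS);
      Thread.sleep(windowMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      // Runs even if the code above throws, so no scheduler thread outlives the check
      // (the same shape as calling taskScheduler.stop() in a finally block).
      executor.shutdownNow();
    }
    return runs.get();
  }

  public static void main(String[] args) {
    // initialDelay = 0: the task fires immediately despite the 1-hour interval,
    // so "at least one run" cannot distinguish the cron path from fixed-delay.
    System.out.println("initialDelay=0s:    " + countRuns(0L, 3600L, 1500L));
    // initialDelay = 3600 s: fixed-delay cannot fire inside a 1.5 s window,
    // so any run observed there would have to come from another trigger.
    System.out.println("initialDelay=3600s: " + countRuns(3600L, 3600L, 1500L));
  }
}
```

In the Pinot test itself, the equivalent change would be passing a large initialDelay to the BasePeriodicTask constructor and moving `taskScheduler.stop()` into a `finally` block.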

##########
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java:
##########
@@ -69,28 +79,59 @@ public synchronized void start() {
     Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
     LOGGER.info("Starting periodic task scheduler with tasks: {}", periodicTasks);
     _executorService = Executors.newScheduledThreadPool(_periodicTasks.size());

Review Comment:
_executorService is sized to the total number of tasks, even when some or all tasks are scheduled via Quartz, which can create unused threads. Consider sizing the thread pool based on the number of fixed-delay tasks (while keeping enough capacity for scheduleNow()).

```suggestion
    int numFixedDelayTasks = (int) periodicTasks.stream().map(PeriodicTask::getCronExpression)
        .filter(cronExpression -> cronExpression == null || cronExpression.trim().isEmpty()).count();
    // Keep at least one thread so scheduleNow() can still execute tasks even if all configured tasks use Quartz.
    _executorService = Executors.newScheduledThreadPool(Math.max(1, numFixedDelayTasks));
```

##########
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java:
##########
@@ -974,10 +1070,18 @@ public int getStaleInstancesCleanupTaskFrequencyInSeconds() {
     return (int) convertPeriodToSeconds(period);
   }

+  public String getStaleInstancesCleanupTaskCronExpression() {
+    return getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_CRON_EXPRESSION);
+  }
+
   public void setStaleInstanceCleanupTaskFrequencyInSeconds(String frequencyPeriod) {
     setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD, frequencyPeriod);
   }

+  public void setStaleInstancesCleanupTaskCronExpression(String cronExpression) {
+    setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_CRON_EXPRESSION, cronExpression);
+  }
+

Review Comment:
ControllerConf adds a cronExpression config for StaleInstancesCleanupTask, but it appears to be unused: StaleInstancesCleanupTask currently extends BasePeriodicTask and calls the 3-arg super(...) constructor (no cronExpression), and there are no references to getStaleInstancesCleanupTaskCronExpression() outside this class. Either wire it into the task scheduling (pass cronExpression into BasePeriodicTask, or move the task to ControllerPeriodicTask) or remove the config to avoid a non-functional setting.

```suggestion
  public void setStaleInstanceCleanupTaskFrequencyInSeconds(String frequencyPeriod) {
    setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD, frequencyPeriod);
  }
```

##########
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java:
##########
@@ -69,28 +79,59 @@ public synchronized void start() {
     Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
     LOGGER.info("Starting periodic task scheduler with tasks: {}", periodicTasks);
     _executorService = Executors.newScheduledThreadPool(_periodicTasks.size());
+    boolean hasCronTasks = periodicTasks.stream().map(PeriodicTask::getCronExpression)
+        .anyMatch(cronExpression -> cronExpression != null && !cronExpression.trim().isEmpty());
+    if (hasCronTasks) {
+      try {
+        _quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
+        _quartzScheduler.start();
+      } catch (SchedulerException e) {
+        LOGGER.error("Failed to initialize Quartz scheduler. Aborting periodic task scheduling.", e);
+        throw new RuntimeException(e);
+      }
+    }
+
     for (PeriodicTask periodicTask : periodicTasks) {
       periodicTask.start();
+      String cronExpression = periodicTask.getCronExpression();
       String periodicTaskTaskName = periodicTask.getTaskName();
-      long intervalInSeconds = periodicTask.getIntervalInSeconds();
-      if (intervalInSeconds <= 0) {
-        LOGGER.info("Skip scheduling periodic task: {} for periodic execution (it can be manually triggered)",
-            periodicTaskTaskName);
-        continue;
-      }
-      _executorService.scheduleWithFixedDelay(() -> {
+      if (cronExpression != null && !cronExpression.trim().isEmpty()) {
         try {
-          LOGGER.info("Starting {} with running frequency of {} seconds.", periodicTaskTaskName, intervalInSeconds);
-          periodicTask.run();
-        } catch (Throwable e) {
-          // catch all errors to prevent subsequent executions from being silently suppressed
-          // <pre>
-          // See <a href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService
-          // .html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-">Ref</a>
-          // </pre>
-          LOGGER.warn("Caught exception while running Task: {}", periodicTaskTaskName, e);
+          LOGGER.info("Scheduling periodic task {} with cron expression: {}", periodicTask, cronExpression);
+
+          JobDetail jobDetail = JobBuilder.newJob(PeriodicTaskCronJob.class)
+              .withIdentity(periodicTaskTaskName)
+              .build();
+          jobDetail.getJobDataMap().put(PeriodicTaskCronJob.PERIODIC_TASK_KEY, periodicTask);
+          CronTrigger trigger = TriggerBuilder.newTrigger()
+              .withIdentity(periodicTaskTaskName + "-CronTrigger")
+              .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
+              .build();
+          _quartzScheduler.scheduleJob(jobDetail, trigger);
+        } catch (SchedulerException | RuntimeException e) {
+          LOGGER.error("Failed to schedule Quartz job for task: {}", periodicTaskTaskName, e);
+        }

Review Comment:
If a cron expression is provided but scheduling fails (e.g., an invalid cron expression or a SchedulerException), the code only logs the error, and the task is never scheduled. To match the intended "cron if provided, else fixed-delay" behavior, consider falling back to fixed-delay scheduling when Quartz scheduling fails, or validate the cron expression upfront and then either disable the task explicitly or fall back.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java:
##########
@@ -803,11 +883,19 @@ public int getTaskMetricsEmitterFrequencyInSeconds() {
     return (int) convertPeriodToSeconds(period);
   }

+  public String getTaskMetricsEmitterCronExpression() {
+    return getProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_CRON_EXPRESSION);
+  }
+
   public void setTaskMetricsEmitterFrequencyInSeconds(int taskMetricsEmitterFrequencyInSeconds) {
     setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_PERIOD,
         Long.toString(taskMetricsEmitterFrequencyInSeconds) + "s");
   }

+  public void setTaskMetricsEmitterCronExpression(String taskMetricsEmitterCronExpression) {
+    setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_CRON_EXPRESSION, taskMetricsEmitterCronExpression);
+  }
+

Review Comment:
ControllerConf adds a cronExpression config for TaskMetricsEmitter, but it appears to be unused: TaskMetricsEmitter extends BasePeriodicTask and calls the 3-arg super(...) constructor (no cronExpression), and there are no references to getTaskMetricsEmitterCronExpression() outside this class. Either wire it into the actual scheduling or remove the config to avoid a misleading, non-functional setting.

```suggestion
  public void setTaskMetricsEmitterFrequencyInSeconds(int taskMetricsEmitterFrequencyInSeconds) {
    setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_PERIOD,
        Long.toString(taskMetricsEmitterFrequencyInSeconds) + "s");
  }
```
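The fallback suggested in the comments on PeriodicTaskScheduler.start() above, together with the thread-pool sizing suggestion, can be sketched as a pure planning step that runs before anything is scheduled. This is a hypothetical sketch, not the PR's code. `SchedulingPlanner`, `TaskSpec` and `Mode` are illustrative names. The cron validator is passed in as a `Predicate<String>` so the example runs without Quartz; in the scheduler itself one could pass Quartz's `CronExpression::isValidExpression`. Under these assumptions, a task with a valid cron expression goes to Quartz. A blank or invalid expression falls back to fixed-delay when the interval is positive. Otherwise the task is left for manual triggering. If Quartz fails to initialize, passing a validator that rejects everything routes every task to the fixed-delay branch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical planning step for PeriodicTaskScheduler.start(); none of these names exist in Pinot.
class SchedulingPlanner {

  enum Mode { CRON, FIXED_DELAY, MANUAL_ONLY }

  // Minimal stand-in for the PeriodicTask fields that matter for scheduling.
  static final class TaskSpec {
    final String name;
    final long intervalInSeconds;
    final String cronExpression;

    TaskSpec(String name, long intervalInSeconds, String cronExpression) {
      this.name = name;
      this.intervalInSeconds = intervalInSeconds;
      this.cronExpression = cronExpression;
    }
  }

  /** Decides per task: Quartz (valid cron), fixed-delay fallback (positive interval), or manual only. */
  static Map<String, Mode> plan(List<TaskSpec> tasks, Predicate<String> isValidCron) {
    Map<String, Mode> modes = new LinkedHashMap<>();
    for (TaskSpec task : tasks) {
      String cron = task.cronExpression;
      boolean hasCron = cron != null && !cron.trim().isEmpty();
      if (hasCron && isValidCron.test(cron)) {
        modes.put(task.name, Mode.CRON);
      } else if (task.intervalInSeconds > 0) {
        // No cron, or a cron that failed validation: fall back to the fixed-delay schedule.
        modes.put(task.name, Mode.FIXED_DELAY);
      } else {
        // Like the legacy "intervalInSeconds <= 0" skip: the task can only be triggered manually.
        modes.put(task.name, Mode.MANUAL_ONLY);
      }
    }
    return modes;
  }

  /** Size for the fixed-delay executor: one thread per fixed-delay task, but never fewer than one. */
  static int executorPoolSize(Map<String, Mode> modes) {
    long numFixedDelay = modes.values().stream().filter(mode -> mode == Mode.FIXED_DELAY).count();
    return (int) Math.max(1L, numFixedDelay);
  }

  public static void main(String[] args) {
    // Toy validator for this demo only: Quartz cron expressions have six or seven fields.
    Predicate<String> toyValidator = expr -> {
      int fields = expr.trim().split("\\s+").length;
      return fields == 6 || fields == 7;
    };
    List<TaskSpec> tasks = List.of(
        new TaskSpec("CronTask", 3600L, "0/1 * * * * ?"),
        new TaskSpec("BadCronTask", 60L, "not a cron"),
        new TaskSpec("LegacyTask", 60L, null),
        new TaskSpec("ManualTask", 0L, ""));
    Map<String, Mode> modes = plan(tasks, toyValidator);
    System.out.println(modes);
    System.out.println("fixed-delay pool size: " + executorPoolSize(modes));
  }
}
```

Up-front validation does not cover a `scheduleJob` call that fails for other reasons; a catch around that call would still need to reroute the task to the fixed-delay branch.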
