Copilot commented on code in PR #18256:
URL: https://github.com/apache/pinot/pull/18256#discussion_r3108145826


##########
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskCronJob.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.periodictask;
+
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PeriodicTaskCronJob implements Job {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskCronJob.class);
+  public static final String PERIODIC_TASK_KEY = "PeriodicTask";
+
+  public PeriodicTaskCronJob() {
+  }
+
+  @Override
+  public void execute(JobExecutionContext jobExecutionContext)
+      throws JobExecutionException {
+    PeriodicTask periodicTask = (PeriodicTask) jobExecutionContext
+        .getJobDetail()
+        .getJobDataMap()
+        .get(PERIODIC_TASK_KEY);
+
+    if (periodicTask != null) {
+      try {
+        periodicTask.run();
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while running Task: {}", periodicTask.getTaskName(), e);
+      }

Review Comment:
   Quartz can execute the same Job concurrently on multiple threads if triggers 
overlap. Even though BasePeriodicTask serializes via a lock, that still ties up 
Quartz worker threads. Consider annotating this Job with 
`@DisallowConcurrentExecution` (and optionally catching Throwable instead of 
Exception) to prevent concurrent executions from piling up.



##########
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java:
##########
@@ -69,28 +79,59 @@ public synchronized void start() {
       Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
       LOGGER.info("Starting periodic task scheduler with tasks: {}", periodicTasks);
       _executorService = Executors.newScheduledThreadPool(_periodicTasks.size());
+      boolean hasCronTasks = periodicTasks.stream().map(PeriodicTask::getCronExpression)
+          .anyMatch(cronExpression -> cronExpression != null && !cronExpression.trim().isEmpty());
+      if (hasCronTasks) {
+        try {
+          _quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
+          _quartzScheduler.start();
+        } catch (SchedulerException e) {
+          LOGGER.error("Failed to initialize Quartz scheduler. Aborting periodic task scheduling.", e);
+          throw new RuntimeException(e);
+        }

Review Comment:
   If Quartz initialization fails, start() throws RuntimeException after 
creating the ScheduledExecutorService, which will (a) abort scheduling of *all* 
periodic tasks (including fixed-delay ones) and (b) potentially leak the 
already-created executor threads. Consider logging the failure and falling back 
to fixed-delay scheduling (or at least shutting down _executorService before 
rethrowing).
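
The "shut down before rethrowing" option could look roughly like the sketch below. It uses only the JDK: `CronInitializer` is a hypothetical stand-in for the Quartz initialization step, and `StartupCleanupSketch` is not a class from the PR. It only illustrates releasing the executor when a later start-up step fails.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

final class StartupCleanupSketch {
  /** Hypothetical stand-in for the Quartz initialization step, which may fail. */
  interface CronInitializer {
    void init() throws Exception;
  }

  private ScheduledExecutorService _executorService;

  ScheduledExecutorService executor() {
    return _executorService;
  }

  synchronized void start(int numTasks, CronInitializer cronInitializer) {
    _executorService = Executors.newScheduledThreadPool(Math.max(1, numTasks));
    try {
      cronInitializer.init();
    } catch (Exception e) {
      // Release the pool before propagating so no worker threads outlive the failed start().
      _executorService.shutdownNow();
      throw new RuntimeException("Failed to initialize cron scheduler", e);
    }
  }

  public static void main(String[] args) {
    StartupCleanupSketch sketch = new StartupCleanupSketch();
    try {
      sketch.start(2, () -> {
        throw new IllegalStateException("simulated failure");
      });
    } catch (RuntimeException e) {
      System.out.println("executor shut down: " + sketch.executor().isShutdown());
    }
  }
}
```

Falling back to fixed-delay scheduling instead of rethrowing would keep the executor alive; the sketch covers only the rethrow variant.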



##########
pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java:
##########
@@ -205,4 +205,47 @@ protected void runTask(Properties periodicTaskProperties) {
     // Confirm that all threads requested execution, even though only half the threads completed execution.
     assertEquals(attempts.get(), numThreads);
   }
+
+  @Test
+  public void testCronScheduling() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+    //let the frequency be 3600 seconds (1 hour) to prove that the cron job triggered the task.
+    List<PeriodicTask> periodicTasks = List.of(new BasePeriodicTask("CronTask", 3600L, 0L, "0/1 * * * * ?") {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });

Review Comment:
   testCronScheduling() doesn't actually prove the cron path: with 
initialDelay=0, the legacy fixed-delay scheduler would also run the task once 
immediately, even with a 3600s interval, so the `>= 1` assertion would still 
pass without Quartz. Consider setting an initialDelay that exceeds the sleep 
duration (or otherwise asserting that fixed-delay was not used) to make the 
test meaningful.
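
The fixed-delay behavior described here can be checked in isolation with a plain `ScheduledExecutorService`. This is a minimal sketch: `InitialDelaySketch` and `runsWithinWindow` are illustrative names, not part of the PR, and the 500 ms window is an arbitrary choice.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class InitialDelaySketch {
  /** Schedules a counter with fixed delay and returns how often it ran within the window. */
  static int runsWithinWindow(long initialDelaySec, long intervalSec, long windowMillis)
      throws InterruptedException {
    AtomicInteger runs = new AtomicInteger();
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    try {
      executor.scheduleWithFixedDelay(runs::incrementAndGet, initialDelaySec, intervalSec, TimeUnit.SECONDS);
      Thread.sleep(windowMillis);
    } finally {
      executor.shutdownNow();
    }
    return runs.get();
  }

  public static void main(String[] args) throws InterruptedException {
    // initialDelay = 0: the task fires right away despite the one-hour interval.
    System.out.println("initialDelay=0s:    " + runsWithinWindow(0L, 3600L, 500L));
    // initialDelay = 3600: nothing fires inside the window, so only a cron trigger could.
    System.out.println("initialDelay=3600s: " + runsWithinWindow(3600L, 3600L, 500L));
  }
}
```

With a large initial delay, any run observed during the test's sleep can only come from the cron trigger, which is what the test is meant to demonstrate.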



##########
pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java:
##########
@@ -205,4 +205,47 @@ protected void runTask(Properties periodicTaskProperties) {
     // Confirm that all threads requested execution, even though only half the threads completed execution.
     assertEquals(attempts.get(), numThreads);
   }
+
+  @Test
+  public void testCronScheduling() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+    //let the frequency be 3600 seconds (1 hour) to prove that the cron job triggered the task.
+    List<PeriodicTask> periodicTasks = List.of(new BasePeriodicTask("CronTask", 3600L, 0L, "0/1 * * * * ?") {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
+
+    Thread.sleep(1500L);
+    taskScheduler.stop();
+
+    assertTrue(numTimesRunCalled.get() >= 1, "Task should have been triggered by Quartz CRON scheduler");
+  }

Review Comment:
   Both tests call taskScheduler.stop() only on the happy path. If an assertion 
fails or an exception is thrown before stop(), Quartz/executor threads may leak 
and make the test suite flaky/hang. Wrap start/sleep/assert in try/finally and 
always call stop() in the finally block.
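
The try/finally shape could be sketched as below. `FakeScheduler` is a hypothetical stand-in for `PeriodicTaskScheduler` so the example runs on the JDK alone. It assumes `stop()` is safe to call even if `start()` did not complete, which would need to be confirmed for the real class.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class StopInFinallySketch {
  /** Hypothetical minimal scheduler; stands in for PeriodicTaskScheduler in this sketch. */
  static final class FakeScheduler {
    private final AtomicBoolean _running = new AtomicBoolean();

    void start() {
      _running.set(true);
    }

    void stop() {
      _running.set(false);
    }

    boolean isRunning() {
      return _running.get();
    }
  }

  /** Starts the scheduler, runs the test body, and always stops the scheduler afterwards. */
  static void runWithScheduler(FakeScheduler scheduler, Runnable body) {
    try {
      scheduler.start();
      body.run();
    } finally {
      // Runs even when the body throws, so no scheduler threads are left behind.
      scheduler.stop();
    }
  }

  public static void main(String[] args) {
    FakeScheduler scheduler = new FakeScheduler();
    try {
      runWithScheduler(scheduler, () -> {
        throw new AssertionError("simulated assertion failure");
      });
    } catch (AssertionError e) {
      System.out.println("still running after failure: " + scheduler.isRunning());
    }
  }
}
```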



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2181,6 +2181,8 @@ public static class CursorConfigs {
 
     public static final String RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD =
         "controller.cluster.response.store.cleaner.frequencyPeriod";
+    public static final String RESPONSE_STORE_CLEANER_CRON_EXPRESSION =
+        "controller.cluster.response.store.cleaner.expression";

Review Comment:
   The new cursor ResponseStoreCleaner cron config key ends with ".expression" 
while other controller cron configs use the ".cronExpression" suffix. To keep 
config naming consistent (and avoid users guessing the wrong key), consider 
renaming this to something like "...cleaner.cronExpression".
   ```suggestion
           "controller.cluster.response.store.cleaner.cronExpression";
   ```



##########
pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java:
##########
@@ -205,4 +205,47 @@ protected void runTask(Properties periodicTaskProperties) {
     // Confirm that all threads requested execution, even though only half the threads completed execution.
     assertEquals(attempts.get(), numThreads);
   }
+
+  @Test
+  public void testCronScheduling() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+    //let the frequency be 3600 seconds (1 hour) to prove that the cron job triggered the task.
+    List<PeriodicTask> periodicTasks = List.of(new BasePeriodicTask("CronTask", 3600L, 0L, "0/1 * * * * ?") {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
+
+    Thread.sleep(1500L);
+    taskScheduler.stop();
+
+    assertTrue(numTimesRunCalled.get() >= 1, "Task should have been triggered by Quartz CRON scheduler");
+  }
+
+  @Test
+  public void testLegacyFallbackScheduling() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+    //fallback to the default fixed delay method
+    List<PeriodicTask> periodicTasks = List.of(new BasePeriodicTask("LegacyFallbackTask", 1L, 0L, null) {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
+
+    Thread.sleep(1500L);
+    taskScheduler.stop();
+
+    assertTrue(numTimesRunCalled.get() >= 1, "Task should have been triggered by legacy fixed-delay scheduler");
+  }

Review Comment:
   Same as above: ensure taskScheduler.stop() runs via try/finally so scheduler 
threads are always cleaned up even if the assertion fails.



##########
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java:
##########
@@ -69,28 +79,59 @@ public synchronized void start() {
       Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
       LOGGER.info("Starting periodic task scheduler with tasks: {}", periodicTasks);
       _executorService = Executors.newScheduledThreadPool(_periodicTasks.size());

Review Comment:
   _executorService is sized to the total number of tasks even when some/all 
tasks are scheduled via Quartz. This can create unused threads. Consider sizing 
the thread pool based on the number of fixed-delay tasks (still keeping enough 
capacity for scheduleNow()).
   ```suggestion
         int numFixedDelayTasks = (int) periodicTasks.stream().map(PeriodicTask::getCronExpression)
             .filter(cronExpression -> cronExpression == null || cronExpression.trim().isEmpty()).count();
         // Keep at least one thread so scheduleNow() can still execute tasks even if all configured tasks use Quartz.
         _executorService = Executors.newScheduledThreadPool(Math.max(1, numFixedDelayTasks));
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java:
##########
@@ -974,10 +1070,18 @@ public int getStaleInstancesCleanupTaskFrequencyInSeconds() {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getStaleInstancesCleanupTaskCronExpression() {
+    return getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_CRON_EXPRESSION);
+  }
+
   public void setStaleInstanceCleanupTaskFrequencyInSeconds(String frequencyPeriod) {
     setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD, frequencyPeriod);
   }
 
+  public void setStaleInstancesCleanupTaskCronExpression(String cronExpression) {
+    setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_CRON_EXPRESSION, cronExpression);
+  }
+

Review Comment:
   ControllerConf adds a cronExpression config for StaleInstancesCleanupTask, 
but it appears unused: StaleInstancesCleanupTask currently extends 
BasePeriodicTask and calls the 3-arg super(...) ctor (no cronExpression), and 
there are no references to getStaleInstancesCleanupTaskCronExpression() outside 
this class. Either wire this into the task scheduling (pass cronExpression into 
BasePeriodicTask / move to ControllerPeriodicTask) or remove the config to 
avoid a non-functional setting.
   ```suggestion
     public void setStaleInstanceCleanupTaskFrequencyInSeconds(String frequencyPeriod) {
       setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD, frequencyPeriod);
     }
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java:
##########
@@ -69,28 +79,59 @@ public synchronized void start() {
       Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
       LOGGER.info("Starting periodic task scheduler with tasks: {}", periodicTasks);
       _executorService = Executors.newScheduledThreadPool(_periodicTasks.size());
+      boolean hasCronTasks = periodicTasks.stream().map(PeriodicTask::getCronExpression)
+          .anyMatch(cronExpression -> cronExpression != null && !cronExpression.trim().isEmpty());
+      if (hasCronTasks) {
+        try {
+          _quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
+          _quartzScheduler.start();
+        } catch (SchedulerException e) {
+          LOGGER.error("Failed to initialize Quartz scheduler. Aborting periodic task scheduling.", e);
+          throw new RuntimeException(e);
+        }
+      }
+
       for (PeriodicTask periodicTask : periodicTasks) {
         periodicTask.start();
+        String cronExpression = periodicTask.getCronExpression();
         String periodicTaskTaskName = periodicTask.getTaskName();
-        long intervalInSeconds = periodicTask.getIntervalInSeconds();
-        if (intervalInSeconds <= 0) {
-          LOGGER.info("Skip scheduling periodic task: {} for periodic execution (it can be manually triggered)",
-              periodicTaskTaskName);
-          continue;
-        }
-        _executorService.scheduleWithFixedDelay(() -> {
+        if (cronExpression != null && !cronExpression.trim().isEmpty()) {
           try {
-            LOGGER.info("Starting {} with running frequency of {} seconds.", periodicTaskTaskName, intervalInSeconds);
-            periodicTask.run();
-          } catch (Throwable e) {
-            // catch all errors to prevent subsequent executions from being silently suppressed
-            // <pre>
-            // See <a href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService
-            // .html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-">Ref</a>
-            // </pre>
-            LOGGER.warn("Caught exception while running Task: {}", periodicTaskTaskName, e);
+            LOGGER.info("Scheduling periodic task {} with cron expression: {}", periodicTask, cronExpression);
+
+            JobDetail jobDetail = JobBuilder.newJob(PeriodicTaskCronJob.class)
+                .withIdentity(periodicTaskTaskName)
+                .build();
+            jobDetail.getJobDataMap().put(PeriodicTaskCronJob.PERIODIC_TASK_KEY, periodicTask);
+            CronTrigger trigger = TriggerBuilder.newTrigger()
+                .withIdentity(periodicTaskTaskName + "-CronTrigger")
+                .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
+                .build();
+            _quartzScheduler.scheduleJob(jobDetail, trigger);
+          } catch (SchedulerException | RuntimeException e) {
+            LOGGER.error("Failed to schedule Quartz job for task: {}", periodicTaskTaskName, e);
+          }

Review Comment:
   If a cron expression is provided but scheduling fails (e.g., invalid cron or 
SchedulerException), the code only logs the error and the task is never 
scheduled. To match the intended "cron if provided, else fixed-delay" behavior, 
consider falling back to fixed-delay scheduling when Quartz scheduling fails, 
or validate the cron expression upfront and explicitly either disable the task 
or fall back to fixed-delay.
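
One way to express the "cron if provided, else fixed-delay" fallback is sketched below using only the JDK. `CronScheduler` is a hypothetical abstraction over the Quartz scheduling call, and `Mode` exists only to make the outcome observable; neither is from the PR. The `intervalSec <= 0` branch mirrors the manual-trigger-only behavior in the removed lines of the diff.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class CronFallbackSketch {
  /** Hypothetical abstraction over the Quartz scheduling call; throws if the job cannot be scheduled. */
  interface CronScheduler {
    void schedule(String cronExpression, Runnable task) throws Exception;
  }

  /** How a task ended up being scheduled. */
  enum Mode { CRON, FIXED_DELAY, MANUAL_ONLY }

  static Mode schedule(String cronExpression, long initialDelaySec, long intervalSec, Runnable task,
      CronScheduler cronScheduler, ScheduledExecutorService executor) {
    if (cronExpression != null && !cronExpression.trim().isEmpty()) {
      try {
        cronScheduler.schedule(cronExpression, task);
        return Mode.CRON;
      } catch (Exception e) {
        // Log and fall through instead of leaving the task unscheduled.
        System.err.println("Cron scheduling failed, falling back to fixed delay: " + e);
      }
    }
    if (intervalSec <= 0) {
      // No periodic execution; the task can still be triggered manually.
      return Mode.MANUAL_ONLY;
    }
    executor.scheduleWithFixedDelay(() -> {
      try {
        task.run();
      } catch (Throwable t) {
        // Catch everything so one failure does not suppress subsequent executions.
        System.err.println("Task failed: " + t);
      }
    }, initialDelaySec, intervalSec, TimeUnit.SECONDS);
    return Mode.FIXED_DELAY;
  }

  public static void main(String[] args) {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    try {
      CronScheduler rejecting = (cron, task) -> {
        throw new IllegalArgumentException("invalid cron: " + cron);
      };
      System.out.println(schedule("not a cron", 3600L, 3600L, () -> { }, rejecting, executor));
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Whether a silent fallback or an explicit failure is preferable depends on how operators are expected to notice a misconfigured cron expression; the sketch takes the fallback route.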



##########
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java:
##########
@@ -803,11 +883,19 @@ public int getTaskMetricsEmitterFrequencyInSeconds() {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getTaskMetricsEmitterCronExpression() {
+    return getProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_CRON_EXPRESSION);
+  }
+
   public void setTaskMetricsEmitterFrequencyInSeconds(int taskMetricsEmitterFrequencyInSeconds) {
     setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_PERIOD,
         Long.toString(taskMetricsEmitterFrequencyInSeconds) + "s");
   }
 
+  public void setTaskMetricsEmitterCronExpression(String taskMetricsEmitterCronExpression) {
+    setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_CRON_EXPRESSION, taskMetricsEmitterCronExpression);
+  }
+

Review Comment:
   ControllerConf adds a cronExpression config for TaskMetricsEmitter, but it 
appears unused: TaskMetricsEmitter extends BasePeriodicTask and calls the 3-arg 
super(...) ctor (no cronExpression), and there are no references to 
getTaskMetricsEmitterCronExpression() outside this class. Either wire this into 
the actual scheduling or remove the config to avoid a misleading/non-functional 
setting.
   ```suggestion
     public void setTaskMetricsEmitterFrequencyInSeconds(int taskMetricsEmitterFrequencyInSeconds) {
       setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_PERIOD,
           Long.toString(taskMetricsEmitterFrequencyInSeconds) + "s");
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

