pri1712 commented on code in PR #18256:
URL: https://github.com/apache/pinot/pull/18256#discussion_r3111535244


##########
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java:
##########
@@ -69,28 +79,59 @@ public synchronized void start() {
       Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
       LOGGER.info("Starting periodic task scheduler with tasks: {}", 
periodicTasks);
       _executorService = 
Executors.newScheduledThreadPool(_periodicTasks.size());
+      boolean hasCronTasks = 
periodicTasks.stream().map(PeriodicTask::getCronExpression)
+          .anyMatch(cronExpression -> cronExpression != null && 
!cronExpression.trim().isEmpty());
+      if (hasCronTasks) {
+        try {
+          _quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
+          _quartzScheduler.start();
+        } catch (SchedulerException e) {
+          LOGGER.error("Failed to initialize Quartz scheduler. Aborting 
periodic task scheduling.", e);
+          throw new RuntimeException(e);
+        }
+      }
+
       for (PeriodicTask periodicTask : periodicTasks) {
         periodicTask.start();
+        String cronExpression = periodicTask.getCronExpression();
         String periodicTaskTaskName = periodicTask.getTaskName();
-        long intervalInSeconds = periodicTask.getIntervalInSeconds();
-        if (intervalInSeconds <= 0) {
-          LOGGER.info("Skip scheduling periodic task: {} for periodic 
execution (it can be manually triggered)",
-              periodicTaskTaskName);
-          continue;
-        }
-        _executorService.scheduleWithFixedDelay(() -> {
+        if (cronExpression != null && !cronExpression.trim().isEmpty()) {
           try {
-            LOGGER.info("Starting {} with running frequency of {} seconds.", 
periodicTaskTaskName, intervalInSeconds);
-            periodicTask.run();
-          } catch (Throwable e) {
-            // catch all errors to prevent subsequent executions from being 
silently suppressed
-            // <pre>
-            // See <a 
href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService
-            // 
.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-">Ref</a>
-            // </pre>
-            LOGGER.warn("Caught exception while running Task: {}", 
periodicTaskTaskName, e);
+            LOGGER.info("Scheduling periodic task {} with cron expression: 
{}", periodicTask, cronExpression);
+
+            JobDetail jobDetail = JobBuilder.newJob(PeriodicTaskCronJob.class)
+                .withIdentity(periodicTaskTaskName)
+                .build();
+            
jobDetail.getJobDataMap().put(PeriodicTaskCronJob.PERIODIC_TASK_KEY, 
periodicTask);
+            CronTrigger trigger = TriggerBuilder.newTrigger()
+                .withIdentity(periodicTaskTaskName + "-CronTrigger")
+                .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
+                .build();
+            _quartzScheduler.scheduleJob(jobDetail, trigger);
+          } catch (SchedulerException | RuntimeException e) {
+            LOGGER.error("Failed to schedule Quartz job for task: {}", 
periodicTaskTaskName, e);
+          }

Review Comment:
   Ack — fixed this edge case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to