This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.16.x by this push:
     new fc7a68a  Use B+ Tree iterator instead of DFS to find scheduled jobs to be executed
fc7a68a is described below

commit fc7a68a8bcf24492d3b2fe5426d366103b3e0e8f
Author: Lucas Tétreault <[email protected]>
AuthorDate: Wed Nov 24 01:06:47 2021 -0800

    Use B+ Tree iterator instead of DFS to find scheduled jobs to be executed
    
    (cherry picked from commit 60859b0b7fa93b001ca2b0ac2f2385386cb2f47d)
---
 .../store/kahadb/scheduler/JobSchedulerImpl.java   | 163 +++++++++++----------
 1 file changed, 83 insertions(+), 80 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 5889c6a..3e535d5 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -155,19 +155,6 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         return result;
     }
 
-    private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws 
IOException {
-        this.store.readLockIndex();
-        try {
-            if (!this.store.isStopped() && !this.store.isStopping()) {
-                Map.Entry<Long, List<JobLocation>> first = 
this.index.getFirst(this.store.getPageFile().tx());
-                return first;
-            }
-        } finally {
-            this.store.readUnlockIndex();
-        }
-        return null;
-    }
-
     @Override
     public List<Job> getAllJobs() throws IOException {
         final List<Job> result = new ArrayList<>();
@@ -265,6 +252,12 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         this.store.store(newJob);
     }
 
+    private void doReschedule(List<Closure> toReschedule) throws IOException {
+        for (Closure closure : toReschedule) {
+            closure.run();
+        }
+    }
+
     private void doReschedule(final String jobId, long executionTime, long 
nextExecutionTime, int rescheduledCount) throws IOException {
         KahaRescheduleJobCommand update = new KahaRescheduleJobCommand();
         update.setScheduler(name);
@@ -275,9 +268,9 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         this.store.store(update);
     }
 
-    private void doRemove(final long executionTime, final List<JobLocation> 
jobs) throws IOException {
-        for (JobLocation job : jobs) {
-            doRemove(executionTime, job.getJobId());
+    private void doRemove(final List<Closure> toRemove) throws IOException {
+        for (Closure closure : toRemove) {
+            closure.run();
         }
     }
 
@@ -732,77 +725,83 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
 
                 // Read the list of scheduled events and fire the jobs, 
reschedule repeating jobs as
                 // needed before firing the job event.
-                Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
-                if (first != null) {
-                    List<JobLocation> list = new ArrayList<>(first.getValue());
-                    List<JobLocation> toRemove = new ArrayList<>(list.size());
-                    final long executionTime = first.getKey();
-                    long nextExecutionTime = 0;
-                    if (executionTime <= currentTime) {
-                        for (final JobLocation job : list) {
-
-                            if (!running.get()) {
-                                break;
-                            }
+                List<Closure> toRemove = new ArrayList<>();
+                List<Closure> toReschedule = new ArrayList<>();
+                try {
+                    this.store.readLockIndex();
 
-                            int repeat = job.getRepeat();
-                            nextExecutionTime = 
calculateNextExecutionTime(job, currentTime, repeat);
-                            long waitTime = nextExecutionTime - currentTime;
-                            this.scheduleTime.setWaitTime(waitTime);
-                            if (!job.isCron()) {
-                                fireJob(job);
-                                if (repeat != 0) {
-                                    // Reschedule for the next time, the 
scheduler will take care of
-                                    // updating the repeat counter on the 
update.
-                                    doReschedule(job.getJobId(), 
executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
-                                } else {
-                                    toRemove.add(job);
-                                }
-                            } else {
-                                if (repeat == 0) {
-                                    // This is a non-repeating Cron entry so 
we can fire and forget it.
-                                    fireJob(job);
-                                }
+                    Iterator<Map.Entry<Long, List<JobLocation>>> iterator = 
this.index.iterator(this.store.getPageFile().tx());
+                    while (iterator.hasNext()) {
+                        Map.Entry<Long, List<JobLocation>> next = 
iterator.next();
+                        if (next != null) {
+                            List<JobLocation> list = new 
ArrayList<>(next.getValue());
+                            final long executionTime = next.getKey();
+                            long nextExecutionTime = 0;
+
+                            if (executionTime <= currentTime) {
+                                for (final JobLocation job : list) {
 
-                                if (nextExecutionTime > currentTime) {
-                                    // Reschedule the cron job as a new event, 
if the cron entry signals
-                                    // a repeat then it will be stored 
separately and fired as a normal
-                                    // event with decrementing repeat.
-                                    doReschedule(job.getJobId(), 
executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
-
-                                    if (repeat != 0) {
-                                        // we have a separate schedule to run 
at this time
-                                        // so the cron job is used to set of a 
separate schedule
-                                        // hence we won't fire the original 
cron job to the
-                                        // listeners but we do need to start a 
separate schedule
-                                        String jobId = 
ID_GENERATOR.generateId();
-                                        ByteSequence payload = 
getPayload(job.getLocation());
-                                        schedule(jobId, payload, "", 
job.getDelay(), job.getPeriod(), job.getRepeat());
-                                        waitTime = job.getDelay() != 0 ? 
job.getDelay() : job.getPeriod();
-                                        
this.scheduleTime.setWaitTime(waitTime);
+                                    if (!running.get()) {
+                                        break;
                                     }
-                                } else {
-                                    toRemove.add(job);
-                                }
-                            }
-                        }
 
-                        // now remove all jobs that have not been rescheduled 
from this execution
-                        // time, if there are no more entries in that time it 
will be removed.
-                        doRemove(executionTime, toRemove);
-
-                        // If there is a job that should fire before the 
currently set wait time
-                        // we need to reset wait time otherwise we'll miss it.
-                        Map.Entry<Long, List<JobLocation>> nextUp = 
getNextToSchedule();
-                        if (nextUp != null) {
-                            final long timeUntilNextScheduled = 
nextUp.getKey() - currentTime;
-                            if (timeUntilNextScheduled < 
this.scheduleTime.getWaitTime()) {
-                                
this.scheduleTime.setWaitTime(timeUntilNextScheduled);
+                                    int repeat = job.getRepeat();
+                                    nextExecutionTime = 
calculateNextExecutionTime(job, currentTime, repeat);
+                                    long waitTime = nextExecutionTime - 
currentTime;
+                                    this.scheduleTime.setWaitTime(waitTime);
+                                    if (!job.isCron()) {
+                                        fireJob(job);
+                                        if (repeat != 0) {
+                                            // Reschedule for the next time, 
the scheduler will take care of
+                                            // updating the repeat counter on 
the update.
+                                            final long finalNextExecutionTime 
= nextExecutionTime;
+                                            toReschedule.add(() -> 
doReschedule(job.getJobId(), executionTime, finalNextExecutionTime, 
job.getRescheduledCount() + 1));
+                                        } else {
+                                            toRemove.add(() -> 
doRemove(executionTime, job.getJobId()));
+                                        }
+                                    } else {
+                                        if (repeat == 0) {
+                                            // This is a non-repeating Cron 
entry so we can fire and forget it.
+                                            fireJob(job);
+                                        }
+
+                                        if (nextExecutionTime > currentTime) {
+                                            // Reschedule the cron job as a 
new event, if the cron entry signals
+                                            // a repeat then it will be stored 
separately and fired as a normal
+                                            // event with decrementing repeat.
+                                            final long finalNextExecutionTime 
= nextExecutionTime;
+                                            toReschedule.add(() -> 
doReschedule(job.getJobId(), executionTime, finalNextExecutionTime, 
job.getRescheduledCount() + 1));
+
+                                            if (repeat != 0) {
+                                                // we have a separate schedule 
to run at this time
+                                                // so the cron job is used to 
set of a separate schedule
+                                                // hence we won't fire the 
original cron job to the
+                                                // listeners but we do need to 
start a separate schedule
+                                                String jobId = 
ID_GENERATOR.generateId();
+                                                ByteSequence payload = 
getPayload(job.getLocation());
+                                                schedule(jobId, payload, "", 
job.getDelay(), job.getPeriod(), job.getRepeat());
+                                                waitTime = job.getDelay() != 0 
? job.getDelay() : job.getPeriod();
+                                                
this.scheduleTime.setWaitTime(waitTime);
+                                            }
+                                        } else {
+                                            toRemove.add(() -> 
doRemove(executionTime, job.getJobId()));
+                                        }
+                                    }
+                                }
+                            } else {
+                                this.scheduleTime.setWaitTime(executionTime - 
currentTime);
+                                break;
                             }
                         }
-                    } else {
-                        this.scheduleTime.setWaitTime(executionTime - 
currentTime);
                     }
+                } finally {
+                    this.store.readUnlockIndex();
+
+                    doReschedule(toReschedule);
+
+                    // now remove all jobs that have not been rescheduled,
+                    // if there are no more entries in that time it will be 
removed.
+                    doRemove(toRemove);
                 }
 
                 this.scheduleTime.pause();
@@ -898,6 +897,10 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         out.writeLong(this.index.getPageId());
     }
 
+    private interface Closure {
+        void run() throws IOException;
+    }
+
     static class ScheduleTime {
         private final int DEFAULT_WAIT = 500;
         private final int DEFAULT_NEW_JOB_WAIT = 100;

Reply via email to