This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.16.x by this push:
new fc7a68a Use B+ Tree iterator instead of DFS to find scheduled jobs to be executed
fc7a68a is described below
commit fc7a68a8bcf24492d3b2fe5426d366103b3e0e8f
Author: Lucas Tétreault <[email protected]>
AuthorDate: Wed Nov 24 01:06:47 2021 -0800
Use B+ Tree iterator instead of DFS to find scheduled jobs to be executed
(cherry picked from commit 60859b0b7fa93b001ca2b0ac2f2385386cb2f47d)
---
.../store/kahadb/scheduler/JobSchedulerImpl.java | 163 +++++++++++----------
1 file changed, 83 insertions(+), 80 deletions(-)
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 5889c6a..3e535d5 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -155,19 +155,6 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
return result;
}
- private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws
IOException {
- this.store.readLockIndex();
- try {
- if (!this.store.isStopped() && !this.store.isStopping()) {
- Map.Entry<Long, List<JobLocation>> first =
this.index.getFirst(this.store.getPageFile().tx());
- return first;
- }
- } finally {
- this.store.readUnlockIndex();
- }
- return null;
- }
-
@Override
public List<Job> getAllJobs() throws IOException {
final List<Job> result = new ArrayList<>();
@@ -265,6 +252,12 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
this.store.store(newJob);
}
+ private void doReschedule(List<Closure> toReschedule) throws IOException {
+ for (Closure closure : toReschedule) {
+ closure.run();
+ }
+ }
+
private void doReschedule(final String jobId, long executionTime, long
nextExecutionTime, int rescheduledCount) throws IOException {
KahaRescheduleJobCommand update = new KahaRescheduleJobCommand();
update.setScheduler(name);
@@ -275,9 +268,9 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
this.store.store(update);
}
- private void doRemove(final long executionTime, final List<JobLocation>
jobs) throws IOException {
- for (JobLocation job : jobs) {
- doRemove(executionTime, job.getJobId());
+ private void doRemove(final List<Closure> toRemove) throws IOException {
+ for (Closure closure : toRemove) {
+ closure.run();
}
}
@@ -732,77 +725,83 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
// Read the list of scheduled events and fire the jobs,
reschedule repeating jobs as
// needed before firing the job event.
- Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
- if (first != null) {
- List<JobLocation> list = new ArrayList<>(first.getValue());
- List<JobLocation> toRemove = new ArrayList<>(list.size());
- final long executionTime = first.getKey();
- long nextExecutionTime = 0;
- if (executionTime <= currentTime) {
- for (final JobLocation job : list) {
-
- if (!running.get()) {
- break;
- }
+ List<Closure> toRemove = new ArrayList<>();
+ List<Closure> toReschedule = new ArrayList<>();
+ try {
+ this.store.readLockIndex();
- int repeat = job.getRepeat();
- nextExecutionTime =
calculateNextExecutionTime(job, currentTime, repeat);
- long waitTime = nextExecutionTime - currentTime;
- this.scheduleTime.setWaitTime(waitTime);
- if (!job.isCron()) {
- fireJob(job);
- if (repeat != 0) {
- // Reschedule for the next time, the
scheduler will take care of
- // updating the repeat counter on the
update.
- doReschedule(job.getJobId(),
executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
- } else {
- toRemove.add(job);
- }
- } else {
- if (repeat == 0) {
- // This is a non-repeating Cron entry so
we can fire and forget it.
- fireJob(job);
- }
+ Iterator<Map.Entry<Long, List<JobLocation>>> iterator =
this.index.iterator(this.store.getPageFile().tx());
+ while (iterator.hasNext()) {
+ Map.Entry<Long, List<JobLocation>> next =
iterator.next();
+ if (next != null) {
+ List<JobLocation> list = new
ArrayList<>(next.getValue());
+ final long executionTime = next.getKey();
+ long nextExecutionTime = 0;
+
+ if (executionTime <= currentTime) {
+ for (final JobLocation job : list) {
- if (nextExecutionTime > currentTime) {
- // Reschedule the cron job as a new event,
if the cron entry signals
- // a repeat then it will be stored
separately and fired as a normal
- // event with decrementing repeat.
- doReschedule(job.getJobId(),
executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
-
- if (repeat != 0) {
- // we have a separate schedule to run
at this time
- // so the cron job is used to set of a
separate schedule
- // hence we won't fire the original
cron job to the
- // listeners but we do need to start a
separate schedule
- String jobId =
ID_GENERATOR.generateId();
- ByteSequence payload =
getPayload(job.getLocation());
- schedule(jobId, payload, "",
job.getDelay(), job.getPeriod(), job.getRepeat());
- waitTime = job.getDelay() != 0 ?
job.getDelay() : job.getPeriod();
-
this.scheduleTime.setWaitTime(waitTime);
+ if (!running.get()) {
+ break;
}
- } else {
- toRemove.add(job);
- }
- }
- }
- // now remove all jobs that have not been rescheduled
from this execution
- // time, if there are no more entries in that time it
will be removed.
- doRemove(executionTime, toRemove);
-
- // If there is a job that should fire before the
currently set wait time
- // we need to reset wait time otherwise we'll miss it.
- Map.Entry<Long, List<JobLocation>> nextUp =
getNextToSchedule();
- if (nextUp != null) {
- final long timeUntilNextScheduled =
nextUp.getKey() - currentTime;
- if (timeUntilNextScheduled <
this.scheduleTime.getWaitTime()) {
-
this.scheduleTime.setWaitTime(timeUntilNextScheduled);
+ int repeat = job.getRepeat();
+ nextExecutionTime =
calculateNextExecutionTime(job, currentTime, repeat);
+ long waitTime = nextExecutionTime -
currentTime;
+ this.scheduleTime.setWaitTime(waitTime);
+ if (!job.isCron()) {
+ fireJob(job);
+ if (repeat != 0) {
+ // Reschedule for the next time,
the scheduler will take care of
+ // updating the repeat counter on
the update.
+ final long finalNextExecutionTime
= nextExecutionTime;
+ toReschedule.add(() ->
doReschedule(job.getJobId(), executionTime, finalNextExecutionTime,
job.getRescheduledCount() + 1));
+ } else {
+ toRemove.add(() ->
doRemove(executionTime, job.getJobId()));
+ }
+ } else {
+ if (repeat == 0) {
+ // This is a non-repeating Cron
entry so we can fire and forget it.
+ fireJob(job);
+ }
+
+ if (nextExecutionTime > currentTime) {
+ // Reschedule the cron job as a
new event, if the cron entry signals
+ // a repeat then it will be stored
separately and fired as a normal
+ // event with decrementing repeat.
+ final long finalNextExecutionTime
= nextExecutionTime;
+ toReschedule.add(() ->
doReschedule(job.getJobId(), executionTime, finalNextExecutionTime,
job.getRescheduledCount() + 1));
+
+ if (repeat != 0) {
+ // we have a separate schedule
to run at this time
+ // so the cron job is used to
set of a separate schedule
+ // hence we won't fire the
original cron job to the
+ // listeners but we do need to
start a separate schedule
+ String jobId =
ID_GENERATOR.generateId();
+ ByteSequence payload =
getPayload(job.getLocation());
+ schedule(jobId, payload, "",
job.getDelay(), job.getPeriod(), job.getRepeat());
+ waitTime = job.getDelay() != 0
? job.getDelay() : job.getPeriod();
+
this.scheduleTime.setWaitTime(waitTime);
+ }
+ } else {
+ toRemove.add(() ->
doRemove(executionTime, job.getJobId()));
+ }
+ }
+ }
+ } else {
+ this.scheduleTime.setWaitTime(executionTime -
currentTime);
+ break;
}
}
- } else {
- this.scheduleTime.setWaitTime(executionTime -
currentTime);
}
+ } finally {
+ this.store.readUnlockIndex();
+
+ doReschedule(toReschedule);
+
+ // now remove all jobs that have not been rescheduled,
+ // if there are no more entries in that time it will be
removed.
+ doRemove(toRemove);
}
this.scheduleTime.pause();
@@ -898,6 +897,10 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
out.writeLong(this.index.getPageId());
}
+ private interface Closure {
+ void run() throws IOException;
+ }
+
static class ScheduleTime {
private final int DEFAULT_WAIT = 500;
private final int DEFAULT_NEW_JOB_WAIT = 100;