http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java new file mode 100644 index 0000000..e22e4df --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.scheduler; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; + +/** + * A VariableMarshaller instance that performs the read and write of a list of + * JobLocation objects using the JobLocation's built in read and write methods. 
+ */ +class JobLocationsMarshaller extends VariableMarshaller<List<JobLocation>> { + static JobLocationsMarshaller INSTANCE = new JobLocationsMarshaller(); + + @Override + public List<JobLocation> readPayload(DataInput dataIn) throws IOException { + List<JobLocation> result = new ArrayList<JobLocation>(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + JobLocation jobLocation = new JobLocation(); + jobLocation.readExternal(dataIn); + result.add(jobLocation); + } + return result; + } + + @Override + public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (JobLocation jobLocation : value) { + jobLocation.writeExternal(dataOut); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index 455801a..bcb819c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -32,11 +32,15 @@ import org.apache.activemq.broker.scheduler.CronParser; import org.apache.activemq.broker.scheduler.Job; import org.apache.activemq.broker.scheduler.JobListener; import org.apache.activemq.broker.scheduler.JobScheduler; +import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand; +import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand; import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; -import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.ServiceStopper; @@ -44,12 +48,13 @@ import org.apache.activemq.util.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler { +public class JobSchedulerImpl extends 
ServiceSupport implements Runnable, JobScheduler { + private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class); - final JobSchedulerStoreImpl store; + private final JobSchedulerStoreImpl store; private final AtomicBoolean running = new AtomicBoolean(); private String name; - BTreeIndex<Long, List<JobLocation>> index; + private BTreeIndex<Long, List<JobLocation>> index; private Thread thread; private final AtomicBoolean started = new AtomicBoolean(false); private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>(); @@ -64,233 +69,163 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler this.name = name; } - /* - * (non-Javadoc) - * - * @see org.apache.activemq.beanstalk.JobScheduler#getName() - */ @Override public String getName() { return this.name; } - /* - * (non-Javadoc) - * - * @see org.apache.activemq.beanstalk.JobScheduler#addListener(org.apache.activemq .beanstalk.JobListener) - */ @Override public void addListener(JobListener l) { this.jobListeners.add(l); } - /* - * (non-Javadoc) - * - * @see org.apache.activemq.beanstalk.JobScheduler#removeListener(org.apache. 
activemq.beanstalk.JobListener) - */ @Override public void removeListener(JobListener l) { this.jobListeners.remove(l); } @Override - public synchronized void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - schedule(tx, jobId, payload, "", 0, delay, 0); - } - }); + public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException { + doSchedule(jobId, payload, "", 0, delay, 0); } @Override - public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - schedule(tx, jobId, payload, cronEntry, 0, 0, 0); - } - }); + public void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception { + doSchedule(jobId, payload, cronEntry, 0, 0, 0); } @Override - public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period, - final int repeat) throws IOException { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - schedule(tx, jobId, payload, cronEntry, delay, period, repeat); - } - }); + public void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period, final int repeat) throws IOException { + doSchedule(jobId, payload, cronEntry, delay, period, repeat); } - /* - * (non-Javadoc) - * - * @see org.apache.activemq.beanstalk.JobScheduler#remove(long) - */ @Override - public synchronized void remove(final long time) throws IOException { - 
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - remove(tx, time); - } - }); - } - - synchronized void removeFromIndex(final long time, final String jobId) throws IOException { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - removeFromIndex(tx, time, jobId); - } - }); - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.beanstalk.JobScheduler#remove(long, java.lang.String) - */ - public synchronized void remove(final long time, final String jobId) throws IOException { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - remove(tx, time, jobId); - } - }); + public void remove(final long time) throws IOException { + doRemoveRange(time, time); } - synchronized void remove(final long time, final List<JobLocation> jobIds) throws IOException { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - remove(tx, time, jobIds); - } - }); + @Override + public void remove(final String jobId) throws IOException { + doRemove(-1, jobId); } - /* - * (non-Javadoc) - * - * @see org.apache.activemq.beanstalk.JobScheduler#remove(java.lang.String) - */ @Override - public synchronized void remove(final String jobId) throws IOException { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - remove(tx, jobId); - } - }); + public void removeAllJobs() throws IOException { + doRemoveRange(0, Long.MAX_VALUE); } @Override - public synchronized long getNextScheduleTime() throws IOException { - Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); 
- return first != null ? first.getKey() : -1l; + public void removeAllJobs(final long start, final long finish) throws IOException { + doRemoveRange(start, finish); } - /* - * (non-Javadoc) - * - * @see org.apache.activemq.beanstalk.JobScheduler#getNextScheduleJobs() - */ @Override - public synchronized List<Job> getNextScheduleJobs() throws IOException { - final List<Job> result = new ArrayList<Job>(); - - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - Map.Entry<Long, List<JobLocation>> first = index.getFirst(store.getPageFile().tx()); - if (first != null) { - for (JobLocation jl : first.getValue()) { - ByteSequence bs = getPayload(jl.getLocation()); - Job job = new JobImpl(jl, bs); - result.add(job); - } - } - } - }); - return result; + public long getNextScheduleTime() throws IOException { + this.store.readLockIndex(); + try { + Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); + return first != null ? 
first.getKey() : -1l; + } finally { + this.store.readUnlockIndex(); + } } @Override - public synchronized List<Job> getAllJobs() throws IOException { + public List<Job> getNextScheduleJobs() throws IOException { final List<Job> result = new ArrayList<Job>(); - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx()); - while (iter.hasNext()) { - Map.Entry<Long, List<JobLocation>> next = iter.next(); - if (next != null) { - for (JobLocation jl : next.getValue()) { + this.store.readLockIndex(); + try { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + Map.Entry<Long, List<JobLocation>> first = index.getFirst(tx); + if (first != null) { + for (JobLocation jl : first.getValue()) { ByteSequence bs = getPayload(jl.getLocation()); Job job = new JobImpl(jl, bs); result.add(job); } - } else { - break; } } - } - }); + }); + } finally { + this.store.readUnlockIndex(); + } return result; } + private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException { + this.store.readLockIndex(); + try { + if (!this.store.isStopped() && !this.store.isStopping()) { + Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); + return first; + } + } finally { + this.store.readUnlockIndex(); + } + return null; + } + @Override - public synchronized List<Job> getAllJobs(final long start, final long finish) throws IOException { + public List<Job> getAllJobs() throws IOException { final List<Job> result = new ArrayList<Job>(); - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - Iterator<Map.Entry<Long, List<JobLocation>>> iter = 
index.iterator(store.getPageFile().tx(), start); - while (iter.hasNext()) { - Map.Entry<Long, List<JobLocation>> next = iter.next(); - if (next != null && next.getKey().longValue() <= finish) { - for (JobLocation jl : next.getValue()) { - ByteSequence bs = getPayload(jl.getLocation()); - Job job = new JobImpl(jl, bs); - result.add(job); + this.store.readLockIndex(); + try { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx()); + while (iter.hasNext()) { + Map.Entry<Long, List<JobLocation>> next = iter.next(); + if (next != null) { + for (JobLocation jl : next.getValue()) { + ByteSequence bs = getPayload(jl.getLocation()); + Job job = new JobImpl(jl, bs); + result.add(job); + } + } else { + break; } - } else { - break; } } - } - }); + }); + } finally { + this.store.readUnlockIndex(); + } return result; } @Override - public synchronized void removeAllJobs() throws IOException { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - destroy(tx); - } - }); - } - - @Override - public synchronized void removeAllJobs(final long start, final long finish) throws IOException { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - destroy(tx, start, finish); - } - }); - } - - ByteSequence getPayload(Location location) throws IllegalStateException, IOException { - return this.store.getPayload(location); + public List<Job> getAllJobs(final long start, final long finish) throws IOException { + final List<Job> result = new ArrayList<Job>(); + this.store.readLockIndex(); + try { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void 
execute(Transaction tx) throws IOException { + Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(tx, start); + while (iter.hasNext()) { + Map.Entry<Long, List<JobLocation>> next = iter.next(); + if (next != null && next.getKey().longValue() <= finish) { + for (JobLocation jl : next.getValue()) { + ByteSequence bs = getPayload(jl.getLocation()); + Job job = new JobImpl(jl, bs); + result.add(job); + } + } else { + break; + } + } + } + }); + } finally { + this.store.readUnlockIndex(); + } + return result; } - void schedule(Transaction tx, String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws IOException { + private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException { long startTime = System.currentTimeMillis(); // round startTime - so we can schedule more jobs // at the same time @@ -308,38 +243,86 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler // start time not set by CRON - so it it to the current time time = startTime; } + if (delay > 0) { time += delay; } else { time += period; } - Location location = this.store.write(payload, false); - JobLocation jobLocation = new JobLocation(location); - this.store.incrementJournalCount(tx, location); - jobLocation.setJobId(jobId); - jobLocation.setStartTime(startTime); - jobLocation.setCronEntry(cronEntry); - jobLocation.setDelay(delay); - jobLocation.setPeriod(period); - jobLocation.setRepeat(repeat); - if (LOG.isDebugEnabled()) { - LOG.debug("Scheduling " + jobLocation); - } - storeJob(tx, jobLocation, time); - this.scheduleTime.newJob(); - } - - synchronized void storeJob(final JobLocation jobLocation, final long nextExecutionTime) throws IOException { - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - storeJob(tx, jobLocation, 
nextExecutionTime); - } - }); + KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand(); + newJob.setScheduler(name); + newJob.setJobId(jobId); + newJob.setStartTime(startTime); + newJob.setCronEntry(cronEntry); + newJob.setDelay(delay); + newJob.setPeriod(period); + newJob.setRepeat(repeat); + newJob.setNextExecutionTime(time); + newJob.setPayload(new Buffer(payload.getData(), payload.getOffset(), payload.getLength())); + + this.store.store(newJob); + } + + private void doReschedule(final String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException { + KahaRescheduleJobCommand update = new KahaRescheduleJobCommand(); + update.setScheduler(name); + update.setJobId(jobId); + update.setExecutionTime(executionTime); + update.setNextExecutionTime(nextExecutionTime); + update.setRescheduledCount(rescheduledCount); + this.store.store(update); + } + + private void doRemove(final long executionTime, final List<JobLocation> jobs) throws IOException { + for (JobLocation job : jobs) { + doRemove(executionTime, job.getJobId()); + } + } + + private void doRemove(long executionTime, final String jobId) throws IOException { + KahaRemoveScheduledJobCommand remove = new KahaRemoveScheduledJobCommand(); + remove.setScheduler(name); + remove.setJobId(jobId); + remove.setNextExecutionTime(executionTime); + this.store.store(remove); } - void storeJob(final Transaction tx, final JobLocation jobLocation, final long nextExecutionTime) throws IOException { + private void doRemoveRange(long start, long end) throws IOException { + KahaRemoveScheduledJobsCommand destroy = new KahaRemoveScheduledJobsCommand(); + destroy.setScheduler(name); + destroy.setStartTime(start); + destroy.setEndTime(end); + this.store.store(destroy); + } + + /** + * Adds a new Scheduled job to the index. Must be called under index lock. + * + * This method must ensure that a duplicate add is not processed into the scheduler. 
On index + * recover some adds may be replayed and we don't allow more than one instance of a JobId to + * exist at any given scheduled time, so filter these out to ensure idempotence. + * + * @param tx + * Transaction in which the update is performed. + * @param command + * The new scheduled job command to process. + * @param location + * The location where the add command is stored in the journal. + * + * @throws IOException if an error occurs updating the index. + */ + protected void process(final Transaction tx, final KahaAddScheduledJobCommand command, Location location) throws IOException { + JobLocation jobLocation = new JobLocation(location); + jobLocation.setJobId(command.getJobId()); + jobLocation.setStartTime(command.getStartTime()); + jobLocation.setCronEntry(command.getCronEntry()); + jobLocation.setDelay(command.getDelay()); + jobLocation.setPeriod(command.getPeriod()); + jobLocation.setRepeat(command.getRepeat()); + + long nextExecutionTime = command.getNextExecutionTime(); + List<JobLocation> values = null; jobLocation.setNextTime(nextExecutionTime); if (this.index.containsKey(tx, nextExecutionTime)) { @@ -348,106 +331,239 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler if (values == null) { values = new ArrayList<JobLocation>(); } - values.add(jobLocation); - this.index.put(tx, nextExecutionTime, values); - } - void remove(Transaction tx, long time, String jobId) throws IOException { - JobLocation result = removeFromIndex(tx, time, jobId); - if (result != null) { - this.store.decrementJournalCount(tx, result.getLocation()); + // There can never be more than one instance of the same JobId scheduled at any + // given time, when it happens its probably the result of index recovery and this + // method must be idempotent so check for it first. + if (!values.contains(jobLocation)) { + values.add(jobLocation); + + // Reference the log file where the add command is stored to prevent GC. 
+ this.store.incrementJournalCount(tx, location); + this.index.put(tx, nextExecutionTime, values); + this.scheduleTime.newJob(); + } else { + this.index.put(tx, nextExecutionTime, values); + LOG.trace("Job {} already in scheduler at this time {}", + jobLocation.getJobId(), jobLocation.getNextTime()); } } - JobLocation removeFromIndex(Transaction tx, long time, String jobId) throws IOException { + /** + * Reschedules a Job after it has be fired. + * + * For jobs that are repeating this method updates the job in the index by adding it to the + * jobs list for the new execution time. If the job is not a cron type job then this method + * will reduce the repeat counter if the job has a fixed number of repeats set. The Job will + * be removed from the jobs list it just executed on. + * + * This method must also update the value of the last update location in the JobLocation + * instance so that the checkpoint worker doesn't drop the log file in which that command lives. + * + * This method must ensure that an reschedule command that references a job that doesn't exist + * does not cause an error since it's possible that on recover the original add might be gone + * and so the job should not reappear in the scheduler. + * + * @param tx + * The TX under which the index is updated. + * @param command + * The reschedule command to process. + * @param location + * The location in the index where the reschedule command was stored. + * + * @throws IOException if an error occurs during the reschedule. 
+ */ + protected void process(final Transaction tx, final KahaRescheduleJobCommand command, Location location) throws IOException { JobLocation result = null; - List<JobLocation> values = this.index.remove(tx, time); - if (values != null) { - for (int i = 0; i < values.size(); i++) { - JobLocation jl = values.get(i); - if (jl.getJobId().equals(jobId)) { - values.remove(i); - if (!values.isEmpty()) { - this.index.put(tx, time, values); + final List<JobLocation> current = this.index.remove(tx, command.getExecutionTime()); + if (current != null) { + for (int i = 0; i < current.size(); i++) { + JobLocation jl = current.get(i); + if (jl.getJobId().equals(command.getJobId())) { + current.remove(i); + if (!current.isEmpty()) { + this.index.put(tx, command.getExecutionTime(), current); } result = jl; break; } } + } else { + LOG.debug("Process reschedule command for job {} non-existent executime time {}.", + command.getJobId(), command.getExecutionTime()); } - return result; - } - private void remove(Transaction tx, long time, List<JobLocation> jobIds) throws IOException { - List<JobLocation> result = removeFromIndex(tx, time, jobIds); if (result != null) { - for (JobLocation jl : result) { - this.store.decrementJournalCount(tx, jl.getLocation()); + Location previousUpdate = result.getLastUpdate(); + + List<JobLocation> target = null; + result.setNextTime(command.getNextExecutionTime()); + result.setLastUpdate(location); + result.setRescheduledCount(command.getRescheduledCount()); + if (!result.isCron() && result.getRepeat() > 0) { + result.setRepeat(result.getRepeat() - 1); } + if (this.index.containsKey(tx, command.getNextExecutionTime())) { + target = this.index.remove(tx, command.getNextExecutionTime()); + } + if (target == null) { + target = new ArrayList<JobLocation>(); + } + target.add(result); + + // Track the location of the last reschedule command and release the log file + // reference for the previous one if there was one. 
+ this.store.incrementJournalCount(tx, location); + if (previousUpdate != null) { + this.store.decrementJournalCount(tx, previousUpdate); + } + + this.index.put(tx, command.getNextExecutionTime(), target); + this.scheduleTime.newJob(); + } else { + LOG.debug("Process reschedule command for non-scheduled job {} at executime time {}.", + command.getJobId(), command.getExecutionTime()); } } - private List<JobLocation> removeFromIndex(Transaction tx, long time, List<JobLocation> Jobs) throws IOException { - List<JobLocation> result = null; - List<JobLocation> values = this.index.remove(tx, time); - if (values != null) { - result = new ArrayList<JobLocation>(values.size()); + /** + * Removes a scheduled job from the scheduler. + * + * The remove operation can be of two forms. The first is that there is a job Id but no set time + * (-1) in which case the jobs index is searched until the target job Id is located. The alternate + * form is that a job Id and execution time are both set in which case the given time is checked + * for a job matching that Id. In either case once an execution time is identified the job is + * removed and the index updated. + * + * This method should ensure that if the matching job is not found that no error results as it + * is possible that on a recover the initial add command could be lost so the job may not be + * rescheduled. + * + * @param tx + * The transaction under which the index is updated. + * @param command + * The remove command to process. + * @param location + * The location of the remove command in the Journal. + * + * @throws IOException if an error occurs while updating the scheduler index. + */ + void process(final Transaction tx, final KahaRemoveScheduledJobCommand command, Location location) throws IOException { - for (JobLocation job : Jobs) { - if (values.remove(job)) { - result.add(job); - } - } + // Case 1: JobId and no time value means find the job and remove it. 
+ // Case 2: JobId and a time value means find exactly this scheduled job. - if (!values.isEmpty()) { - this.index.put(tx, time, values); + Long executionTime = command.getNextExecutionTime(); + + List<JobLocation> values = null; + + if (executionTime == -1) { + for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) { + Map.Entry<Long, List<JobLocation>> entry = i.next(); + List<JobLocation> candidates = entry.getValue(); + if (candidates != null) { + for (JobLocation jl : candidates) { + if (jl.getJobId().equals(command.getJobId())) { + LOG.trace("Entry {} contains the remove target: {}", entry.getKey(), command.getJobId()); + executionTime = entry.getKey(); + values = this.index.remove(tx, executionTime); + break; + } + } + } } + } else { + values = this.index.remove(tx, executionTime); } - return result; - } - void remove(Transaction tx, long time) throws IOException { - List<JobLocation> values = this.index.remove(tx, time); + JobLocation removed = null; + + // Remove the job and update the index if there are any other jobs scheduled at this time. 
if (values != null) { - for (JobLocation jl : values) { - this.store.decrementJournalCount(tx, jl.getLocation()); + for (JobLocation job : values) { + if (job.getJobId().equals(command.getJobId())) { + removed = job; + values.remove(removed); + break; + } } - } - } - void remove(Transaction tx, String id) throws IOException { - for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) { - Map.Entry<Long, List<JobLocation>> entry = i.next(); - List<JobLocation> values = entry.getValue(); - if (values != null) { - for (JobLocation jl : values) { - if (jl.getJobId().equals(id)) { - remove(tx, entry.getKey(), id); - return; - } - } + if (!values.isEmpty()) { + this.index.put(tx, executionTime, values); } } - } - synchronized void destroy(Transaction tx) throws IOException { - List<Long> keys = new ArrayList<Long>(); - for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) { - Map.Entry<Long, List<JobLocation>> entry = i.next(); - keys.add(entry.getKey()); - } + if (removed != null) { + LOG.trace("{} removed from scheduler {}", removed, this); - for (Long l : keys) { - List<JobLocation> values = this.index.remove(tx, l); - if (values != null) { - for (JobLocation jl : values) { - this.store.decrementJournalCount(tx, jl.getLocation()); - } + // Remove the references for add and reschedule commands for this job + // so that those logs can be GC'd when free. + this.store.decrementJournalCount(tx, removed.getLocation()); + if (removed.getLastUpdate() != null) { + this.store.decrementJournalCount(tx, removed.getLastUpdate()); } + + // now that the job is removed from the index we can store the remove info and + // then dereference the log files that hold the initial add command and the most + // recent update command. 
+ this.store.referenceRemovedLocation(tx, location, removed); } } - synchronized void destroy(Transaction tx, long start, long finish) throws IOException { + /** + * Removes all scheduled jobs within a given time range. + * + * The method can be used to clear the entire scheduler index by specifying a range that + * encompasses all time [0...Long.MAX_VALUE] or a single execution time can be removed by + * setting start and end time to the same value. + * + * @param tx + * The transaction under which the index is updated. + * @param command + * The remove command to process. + * @param location + * The location of the remove command in the Journal. + * + * @throws IOException if an error occurs while updating the scheduler index. + */ + protected void process(final Transaction tx, final KahaRemoveScheduledJobsCommand command, Location location) throws IOException { + removeInRange(tx, command.getStartTime(), command.getEndTime(), location); + } + + /** + * Removes all jobs from the schedulers index. Must be called with the index locked. + * + * @param tx + * The transaction under which the index entries for this scheduler are removed. + * + * @throws IOException if an error occurs removing the jobs from the scheduler index. + */ + protected void removeAll(Transaction tx) throws IOException { + this.removeInRange(tx, 0, Long.MAX_VALUE, null); + } + + /** + * Removes all scheduled jobs within the target range. + * + * This method can be used to remove all the stored jobs by passing a range of [0...Long.MAX_VALUE] + * or it can be used to remove all jobs at a given scheduled time by passing the same time value + * for both start and end. If the optional location parameter is set then this method will update + * the store's remove location tracker with the location value and the Jobs that are being removed. + * + * This method must be called with the store index locked for writes. + * + * @param tx + * The transaction under which the index is to be updated. 
+ * @param start + * The start time for the remove operation. + * @param finish + * The end time for the remove operation. + * @param location (optional) + * The location of the remove command that triggered this remove. + * + * @throws IOException if an error occurs during the remove operation. + */ + protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException { List<Long> keys = new ArrayList<Long>(); for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) { Map.Entry<Long, List<JobLocation>> entry = i.next(); @@ -458,32 +574,97 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler } } - for (Long l : keys) { - List<JobLocation> values = this.index.remove(tx, l); - if (values != null) { - for (JobLocation jl : values) { - this.store.decrementJournalCount(tx, jl.getLocation()); + for (Long executionTime : keys) { + List<JobLocation> values = this.index.remove(tx, executionTime); + if (location != null) { + for (JobLocation job : values) { + LOG.trace("Removing {} scheduled at: {}", job, executionTime); + + // Remove the references for add and reschedule commands for this job + // so that those logs can be GC'd when free. + this.store.decrementJournalCount(tx, job.getLocation()); + if (job.getLastUpdate() != null) { + this.store.decrementJournalCount(tx, job.getLastUpdate()); + } + + // now that the job is removed from the index we can store the remove info and + // then dereference the log files that hold the initial add command and the most + // recent update command. 
+ this.store.referenceRemovedLocation(tx, location, job); } } } } - private synchronized Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException { - if (!this.store.isStopped() && !this.store.isStopping()) { - Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); - return first; + /** + * Removes a Job from the index using its Id value and the time it is currently set to + * be executed. This method will only remove the Job if it is found at the given execution + * time. + * + * This method must be called under index lock. + * + * @param tx + * the transaction under which this method is being executed. + * @param jobId + * the target Job Id to remove. + * @param executionTime + * the scheduled time for the Job Id that is being removed. + * + * @return true if the Job was removed or false if not found at the given time. + * + * @throws IOException if an error occurs while removing the Job. + */ + protected boolean removeJobAtTime(Transaction tx, String jobId, long executionTime) throws IOException { + boolean result = false; + + List<JobLocation> jobs = this.index.remove(tx, executionTime); + Iterator<JobLocation> jobsIter = jobs.iterator(); + while (jobsIter.hasNext()) { + JobLocation job = jobsIter.next(); + if (job.getJobId().equals(jobId)) { + jobsIter.remove(); + // Remove the references for add and reschedule commands for this job + // so that those logs can be GC'd when free. + this.store.decrementJournalCount(tx, job.getLocation()); + if (job.getLastUpdate() != null) { + this.store.decrementJournalCount(tx, job.getLastUpdate()); + } + result = true; + break; + } } - return null; + + // Return the list to the index modified or unmodified. 
+ this.index.put(tx, executionTime, jobs); + + return result; } - void fireJob(JobLocation job) throws IllegalStateException, IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Firing " + job); - } - ByteSequence bs = this.store.getPayload(job.getLocation()); - for (JobListener l : jobListeners) { - l.scheduledJob(job.getJobId(), bs); + /** + * Walks the Scheduled Job Tree and collects the add location and last update location + * for all scheduled jobs. + * + * This method must be called with the index locked. + * + * @param tx + * the transaction under which this operation was invoked. + * + * @return a list of all referenced Location values for this JobSchedulerImpl + * + * @throws IOException if an error occurs walking the scheduler tree. + */ + protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException { + List<JobLocation> references = new ArrayList<JobLocation>(); + + for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) { + Map.Entry<Long, List<JobLocation>> entry = i.next(); + List<JobLocation> scheduled = entry.getValue(); + for (JobLocation job : scheduled) { + references.add(job); + } } + + return references; } @Override @@ -492,14 +673,14 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler mainLoop(); } catch (Throwable e) { if (this.running.get() && isStarted()) { - LOG.error(this + " Caught exception in mainloop", e); + LOG.error("{} Caught exception in mainloop", this, e); } } finally { if (running.get()) { try { stop(); } catch (Exception e) { - LOG.error("Failed to stop " + this); + LOG.error("Failed to stop {}", this); } } } @@ -507,56 +688,55 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler @Override public String toString() { - return "JobScheduler:" + this.name; + return "JobScheduler: " + this.name; } protected void mainLoop() { while (this.running.get()) { this.scheduleTime.clearNewJob(); try { - // peek the 
next job long currentTime = System.currentTimeMillis(); - // Read the list of scheduled events and fire the jobs. Once done with the batch - // remove all that were fired, and reschedule as needed. + // Read the list of scheduled events and fire the jobs, reschedule repeating jobs as + // needed before firing the job event. Map.Entry<Long, List<JobLocation>> first = getNextToSchedule(); if (first != null) { List<JobLocation> list = new ArrayList<JobLocation>(first.getValue()); - List<JobLocation> fired = new ArrayList<JobLocation>(list.size()); + List<JobLocation> toRemove = new ArrayList<JobLocation>(list.size()); final long executionTime = first.getKey(); long nextExecutionTime = 0; if (executionTime <= currentTime) { for (final JobLocation job : list) { + + if (!running.get()) { + break; + } + int repeat = job.getRepeat(); nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat); long waitTime = nextExecutionTime - currentTime; this.scheduleTime.setWaitTime(waitTime); - if (job.isCron() == false) { + if (!job.isCron()) { fireJob(job); if (repeat != 0) { - repeat--; - job.setRepeat(repeat); - // remove this job from the index so it doesn't get destroyed - removeFromIndex(executionTime, job.getJobId()); - // and re-store it - storeJob(job, nextExecutionTime); + // Reschedule for the next time, the scheduler will take care of + // updating the repeat counter on the update. + doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1); } else { - fired.add(job); + toRemove.add(job); } } else { - // cron job will have a repeat time. if (repeat == 0) { - // we haven't got a separate scheduler to execute at - // this time - just a cron job - so fire it + // This is a non-repeating Cron entry so we can fire and forget it. fireJob(job); } if (nextExecutionTime > currentTime) { - // we will run again ... 
- // remove this job from the index - so it doesn't get destroyed - removeFromIndex(executionTime, job.getJobId()); - // and re-store it - storeJob(job, nextExecutionTime); + // Reschedule the cron job as a new event, if the cron entry signals + // a repeat then it will be stored separately and fired as a normal + // event with decrementing repeat. + doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1); + if (repeat != 0) { // we have a separate schedule to run at this time // so the cron job is used to set of a separate schedule @@ -569,14 +749,14 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler this.scheduleTime.setWaitTime(waitTime); } } else { - fired.add(job); + toRemove.add(job); } } } // now remove all jobs that have not been rescheduled from this execution // time, if there are no more entries in that time it will be removed. - remove(executionTime, fired); + doRemove(executionTime, toRemove); // If there is a job that should fire before the currently set wait time // we need to reset wait time otherwise we'll miss it. 
@@ -588,25 +768,30 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler } } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Not yet time to execute the job, waiting " + (executionTime - currentTime) + " ms"); - } this.scheduleTime.setWaitTime(executionTime - currentTime); } } this.scheduleTime.pause(); } catch (Exception ioe) { - LOG.error(this.name + " Failed to schedule job", ioe); + LOG.error("{} Failed to schedule job", this.name, ioe); try { this.store.stop(); } catch (Exception e) { - LOG.error(this.name + " Failed to shutdown JobSchedulerStore", e); + LOG.error("{} Failed to shutdown JobSchedulerStore", this.name, e); } } } } + void fireJob(JobLocation job) throws IllegalStateException, IOException { + LOG.debug("Firing: {}", job); + ByteSequence bs = this.store.getPayload(job.getLocation()); + for (JobListener l : jobListeners) { + l.scheduledJob(job.getJobId(), bs); + } + } + @Override public void startDispatching() throws Exception { if (!this.running.get()) { @@ -627,7 +812,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler Thread t = this.thread; this.thread = null; if (t != null) { - t.join(1000); + t.join(3000); } } } @@ -643,6 +828,10 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler stopDispatching(); } + private ByteSequence getPayload(Location location) throws IllegalStateException, IOException { + return this.store.getPayload(location); + } + long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException { long result = currentTime; String cron = job.getCronEntry(); @@ -660,7 +849,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler void load(Transaction tx) throws IOException { this.index.setKeyMarshaller(LongMarshaller.INSTANCE); - this.index.setValueMarshaller(ValueMarshaller.INSTANCE); + this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE); 
this.index.load(tx); } @@ -668,7 +857,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler this.name = in.readUTF(); this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), in.readLong()); this.index.setKeyMarshaller(LongMarshaller.INSTANCE); - this.index.setValueMarshaller(ValueMarshaller.INSTANCE); + this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE); } public void write(DataOutput out) throws IOException { @@ -676,30 +865,6 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler out.writeLong(this.index.getPageId()); } - static class ValueMarshaller extends VariableMarshaller<List<JobLocation>> { - static ValueMarshaller INSTANCE = new ValueMarshaller(); - - @Override - public List<JobLocation> readPayload(DataInput dataIn) throws IOException { - List<JobLocation> result = new ArrayList<JobLocation>(); - int size = dataIn.readInt(); - for (int i = 0; i < size; i++) { - JobLocation jobLocation = new JobLocation(); - jobLocation.readExternal(dataIn); - result.add(jobLocation); - } - return result; - } - - @Override - public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException { - dataOut.writeInt(value.size()); - for (JobLocation jobLocation : value) { - jobLocation.writeExternal(dataOut); - } - } - } - static class ScheduleTime { private final int DEFAULT_WAIT = 500; private final int DEFAULT_NEW_JOB_WAIT = 100; http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java new file mode 100644 index 0000000..c92dc7b --- /dev/null 
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.kahadb.scheduler; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; + +import org.apache.activemq.store.kahadb.AbstractKahaDBMetaData; +import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; +import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller; +import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; +import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; +import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The KahaDB MetaData used to house the Index data for the KahaDB implementation + * of a JobSchedulerStore. 
+ */ +public class JobSchedulerKahaDBMetaData extends AbstractKahaDBMetaData<JobSchedulerKahaDBMetaData> { + + static final Logger LOG = LoggerFactory.getLogger(JobSchedulerKahaDBMetaData.class); + + private final JobSchedulerStoreImpl store; + + private UUID token = JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN; + private int version = JobSchedulerStoreImpl.CURRENT_VERSION; + + private BTreeIndex<Integer, List<Integer>> removeLocationTracker; + private BTreeIndex<Integer, Integer> journalRC; + private BTreeIndex<String, JobSchedulerImpl> storedSchedulers; + + /** + * Creates a new instance of this meta data object with the assigned + * parent JobSchedulerStore instance. + * + * @param store + * the store instance that owns this meta data. + */ + public JobSchedulerKahaDBMetaData(JobSchedulerStoreImpl store) { + this.store = store; + } + + /** + * @return the current value of the Scheduler store identification token. + */ + public UUID getToken() { + return this.token; + } + + /** + * @return the current value of the version tag for this meta data instance. + */ + public int getVersion() { + return this.version; + } + + /** + * Gets the index that contains the location tracking information for Jobs + * that have been removed from the index but whose add operation has yet + * to be removed from the Journal. + * + * The Journal log file where a remove command is written cannot be released + * until the log file with the original add command has also been released, + * otherwise on a log replay the scheduled job could reappear in the scheduler + * since its corresponding remove might no longer be present. + * + * @return the remove command location tracker index. + */ + public BTreeIndex<Integer, List<Integer>> getRemoveLocationTracker() { + return this.removeLocationTracker; + } + + /** + * Gets the index used to track the number of reference to a Journal log file. 
+ * + * A log file in the Journal can only be considered for removal after all the + * references to it have been released. + * + * @return the journal log file reference counter index. + */ + public BTreeIndex<Integer, Integer> getJournalRC() { + return this.journalRC; + } + + /** + * Gets the index of JobScheduler instances that have been created and stored + * in the JobSchedulerStore instance. + * + * @return the index of stored JobScheduler instances. + */ + public BTreeIndex<String, JobSchedulerImpl> getJobSchedulers() { + return this.storedSchedulers; + } + + @Override + public void initialize(Transaction tx) throws IOException { + this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), tx.allocate().getPageId()); + this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), tx.allocate().getPageId()); + this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), tx.allocate().getPageId()); + } + + @Override + public void load(Transaction tx) throws IOException { + this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); + this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); + this.storedSchedulers.load(tx); + this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.load(tx); + this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller()); + this.removeLocationTracker.load(tx); + } + + /** + * Loads all the stored JobScheduler instances into the provided map. + * + * @param tx + * the Transaction under which the load operation should be executed. + * @param schedulers + * a Map<String, JobSchedulerImpl> into which the loaded schedulers are stored. + * + * @throws IOException if an error occurs while performing the load operation. 
+ */ + public void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException { + for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) { + Entry<String, JobSchedulerImpl> entry = i.next(); + entry.getValue().load(tx); + schedulers.put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void read(DataInput in) throws IOException { + try { + long msb = in.readLong(); + long lsb = in.readLong(); + this.token = new UUID(msb, lsb); + } catch (Exception e) { + throw new UnknownStoreVersionException(e); + } + + if (!token.equals(JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN)) { + throw new UnknownStoreVersionException(token.toString()); + } + this.version = in.readInt(); + if (in.readBoolean()) { + setLastUpdateLocation(LocationMarshaller.INSTANCE.readPayload(in)); + } else { + setLastUpdateLocation(null); + } + this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), in.readLong()); + this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); + this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); + this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), in.readLong()); + this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); + this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), in.readLong()); + this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller()); + + LOG.info("Scheduler Store version {} loaded", this.version); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(this.token.getMostSignificantBits()); + out.writeLong(this.token.getLeastSignificantBits()); + out.writeInt(this.version); + if (getLastUpdateLocation() != null) { + out.writeBoolean(true); + 
LocationMarshaller.INSTANCE.writePayload(getLastUpdateLocation(), out); + } else { + out.writeBoolean(false); + } + out.writeLong(this.storedSchedulers.getPageId()); + out.writeLong(this.journalRC.getPageId()); + out.writeLong(this.removeLocationTracker.getPageId()); + } + + private class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> { + private final JobSchedulerStoreImpl store; + + JobSchedulerMarshaller(JobSchedulerStoreImpl store) { + this.store = store; + } + + @Override + public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException { + JobSchedulerImpl result = new JobSchedulerImpl(this.store); + result.read(dataIn); + return result; + } + + @Override + public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException { + js.write(dataOut); + } + } + + private class IntegerListMarshaller extends VariableMarshaller<List<Integer>> { + + @Override + public List<Integer> readPayload(DataInput dataIn) throws IOException { + List<Integer> result = new ArrayList<Integer>(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + result.add(IntegerMarshaller.INSTANCE.readPayload(dataIn)); + } + return result; + } + + @Override + public void writePayload(List<Integer> value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (Integer integer : value) { + IntegerMarshaller.INSTANCE.writePayload(integer, dataOut); + } + } + } +}
