http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java index 5934914..1a08931 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java @@ -19,8 +19,10 @@ package org.apache.activemq.store.kahadb.scheduler; import java.io.DataInput; import java.io.DataOutput; import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -28,363 +30,917 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; +import java.util.TreeSet; +import java.util.UUID; -import org.apache.activemq.broker.LockableServiceSupport; -import org.apache.activemq.broker.Locker; import org.apache.activemq.broker.scheduler.JobScheduler; import org.apache.activemq.broker.scheduler.JobSchedulerStore; -import org.apache.activemq.store.SharedFileLocker; -import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; -import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.store.kahadb.AbstractKahaDBStore; +import org.apache.activemq.store.kahadb.JournalCommand; +import org.apache.activemq.store.kahadb.KahaDBMetaData; +import org.apache.activemq.store.kahadb.Visitor; +import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; +import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand; +import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand; +import org.apache.activemq.store.kahadb.data.KahaTraceCommand; +import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Page; import org.apache.activemq.store.kahadb.disk.page.PageFile; import org.apache.activemq.store.kahadb.disk.page.Transaction; -import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller; -import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; +import org.apache.activemq.store.kahadb.scheduler.legacy.LegacyStoreReplayer; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.ServiceStopper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JobSchedulerStoreImpl extends LockableServiceSupport implements JobSchedulerStore { - static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class); - private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; - - public static final int CLOSED_STATE = 1; - public static final int OPEN_STATE = 2; - - private File directory; - PageFile pageFile; - private Journal journal; - protected AtomicLong journalSize = new AtomicLong(0); - private boolean failIfDatabaseIsLocked; - private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; - private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; - private boolean enableIndexWriteAsync = false; - MetaData metaData = new MetaData(this); - final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); - Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>(); - - protected class MetaData { - protected MetaData(JobSchedulerStoreImpl store) { - this.store = store; - } +public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore { - private final JobSchedulerStoreImpl store; - Page<MetaData> page; - BTreeIndex<Integer, Integer> journalRC; - BTreeIndex<String, JobSchedulerImpl> storedSchedulers; + private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class); - void createIndexes(Transaction tx) throws IOException { - this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId()); - this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId()); - } + private JobSchedulerKahaDBMetaData metaData = new JobSchedulerKahaDBMetaData(this); + private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); + private final Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>(); + private File legacyStoreArchiveDirectory; + + /** + * The Scheduler Token is used to identify base revisions of the Scheduler store. A store + * based on the initial scheduler design will not have this tag in it's meta-data and will + * indicate an update is needed. Later versions of the scheduler can also change this value + * to indicate incompatible store bases which require complete meta-data and journal rewrites + * instead of simpler meta-data updates. + */ + static final UUID SCHEDULER_STORE_TOKEN = UUID.fromString("57ed642b-1ee3-47b3-be6d-b7297d500409"); - void load(Transaction tx) throws IOException { - this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); - this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); - this.storedSchedulers.load(tx); - this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.load(tx); + /** + * The default scheduler store version. All new store instance will be given this version and + * earlier versions will be updated to this version. + */ + static final int CURRENT_VERSION = 1; + + @Override + public JobScheduler getJobScheduler(final String name) throws Exception { + this.indexLock.writeLock().lock(); + try { + JobSchedulerImpl result = this.schedulers.get(name); + if (result == null) { + final JobSchedulerImpl js = new JobSchedulerImpl(this); + js.setName(name); + getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + js.createIndexes(tx); + js.load(tx); + metaData.getJobSchedulers().put(tx, name, js); + } + }); + result = js; + this.schedulers.put(name, js); + if (isStarted()) { + result.start(); + } + this.pageFile.flush(); + } + return result; + } finally { + this.indexLock.writeLock().unlock(); } + } + + @Override + public boolean removeJobScheduler(final String name) throws Exception { + boolean result = false; - void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException { - for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) { - Entry<String, JobSchedulerImpl> entry = i.next(); - entry.getValue().load(tx); - schedulers.put(entry.getKey(), entry.getValue()); + this.indexLock.writeLock().lock(); + try { + final JobSchedulerImpl js = this.schedulers.remove(name); + result = js != null; + if (result) { + js.stop(); + getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + metaData.getJobSchedulers().remove(tx, name); + js.removeAll(tx); + } + }); } + } finally { + this.indexLock.writeLock().unlock(); } + return result; + } - public void read(DataInput is) throws IOException { - this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong()); - this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); - this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); - this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong()); - this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); + /** + * Sets the directory where the legacy scheduler store files are archived before an + * update attempt is made. Both the legacy index files and the journal files are moved + * to this folder prior to an upgrade attempt. + * + * @param directory + * The directory to move the legacy Scheduler Store files to. + */ + public void setLegacyStoreArchiveDirectory(File directory) { + this.legacyStoreArchiveDirectory = directory; + } + + /** + * Gets the directory where the legacy Scheduler Store files will be archived if the + * broker is started and an existing Job Scheduler Store from an old version is detected. + * + * @return the directory where scheduler store legacy files are archived on upgrade. + */ + public File getLegacyStoreArchiveDirectory() { + if (this.legacyStoreArchiveDirectory == null) { + this.legacyStoreArchiveDirectory = new File(getDirectory(), "legacySchedulerStore"); } - public void write(DataOutput os) throws IOException { - os.writeLong(this.storedSchedulers.getPageId()); - os.writeLong(this.journalRC.getPageId()); + return this.legacyStoreArchiveDirectory.getAbsoluteFile(); + } + + @Override + public void load() throws IOException { + if (opened.compareAndSet(false, true)) { + getJournal().start(); + try { + loadPageFile(); + } catch (UnknownStoreVersionException ex) { + LOG.info("Can't start until store update is performed."); + upgradeFromLegacy(); + // Restart with the updated store + getJournal().start(); + loadPageFile(); + LOG.info("Update from legacy Scheduler store completed successfully."); + } catch (Throwable t) { + LOG.warn("Index corrupted. Recovering the index through journal replay. Cause: {}", t.toString()); + LOG.debug("Index load failure", t); + + // try to recover index + try { + pageFile.unload(); + } catch (Exception ignore) { + } + if (isArchiveCorruptedIndex()) { + pageFile.archive(); + } else { + pageFile.delete(); + } + metaData = new JobSchedulerKahaDBMetaData(this); + pageFile = null; + loadPageFile(); + } + startCheckpoint(); + recover(); } + LOG.info("{} started.", this); } - class MetaDataMarshaller extends VariableMarshaller<MetaData> { - private final JobSchedulerStoreImpl store; + @Override + public void unload() throws IOException { + if (opened.compareAndSet(true, false)) { + for (JobSchedulerImpl js : this.schedulers.values()) { + try { + js.stop(); + } catch (Exception e) { + throw new IOException(e); + } + } + this.indexLock.writeLock().lock(); + try { + if (pageFile != null && pageFile.isLoaded()) { + metaData.setState(KahaDBMetaData.CLOSED_STATE); + + if (metaData.getPage() != null) { + pageFile.tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + tx.store(metaData.getPage(), metaDataMarshaller, true); + } + }); + } + } + } finally { + this.indexLock.writeLock().unlock(); + } - MetaDataMarshaller(JobSchedulerStoreImpl store) { - this.store = store; - } + checkpointLock.writeLock().lock(); + try { + if (metaData.getPage() != null) { + checkpointUpdate(true); + } + } finally { + checkpointLock.writeLock().unlock(); + } + synchronized (checkpointThreadLock) { + if (checkpointThread != null) { + try { + checkpointThread.join(); + checkpointThread = null; + } catch (InterruptedException e) { + } + } + } - @Override - public MetaData readPayload(DataInput dataIn) throws IOException { - MetaData rc = new MetaData(this.store); - rc.read(dataIn); - return rc; + if (pageFile != null) { + pageFile.unload(); + pageFile = null; + } + if (this.journal != null) { + journal.close(); + journal = null; + } + + metaData = new JobSchedulerKahaDBMetaData(this); } + LOG.info("{} stopped.", this); + } - @Override - public void writePayload(MetaData object, DataOutput dataOut) throws IOException { - object.write(dataOut); + private void loadPageFile() throws IOException { + this.indexLock.writeLock().lock(); + try { + final PageFile pageFile = getPageFile(); + pageFile.load(); + pageFile.tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + if (pageFile.getPageCount() == 0) { + Page<JobSchedulerKahaDBMetaData> page = tx.allocate(); + assert page.getPageId() == 0; + page.set(metaData); + metaData.setPage(page); + metaData.setState(KahaDBMetaData.CLOSED_STATE); + metaData.initialize(tx); + tx.store(metaData.getPage(), metaDataMarshaller, true); + } else { + Page<JobSchedulerKahaDBMetaData> page = null; + page = tx.load(0, metaDataMarshaller); + metaData = page.get(); + metaData.setPage(page); + } + metaData.load(tx); + metaData.loadScheduler(tx, schedulers); + for (JobSchedulerImpl js : schedulers.values()) { + try { + js.start(); + } catch (Exception e) { + JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e); + } + } + } + }); + + pageFile.flush(); + } finally { + this.indexLock.writeLock().unlock(); } } - class ValueMarshaller extends VariableMarshaller<List<JobLocation>> { - @Override - public List<JobLocation> readPayload(DataInput dataIn) throws IOException { - List<JobLocation> result = new ArrayList<JobLocation>(); - int size = dataIn.readInt(); - for (int i = 0; i < size; i++) { - JobLocation jobLocation = new JobLocation(); - jobLocation.readExternal(dataIn); - result.add(jobLocation); + private void upgradeFromLegacy() throws IOException { + + journal.close(); + journal = null; + try { + pageFile.unload(); + pageFile = null; + } catch (Exception ignore) {} + + File storeDir = getDirectory().getAbsoluteFile(); + File storeArchiveDir = getLegacyStoreArchiveDirectory(); + + LOG.info("Attempting to move old store files from {} to {}", storeDir, storeArchiveDir); + + // Move only the known store files, locks and other items left in place. + IOHelper.moveFiles(storeDir, storeArchiveDir, new FilenameFilter() { + + @Override + public boolean accept(File dir, String name) { + if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log")) { + return true; + } + return false; + } + }); + + // We reset everything to clean state, then we can read from the old + // scheduler store and replay the scheduled jobs into this one as adds. + getJournal().start(); + metaData = new JobSchedulerKahaDBMetaData(this); + pageFile = null; + loadPageFile(); + + LegacyStoreReplayer replayer = new LegacyStoreReplayer(getLegacyStoreArchiveDirectory()); + replayer.load(); + replayer.startReplay(this); + + // Cleanup after replay and store what we've done. + pageFile.tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + tx.store(metaData.getPage(), metaDataMarshaller, true); + } + }); + + checkpointUpdate(true); + getJournal().close(); + getPageFile().unload(); + } + + @Override + protected void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { + LOG.debug("Job Scheduler Store Checkpoint started."); + + // reflect last update exclusive of current checkpoint + Location lastUpdate = metaData.getLastUpdateLocation(); + metaData.setState(KahaDBMetaData.OPEN_STATE); + tx.store(metaData.getPage(), metaDataMarshaller, true); + pageFile.flush(); + + if (cleanup) { + final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); + final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet); + + LOG.trace("Last update: {}, full gc candidates set: {}", lastUpdate, gcCandidateSet); + + if (lastUpdate != null) { + gcCandidateSet.remove(lastUpdate.getDataFileId()); + } + + this.metaData.getJournalRC().visit(tx, new BTreeVisitor<Integer, Integer>() { + + @Override + public void visit(List<Integer> keys, List<Integer> values) { + for (Integer key : keys) { + if (gcCandidateSet.remove(key)) { + LOG.trace("Removed referenced file: {} from GC set", key); + } + } + } + + @Override + public boolean isInterestedInKeysBetween(Integer first, Integer second) { + return true; + } + }); + + LOG.trace("gc candidates after reference check: {}", gcCandidateSet); + + // If there are GC candidates then check the remove command location to see + // if any of them can go or if they must stay in order to ensure proper recover. + // + // A log containing any remove commands must be kept until all the logs with the + // add commands for all the removed jobs have been dropped. + if (!gcCandidateSet.isEmpty()) { + Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx); + List<Integer> orphans = new ArrayList<Integer>(); + while (removals.hasNext()) { + boolean orphanedRemve = true; + Entry<Integer, List<Integer>> entry = removals.next(); + + // If this log is not a GC candidate then there's no need to do a check to rule it out + if (gcCandidateSet.contains(entry.getKey())) { + for (Integer addLocation : entry.getValue()) { + if (completeFileSet.contains(addLocation)) { + orphanedRemve = false; + break; + } + } + + // If it's not orphaned than we can't remove it, otherwise we + // stop tracking it it's log will get deleted on the next check. + if (!orphanedRemve) { + LOG.trace("A remove in log {} has an add still in existance.", entry.getKey()); + gcCandidateSet.remove(entry.getKey()); + } else { + LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey()); + orphans.add(entry.getKey()); + } + } + } + + // Drop all orphaned removes from the tracker. + for (Integer orphan : orphans) { + metaData.getRemoveLocationTracker().remove(tx, orphan); + } + } + + LOG.trace("gc candidates after removals check: {}", gcCandidateSet); + if (!gcCandidateSet.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cleanup removing the data files: " + gcCandidateSet); + } + journal.removeDataFiles(gcCandidateSet); } - return result; } - @Override - public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException { - dataOut.writeInt(value.size()); - for (JobLocation jobLocation : value) { - jobLocation.writeExternal(dataOut); + LOG.debug("Job Scheduler Store Checkpoint complete."); + } + + /** + * Adds a reference for the journal log file pointed to by the given Location value. + * + * To prevent log files in the journal that still contain valid data that needs to be + * kept in order to allow for recovery the logs must have active references. Each Job + * scheduler should ensure that the logs are accurately referenced. + * + * @param tx + * The TX under which the update is to be performed. + * @param location + * The location value to update the reference count of. + * + * @throws IOException if an error occurs while updating the journal references table. + */ + protected void incrementJournalCount(Transaction tx, Location location) throws IOException { + int logId = location.getDataFileId(); + Integer val = metaData.getJournalRC().get(tx, logId); + int refCount = val != null ? val.intValue() + 1 : 1; + metaData.getJournalRC().put(tx, logId, refCount); + } + + /** + * Removes one reference for the Journal log file indicated in the given Location value. + * + * The references are used to track which log files cannot be GC'd. When the reference count + * on a log file reaches zero the file id is removed from the tracker and the log will be + * removed on the next check point update. + * + * @param tx + * The TX under which the update is to be performed. + * @param location + * The location value to update the reference count of. + * + * @throws IOException if an error occurs while updating the journal references table. + */ + protected void decrementJournalCount(Transaction tx, Location location) throws IOException { + int logId = location.getDataFileId(); + Integer refCount = metaData.getJournalRC().get(tx, logId); + if (refCount != null) { + int refCountValue = refCount; + refCountValue--; + if (refCountValue <= 0) { + metaData.getJournalRC().remove(tx, logId); + } else { + metaData.getJournalRC().put(tx, logId, refCountValue); } } } - class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> { + /** + * Updates the Job removal tracking index with the location of a remove command and the + * original JobLocation entry. + * + * The JobLocation holds the locations in the logs where the add and update commands for + * a job stored. The log file containing the remove command can only be discarded after + * both the add and latest update log files have also been discarded. + * + * @param tx + * The TX under which the update is to be performed. + * @param location + * The location value to reference a remove command. + * @param removedJob + * The original JobLocation instance that holds the add and update locations + * + * @throws IOException if an error occurs while updating the remove location tracker. + */ + protected void referenceRemovedLocation(Transaction tx, Location location, JobLocation removedJob) throws IOException { + int logId = location.getDataFileId(); + List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId); + if (removed == null) { + removed = new ArrayList<Integer>(); + } + removed.add(removedJob.getLocation().getDataFileId()); + this.metaData.getRemoveLocationTracker().put(tx, logId, removed); + } + + /** + * Retrieve the scheduled Job's byte blob from the journal. + * + * @param location + * The location of the KahaAddScheduledJobCommand that originated the Job. + * + * @return a ByteSequence containing the payload of the scheduled Job. + * + * @throws IOException if an error occurs while reading the payload value. + */ + protected ByteSequence getPayload(Location location) throws IOException { + KahaAddScheduledJobCommand job = (KahaAddScheduledJobCommand) this.load(location); + Buffer payload = job.getPayload(); + return new ByteSequence(payload.getData(), payload.getOffset(), payload.getLength()); + } + + public void readLockIndex() { + this.indexLock.readLock().lock(); + } + + public void readUnlockIndex() { + this.indexLock.readLock().unlock(); + } + + public void writeLockIndex() { + this.indexLock.writeLock().lock(); + } + + public void writeUnlockIndex() { + this.indexLock.writeLock().unlock(); + } + + @Override + public String toString() { + return "JobSchedulerStore: " + getDirectory(); + } + + @Override + protected String getPageFileName() { + return "scheduleDB"; + } + + @Override + protected File getDefaultDataDirectory() { + return new File(IOHelper.getDefaultDataDirectory(), "delayedDB"); + } + + private class MetaDataMarshaller extends VariableMarshaller<JobSchedulerKahaDBMetaData> { + private final JobSchedulerStoreImpl store; - JobSchedulerMarshaller(JobSchedulerStoreImpl store) { + MetaDataMarshaller(JobSchedulerStoreImpl store) { this.store = store; } @Override - public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException { - JobSchedulerImpl result = new JobSchedulerImpl(this.store); - result.read(dataIn); - return result; + public JobSchedulerKahaDBMetaData readPayload(DataInput dataIn) throws IOException { + JobSchedulerKahaDBMetaData rc = new JobSchedulerKahaDBMetaData(store); + rc.read(dataIn); + return rc; } @Override - public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException { - js.write(dataOut); + public void writePayload(JobSchedulerKahaDBMetaData object, DataOutput dataOut) throws IOException { + object.write(dataOut); } } - @Override - public File getDirectory() { - return directory; + /** + * Called during index recovery to rebuild the index from the last known good location. For + * entries that occur before the last known good position we just ignore then and move on. + * + * @param command + * the command read from the Journal which should be used to update the index. + * @param location + * the location in the index where the command was read. + * @param inDoubtlocation + * the location in the index known to be the last time the index was valid. + * + * @throws IOException if an error occurs while recovering the index. + */ + protected void doRecover(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { + if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { + process(data, location); + } } + /** + * Called during recovery to allow the store to rebuild from scratch. + * + * @param data + * The command to process, which was read from the Journal. + * @param location + * The location of the command in the Journal. + * + * @throws IOException if an error occurs during command processing. + */ @Override - public void setDirectory(File directory) { - this.directory = directory; + protected void process(JournalCommand<?> data, final Location location) throws IOException { + data.visit(new Visitor() { + @Override + public void visit(final KahaAddScheduledJobCommand command) throws IOException { + final JobSchedulerImpl scheduler; + + indexLock.writeLock().lock(); + try { + try { + scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler()); + } catch (Exception e) { + throw new IOException(e); + } + getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + scheduler.process(tx, command, location); + } + }); + + processLocation(location); + } finally { + indexLock.writeLock().unlock(); + } + } + + @Override + public void visit(final KahaRemoveScheduledJobCommand command) throws IOException { + final JobSchedulerImpl scheduler; + + indexLock.writeLock().lock(); + try { + try { + scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler()); + } catch (Exception e) { + throw new IOException(e); + } + getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + scheduler.process(tx, command, location); + } + }); + + processLocation(location); + } finally { + indexLock.writeLock().unlock(); + } + } + + @Override + public void visit(final KahaRemoveScheduledJobsCommand command) throws IOException { + final JobSchedulerImpl scheduler; + + indexLock.writeLock().lock(); + try { + try { + scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler()); + } catch (Exception e) { + throw new IOException(e); + } + getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + scheduler.process(tx, command, location); + } + }); + + processLocation(location); + } finally { + indexLock.writeLock().unlock(); + } + } + + @Override + public void visit(final KahaRescheduleJobCommand command) throws IOException { + final JobSchedulerImpl scheduler; + + indexLock.writeLock().lock(); + try { + try { + scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler()); + } catch (Exception e) { + throw new IOException(e); + } + getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + scheduler.process(tx, command, location); + } + }); + + processLocation(location); + } finally { + indexLock.writeLock().unlock(); + } + } + + @Override + public void visit(final KahaDestroySchedulerCommand command) { + try { + removeJobScheduler(command.getScheduler()); + } catch (Exception e) { + LOG.warn("Failed to remove scheduler: {}", command.getScheduler()); + } + + processLocation(location); + } + + @Override + public void visit(KahaTraceCommand command) { + processLocation(location); + } + }); } - @Override - public long size() { - if (!isStarted()) { - return 0; - } + protected void processLocation(final Location location) { + indexLock.writeLock().lock(); try { - return journalSize.get() + pageFile.getDiskSize(); - } catch (IOException e) { - throw new RuntimeException(e); + this.metaData.setLastUpdateLocation(location); + } finally { + indexLock.writeLock().unlock(); } } - @Override - public JobScheduler getJobScheduler(final String name) throws Exception { - JobSchedulerImpl result = this.schedulers.get(name); - if (result == null) { - final JobSchedulerImpl js = new JobSchedulerImpl(this); - js.setName(name); - getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - js.createIndexes(tx); - js.load(tx); - metaData.storedSchedulers.put(tx, name, js); + /** + * We recover from the Journal logs as needed to restore the index. + * + * @throws IllegalStateException + * @throws IOException + */ + private void recover() throws IllegalStateException, IOException { + this.indexLock.writeLock().lock(); + try { + long start = System.currentTimeMillis(); + Location lastIndoubtPosition = getRecoveryPosition(); + Location recoveryPosition = lastIndoubtPosition; + + if (recoveryPosition != null) { + int redoCounter = 0; + LOG.info("Recovering from the journal ..."); + while (recoveryPosition != null) { + JournalCommand<?> message = load(recoveryPosition); + metaData.setLastUpdateLocation(recoveryPosition); + doRecover(message, recoveryPosition, lastIndoubtPosition); + redoCounter++; + recoveryPosition = journal.getNextLocation(recoveryPosition); + if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { + LOG.info("@ {}, {} entries recovered ..", recoveryPosition, redoCounter); + } } - }); - result = js; - this.schedulers.put(name, js); - if (isStarted()) { - result.start(); + long end = System.currentTimeMillis(); + LOG.info("Recovery replayed {} operations from the journal in {} seconds.", + redoCounter, ((end - start) / 1000.0f)); } - this.pageFile.flush(); - } - return result; - } - @Override - synchronized public boolean removeJobScheduler(final String name) throws Exception { - boolean result = false; - final JobSchedulerImpl js = this.schedulers.remove(name); - result = js != null; - if (result) { - js.stop(); - getPageFile().tx().execute(new Transaction.Closure<IOException>() { + // We may have to undo some index updates. + pageFile.tx().execute(new Transaction.Closure<IOException>() { @Override public void execute(Transaction tx) throws IOException { - metaData.storedSchedulers.remove(tx, name); - js.destroy(tx); + recoverIndex(tx); } }); + + } finally { + this.indexLock.writeLock().unlock(); } - return result; } - @Override - protected synchronized void doStart() throws Exception { - if (this.directory == null) { - this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); + private Location getRecoveryPosition() throws IOException { + // This loads the first position and we completely rebuild the index if we + // do not override it with some known recovery start location. + Location result = null; + + if (!isForceRecoverIndex()) { + if (metaData.getLastUpdateLocation() != null) { + result = metaData.getLastUpdateLocation(); + } } - IOHelper.mkdirs(this.directory); - this.journal = new Journal(); - this.journal.setDirectory(directory); - this.journal.setMaxFileLength(getJournalMaxFileLength()); - this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); - this.journal.setSizeAccumulator(this.journalSize); - this.journal.start(); - this.pageFile = new PageFile(directory, "scheduleDB"); - this.pageFile.setWriteBatchSize(1); - this.pageFile.load(); - - this.pageFile.tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - if (pageFile.getPageCount() == 0) { - Page<MetaData> page = tx.allocate(); - assert page.getPageId() == 0; - page.set(metaData); - metaData.page = page; - metaData.createIndexes(tx); - tx.store(metaData.page, metaDataMarshaller, true); - } else { - Page<MetaData> page = tx.load(0, metaDataMarshaller); - metaData = page.get(); - metaData.page = page; - } - metaData.load(tx); - metaData.loadScheduler(tx, schedulers); - for (JobSchedulerImpl js : schedulers.values()) { - try { - js.start(); - } catch (Exception e) { - JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e); + return journal.getNextLocation(result); + } + + private void recoverIndex(Transaction tx) throws IOException { + long start = System.currentTimeMillis(); + + // It is possible index updates got applied before the journal updates.. + // in that case we need to removed references to Jobs that are not in the journal + final Location lastAppendLocation = journal.getLastAppendLocation(); + long undoCounter = 0; + + // Go through all the jobs in each scheduler and check if any are added after + // the last appended location and remove those. For now we ignore the update + // location since the scheduled job will update itself after the next fire and + // a new update will replace any existing update. + for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) { + Map.Entry<String, JobSchedulerImpl> entry = i.next(); + JobSchedulerImpl scheduler = entry.getValue(); + + List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx); + for (JobLocation job : jobs) { + if (job.getLocation().compareTo(lastAppendLocation) >= 0) { + if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) { + LOG.trace("Removed Job past last appened in the journal: {}", job.getJobId()); + undoCounter++; } } } - }); - - this.pageFile.flush(); - LOG.info(this + " started"); - } + } - @Override - protected synchronized void doStop(ServiceStopper stopper) throws Exception { - for (JobSchedulerImpl js : this.schedulers.values()) { - js.stop(); + if (undoCounter > 0) { + // The rolled back operations are basically in flight journal writes. To avoid getting + // these the end user should do sync writes to the journal. + long end = System.currentTimeMillis(); + LOG.info("Rolled back {} messages from the index in {} seconds.", undoCounter, ((end - start) / 1000.0f)); + undoCounter = 0; } - if (this.pageFile != null) { - this.pageFile.unload(); + + // Now we check for missing and corrupt journal files. + + // 1. Collect the set of all referenced journal files based on the Location of the + // the scheduled jobs and the marked last update field. + HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); + for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) { + Map.Entry<String, JobSchedulerImpl> entry = i.next(); + JobSchedulerImpl scheduler = entry.getValue(); + + List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx); + for (JobLocation job : jobs) { + missingJournalFiles.add(job.getLocation().getDataFileId()); + if (job.getLastUpdate() != null) { + missingJournalFiles.add(job.getLastUpdate().getDataFileId()); + } + } } - if (this.journal != null) { - journal.close(); + + // 2. Remove from that set all known data file Id's in the journal and what's left + // is the missing set which will soon also contain the corrupted set. + missingJournalFiles.removeAll(journal.getFileMap().keySet()); + if (!missingJournalFiles.isEmpty()) { + LOG.info("Some journal files are missing: {}", missingJournalFiles); } - LOG.info(this + " stopped"); - } - synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException { - int logId = location.getDataFileId(); - Integer val = this.metaData.journalRC.get(tx, logId); - int refCount = val != null ? val.intValue() + 1 : 1; - this.metaData.journalRC.put(tx, logId, refCount); - } + // 3. Now check all references in the journal logs for corruption and add any + // corrupt journal files to the missing set. + HashSet<Location> corruptedLocations = new HashSet<Location>(); - synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException { - int logId = location.getDataFileId(); - int refCount = this.metaData.journalRC.get(tx, logId); - refCount--; - if (refCount <= 0) { - this.metaData.journalRC.remove(tx, logId); - Set<Integer> set = new HashSet<Integer>(); - set.add(logId); - this.journal.removeDataFiles(set); - } else { - this.metaData.journalRC.put(tx, logId, refCount); - } - } + if (isCheckForCorruptJournalFiles()) { + Collection<DataFile> dataFiles = journal.getFileMap().values(); + for (DataFile dataFile : dataFiles) { + int id = dataFile.getDataFileId(); + for (long offset : dataFile.getCorruptedBlocks()) { + corruptedLocations.add(new Location(id, (int) offset)); + } + } - synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException { - ByteSequence result = null; - result = this.journal.read(location); - return result; - } + if (!corruptedLocations.isEmpty()) { + LOG.debug("Found some corrupted data blocks in the journal: {}", corruptedLocations.size()); + } + } - synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { - return this.journal.write(payload, sync); - } + // 4. Now we either fail or we remove all references to missing or corrupt journal + // files from the various JobSchedulerImpl instances. We only remove the Job if + // the initial Add operation is missing when the ignore option is set, the updates + // could be lost but that's price you pay when ignoring the missing logs. + if (!missingJournalFiles.isEmpty() || !corruptedLocations.isEmpty()) { + if (!isIgnoreMissingJournalfiles()) { + throw new IOException("Detected missing/corrupt journal files."); + } - PageFile getPageFile() { - this.pageFile.isLoaded(); - return this.pageFile; - } + // Remove all Jobs that reference an Location that is either missing or corrupt. + undoCounter = removeJobsInMissingOrCorruptJounralFiles(tx, missingJournalFiles, corruptedLocations); - public boolean isFailIfDatabaseIsLocked() { - return failIfDatabaseIsLocked; - } + // Clean up the Journal Reference count Map. + removeJournalRCForMissingFiles(tx, missingJournalFiles); + } - public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { - this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; + if (undoCounter > 0) { + long end = System.currentTimeMillis(); + LOG.info("Detected missing/corrupt journal files. Dropped {} jobs from the " + + "index in {} seconds.", undoCounter, ((end - start) / 1000.0f)); + } } - public int getJournalMaxFileLength() { - return journalMaxFileLength; - } + private void removeJournalRCForMissingFiles(Transaction tx, Set<Integer> missing) throws IOException { + List<Integer> matches = new ArrayList<Integer>(); - public void setJournalMaxFileLength(int journalMaxFileLength) { - this.journalMaxFileLength = journalMaxFileLength; - } + Iterator<Entry<Integer, Integer>> references = metaData.getJournalRC().iterator(tx); + while (references.hasNext()) { + int dataFileId = references.next().getKey(); + if (missing.contains(dataFileId)) { + matches.add(dataFileId); + } + } - public int getJournalMaxWriteBatchSize() { - return journalMaxWriteBatchSize; + for (Integer match : matches) { + metaData.getJournalRC().remove(tx, match); + } } - public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { - this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; - } + private int removeJobsInMissingOrCorruptJounralFiles(Transaction tx, Set<Integer> missing, Set<Location> corrupted) throws IOException { + int removed = 0; - public boolean isEnableIndexWriteAsync() { - return enableIndexWriteAsync; - } + // Remove Jobs that reference missing or corrupt files. + // Remove Reference counts to missing or corrupt files. + // Remove and remove command markers to missing or corrupt files. + for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) { + Map.Entry<String, JobSchedulerImpl> entry = i.next(); + JobSchedulerImpl scheduler = entry.getValue(); - public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { - this.enableIndexWriteAsync = enableIndexWriteAsync; - } + List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx); + for (JobLocation job : jobs) { - @Override - public String toString() { - return "JobSchedulerStore:" + this.directory; - } + // Remove all jobs in missing log files. + if (missing.contains(job.getLocation().getDataFileId())) { + scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime()); + removed++; + continue; + } - @Override - public Locker createDefaultLocker() throws IOException { - SharedFileLocker locker = new SharedFileLocker(); - locker.setDirectory(this.getDirectory()); - return locker; - } + // Remove all jobs in corrupted parts of log files. + if (corrupted.contains(job.getLocation())) { + scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime()); + removed++; + } + } + } - @Override - public void init() throws Exception { + return removed; } }
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java new file mode 100644 index 0000000..5146d84 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java @@ -0,0 +1,24 @@ +package org.apache.activemq.store.kahadb.scheduler; + +import java.io.IOException; + +public class UnknownStoreVersionException extends IOException { + + private static final long serialVersionUID = -8544753506151157145L; + + private final String token; + + public UnknownStoreVersionException(Throwable cause) { + super(cause); + this.token = ""; + } + + public UnknownStoreVersionException(String token) { + super("Failed to load Store, found unknown store token: " + token); + this.token = token; + } + + public String getToken() { + return this.token; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java new file mode 100644 index 0000000..2562f50 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.scheduler.legacy; + +import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.util.ByteSequence; + +/** + * Legacy version Job and Job payload wrapper. Allows for easy replay of stored + * legacy jobs into a new JobSchedulerStoreImpl intsance. + */ +final class LegacyJobImpl { + + private final LegacyJobLocation jobLocation; + private final Buffer payload; + + protected LegacyJobImpl(LegacyJobLocation location, ByteSequence payload) { + this.jobLocation = location; + this.payload = new Buffer(payload.data, payload.offset, payload.length); + } + + public String getJobId() { + return this.jobLocation.getJobId(); + } + + public Buffer getPayload() { + return this.payload; + } + + public long getPeriod() { + return this.jobLocation.getPeriod(); + } + + public int getRepeat() { + return this.jobLocation.getRepeat(); + } + + public long getDelay() { + return this.jobLocation.getDelay(); + } + + public String getCronEntry() { + return this.jobLocation.getCronEntry(); + } + + public long getNextExecutionTime() { + return this.jobLocation.getNextTime(); + } + + public long getStartTime() { + return this.jobLocation.getStartTime(); + } + + @Override + public String toString() { + return this.jobLocation.toString(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java new file mode 100644 index 0000000..8437064 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.scheduler.legacy; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; + +final class LegacyJobLocation { + + private String jobId; + private int repeat; + private long startTime; + private long delay; + private long nextTime; + private long period; + private String cronEntry; + private final Location location; + + public LegacyJobLocation(Location location) { + this.location = location; + } + + public LegacyJobLocation() { + this(new Location()); + } + + public void readExternal(DataInput in) throws IOException { + this.jobId = in.readUTF(); + this.repeat = in.readInt(); + this.startTime = in.readLong(); + this.delay = in.readLong(); + this.nextTime = in.readLong(); + this.period = in.readLong(); + this.cronEntry = in.readUTF(); + this.location.readExternal(in); + } + + public void writeExternal(DataOutput out) throws IOException { + out.writeUTF(this.jobId); + out.writeInt(this.repeat); + out.writeLong(this.startTime); + out.writeLong(this.delay); + out.writeLong(this.nextTime); + out.writeLong(this.period); + if (this.cronEntry == null) { + this.cronEntry = ""; + } + out.writeUTF(this.cronEntry); + this.location.writeExternal(out); + } + + /** + * @return the jobId + */ + public String getJobId() { + return this.jobId; + } + + /** + * @param jobId + * the jobId to set + */ + public void setJobId(String jobId) { + this.jobId = jobId; + } + + /** + * @return the repeat + */ + public int getRepeat() { + return this.repeat; + } + + /** + * @param repeat + * the repeat to set + */ + public void setRepeat(int repeat) { + this.repeat = repeat; + } + + /** + * @return the start + */ + public long getStartTime() { + return this.startTime; + } + + /** + * @param start + * the start to set + */ + public void setStartTime(long start) { + this.startTime = start; + } + + /** + * @return the nextTime + */ + public synchronized long getNextTime() { + return this.nextTime; + } + + /** + * @param nextTime + * the nextTime to set + */ + public synchronized void setNextTime(long nextTime) { + this.nextTime = nextTime; + } + + /** + * @return the period + */ + public long getPeriod() { + return this.period; + } + + /** + * @param period + * the period to set + */ + public void setPeriod(long period) { + this.period = period; + } + + /** + * @return the cronEntry + */ + public synchronized String getCronEntry() { + return this.cronEntry; + } + + /** + * @param cronEntry + * the cronEntry to set + */ + public synchronized void setCronEntry(String cronEntry) { + this.cronEntry = cronEntry; + } + + /** + * @return if this JobLocation represents a cron entry. + */ + public boolean isCron() { + return getCronEntry() != null && getCronEntry().length() > 0; + } + + /** + * @return the delay + */ + public long getDelay() { + return this.delay; + } + + /** + * @param delay + * the delay to set + */ + public void setDelay(long delay) { + this.delay = delay; + } + + /** + * @return the location + */ + public Location getLocation() { + return this.location; + } + + @Override + public String toString() { + return "Job [id=" + jobId + ", startTime=" + new Date(startTime) + + ", delay=" + delay + ", period=" + period + + ", repeat=" + repeat + ", nextTime=" + new Date(nextTime) + "]"; + } + + static class JobLocationMarshaller extends VariableMarshaller<List<LegacyJobLocation>> { + static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller(); + + @Override + public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException { + List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + LegacyJobLocation jobLocation = new LegacyJobLocation(); + jobLocation.readExternal(dataIn); + result.add(jobLocation); + } + return result; + } + + @Override + public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (LegacyJobLocation jobLocation : value) { + jobLocation.writeExternal(dataOut); + } + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((cronEntry == null) ? 0 : cronEntry.hashCode()); + result = prime * result + (int) (delay ^ (delay >>> 32)); + result = prime * result + ((jobId == null) ? 0 : jobId.hashCode()); + result = prime * result + ((location == null) ? 0 : location.hashCode()); + result = prime * result + (int) (nextTime ^ (nextTime >>> 32)); + result = prime * result + (int) (period ^ (period >>> 32)); + result = prime * result + repeat; + result = prime * result + (int) (startTime ^ (startTime >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + LegacyJobLocation other = (LegacyJobLocation) obj; + + if (cronEntry == null) { + if (other.cronEntry != null) { + return false; + } + } else if (!cronEntry.equals(other.cronEntry)) { + return false; + } + + if (delay != other.delay) { + return false; + } + + if (jobId == null) { + if (other.jobId != null) + return false; + } else if (!jobId.equals(other.jobId)) { + return false; + } + + if (location == null) { + if (other.location != null) { + return false; + } + } else if (!location.equals(other.location)) { + return false; + } + + if (nextTime != other.nextTime) { + return false; + } + if (period != other.period) { + return false; + } + if (repeat != other.repeat) { + return false; + } + if (startTime != other.startTime) { + return false; + } + + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java new file mode 100644 index 0000000..687ffd7 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.scheduler.legacy; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; +import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; + +/** + * Read-only view of a stored legacy JobScheduler instance. + */ +final class LegacyJobSchedulerImpl extends ServiceSupport { + + private final LegacyJobSchedulerStoreImpl store; + private String name; + private BTreeIndex<Long, List<LegacyJobLocation>> index; + + LegacyJobSchedulerImpl(LegacyJobSchedulerStoreImpl store) { + this.store = store; + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + return this.name; + } + + /** + * Returns the next time that a job would be scheduled to run. + * + * @return time of next scheduled job to run. + * + * @throws IOException if an error occurs while fetching the time. + */ + public long getNextScheduleTime() throws IOException { + Map.Entry<Long, List<LegacyJobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); + return first != null ? first.getKey() : -1l; + } + + /** + * Gets the list of the next batch of scheduled jobs in the store. + * + * @return a list of the next jobs that will run. + * + * @throws IOException if an error occurs while fetching the jobs list. + */ + public List<LegacyJobImpl> getNextScheduleJobs() throws IOException { + final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>(); + + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + Map.Entry<Long, List<LegacyJobLocation>> first = index.getFirst(store.getPageFile().tx()); + if (first != null) { + for (LegacyJobLocation jl : first.getValue()) { + ByteSequence bs = getPayload(jl.getLocation()); + LegacyJobImpl job = new LegacyJobImpl(jl, bs); + result.add(job); + } + } + } + }); + return result; + } + + /** + * Gets a list of all scheduled jobs in this store. + * + * @return a list of all the currently scheduled jobs in this store. + * + * @throws IOException if an error occurs while fetching the list of jobs. + */ + public List<LegacyJobImpl> getAllJobs() throws IOException { + final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>(); + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + Iterator<Map.Entry<Long, List<LegacyJobLocation>>> iter = index.iterator(store.getPageFile().tx()); + while (iter.hasNext()) { + Map.Entry<Long, List<LegacyJobLocation>> next = iter.next(); + if (next != null) { + for (LegacyJobLocation jl : next.getValue()) { + ByteSequence bs = getPayload(jl.getLocation()); + LegacyJobImpl job = new LegacyJobImpl(jl, bs); + result.add(job); + } + } else { + break; + } + } + } + }); + return result; + } + + /** + * Gets a list of all scheduled jobs that exist between the given start and end time. + * + * @param start + * The start time to look for scheduled jobs. + * @param finish + * The end time to stop looking for scheduled jobs. + * + * @return a list of all scheduled jobs that would run between the given start and end time. + * + * @throws IOException if an error occurs while fetching the list of jobs. + */ + public List<LegacyJobImpl> getAllJobs(final long start, final long finish) throws IOException { + final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>(); + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + Iterator<Map.Entry<Long, List<LegacyJobLocation>>> iter = index.iterator(store.getPageFile().tx(), start); + while (iter.hasNext()) { + Map.Entry<Long, List<LegacyJobLocation>> next = iter.next(); + if (next != null && next.getKey().longValue() <= finish) { + for (LegacyJobLocation jl : next.getValue()) { + ByteSequence bs = getPayload(jl.getLocation()); + LegacyJobImpl job = new LegacyJobImpl(jl, bs); + result.add(job); + } + } else { + break; + } + } + } + }); + return result; + } + + ByteSequence getPayload(Location location) throws IllegalStateException, IOException { + return this.store.getPayload(location); + } + + @Override + public String toString() { + return "LegacyJobScheduler: " + this.name; + } + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + } + + void createIndexes(Transaction tx) throws IOException { + this.index = new BTreeIndex<Long, List<LegacyJobLocation>>(this.store.getPageFile(), tx.allocate().getPageId()); + } + + void load(Transaction tx) throws IOException { + this.index.setKeyMarshaller(LongMarshaller.INSTANCE); + this.index.setValueMarshaller(ValueMarshaller.INSTANCE); + this.index.load(tx); + } + + void read(DataInput in) throws IOException { + this.name = in.readUTF(); + this.index = new BTreeIndex<Long, List<LegacyJobLocation>>(this.store.getPageFile(), in.readLong()); + this.index.setKeyMarshaller(LongMarshaller.INSTANCE); + this.index.setValueMarshaller(ValueMarshaller.INSTANCE); + } + + public void write(DataOutput out) throws IOException { + out.writeUTF(name); + out.writeLong(this.index.getPageId()); + } + + static class ValueMarshaller extends VariableMarshaller<List<LegacyJobLocation>> { + static ValueMarshaller INSTANCE = new ValueMarshaller(); + + @Override + public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException { + List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + LegacyJobLocation jobLocation = new LegacyJobLocation(); + jobLocation.readExternal(dataIn); + result.add(jobLocation); + } + return result; + } + + @Override + public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (LegacyJobLocation jobLocation : value) { + jobLocation.writeExternal(dataOut); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java new file mode 100644 index 0000000..acbd4e7 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.scheduler.legacy; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; +import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.Page; +import org.apache.activemq.store.kahadb.disk.page.PageFile; +import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller; +import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; +import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.LockFile; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Read-only view of a legacy JobSchedulerStore implementation. + */ +final class LegacyJobSchedulerStoreImpl extends ServiceSupport { + + static final Logger LOG = LoggerFactory.getLogger(LegacyJobSchedulerStoreImpl.class); + + private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; + + private File directory; + private PageFile pageFile; + private Journal journal; + private LockFile lockFile; + private final AtomicLong journalSize = new AtomicLong(0); + private boolean failIfDatabaseIsLocked; + private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; + private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; + private boolean enableIndexWriteAsync = false; + private MetaData metaData = new MetaData(this); + private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); + private final Map<String, LegacyJobSchedulerImpl> schedulers = new HashMap<String, LegacyJobSchedulerImpl>(); + + protected class MetaData { + protected MetaData(LegacyJobSchedulerStoreImpl store) { + this.store = store; + } + + private final LegacyJobSchedulerStoreImpl store; + Page<MetaData> page; + BTreeIndex<Integer, Integer> journalRC; + BTreeIndex<String, LegacyJobSchedulerImpl> storedSchedulers; + + void createIndexes(Transaction tx) throws IOException { + this.storedSchedulers = new BTreeIndex<String, LegacyJobSchedulerImpl>(pageFile, tx.allocate().getPageId()); + this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId()); + } + + void load(Transaction tx) throws IOException { + this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); + this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); + this.storedSchedulers.load(tx); + this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.load(tx); + } + + void loadScheduler(Transaction tx, Map<String, LegacyJobSchedulerImpl> schedulers) throws IOException { + for (Iterator<Entry<String, LegacyJobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) { + Entry<String, LegacyJobSchedulerImpl> entry = i.next(); + entry.getValue().load(tx); + schedulers.put(entry.getKey(), entry.getValue()); + } + } + + public void read(DataInput is) throws IOException { + this.storedSchedulers = new BTreeIndex<String, LegacyJobSchedulerImpl>(pageFile, is.readLong()); + this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); + this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); + this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong()); + this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); + } + + public void write(DataOutput os) throws IOException { + os.writeLong(this.storedSchedulers.getPageId()); + os.writeLong(this.journalRC.getPageId()); + } + } + + class MetaDataMarshaller extends VariableMarshaller<MetaData> { + private final LegacyJobSchedulerStoreImpl store; + + MetaDataMarshaller(LegacyJobSchedulerStoreImpl store) { + this.store = store; + } + + @Override + public MetaData readPayload(DataInput dataIn) throws IOException { + MetaData rc = new MetaData(this.store); + rc.read(dataIn); + return rc; + } + + @Override + public void writePayload(MetaData object, DataOutput dataOut) throws IOException { + object.write(dataOut); + } + } + + class ValueMarshaller extends VariableMarshaller<List<LegacyJobLocation>> { + @Override + public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException { + List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + LegacyJobLocation jobLocation = new LegacyJobLocation(); + jobLocation.readExternal(dataIn); + result.add(jobLocation); + } + return result; + } + + @Override + public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (LegacyJobLocation jobLocation : value) { + jobLocation.writeExternal(dataOut); + } + } + } + + class JobSchedulerMarshaller extends VariableMarshaller<LegacyJobSchedulerImpl> { + private final LegacyJobSchedulerStoreImpl store; + + JobSchedulerMarshaller(LegacyJobSchedulerStoreImpl store) { + this.store = store; + } + + @Override + public LegacyJobSchedulerImpl readPayload(DataInput dataIn) throws IOException { + LegacyJobSchedulerImpl result = new LegacyJobSchedulerImpl(this.store); + result.read(dataIn); + return result; + } + + @Override + public void writePayload(LegacyJobSchedulerImpl js, DataOutput dataOut) throws IOException { + js.write(dataOut); + } + } + + public File getDirectory() { + return directory; + } + + public void setDirectory(File directory) { + this.directory = directory; + } + + public long size() { + if (!isStarted()) { + return 0; + } + try { + return journalSize.get() + pageFile.getDiskSize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Returns the named Job Scheduler if it exists, otherwise throws an exception. + * + * @param name + * The name of the scheduler that is to be returned. + * + * @return the named scheduler if it exists. + * + * @throws Exception if the named scheduler does not exist in this store. + */ + public LegacyJobSchedulerImpl getJobScheduler(final String name) throws Exception { + LegacyJobSchedulerImpl result = this.schedulers.get(name); + if (result == null) { + throw new NoSuchElementException("No such Job Scheduler in this store: " + name); + } + return result; + } + + /** + * Returns the names of all the schedulers that exist in this scheduler store. + * + * @return a set of names of all scheduler instances in this store. + * + * @throws Exception if an error occurs while collecting the scheduler names. + */ + public Set<String> getJobSchedulerNames() throws Exception { + Set<String> names = Collections.emptySet(); + + if (!schedulers.isEmpty()) { + return this.schedulers.keySet(); + } + + return names; + } + + @Override + protected void doStart() throws Exception { + if (this.directory == null) { + this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); + } + IOHelper.mkdirs(this.directory); + lock(); + this.journal = new Journal(); + this.journal.setDirectory(directory); + this.journal.setMaxFileLength(getJournalMaxFileLength()); + this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); + this.journal.setSizeAccumulator(this.journalSize); + this.journal.start(); + this.pageFile = new PageFile(directory, "scheduleDB"); + this.pageFile.setWriteBatchSize(1); + this.pageFile.load(); + + this.pageFile.tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + if (pageFile.getPageCount() == 0) { + Page<MetaData> page = tx.allocate(); + assert page.getPageId() == 0; + page.set(metaData); + metaData.page = page; + metaData.createIndexes(tx); + tx.store(metaData.page, metaDataMarshaller, true); + + } else { + Page<MetaData> page = tx.load(0, metaDataMarshaller); + metaData = page.get(); + metaData.page = page; + } + metaData.load(tx); + metaData.loadScheduler(tx, schedulers); + for (LegacyJobSchedulerImpl js : schedulers.values()) { + try { + js.start(); + } catch (Exception e) { + LegacyJobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e); + } + } + } + }); + + this.pageFile.flush(); + LOG.info(this + " started"); + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + for (LegacyJobSchedulerImpl js : this.schedulers.values()) { + js.stop(); + } + if (this.pageFile != null) { + this.pageFile.unload(); + } + if (this.journal != null) { + journal.close(); + } + if (this.lockFile != null) { + this.lockFile.unlock(); + } + this.lockFile = null; + LOG.info(this + " stopped"); + } + + ByteSequence getPayload(Location location) throws IllegalStateException, IOException { + ByteSequence result = null; + result = this.journal.read(location); + return result; + } + + Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { + return this.journal.write(payload, sync); + } + + private void lock() throws IOException { + if (lockFile == null) { + File lockFileName = new File(directory, "lock"); + lockFile = new LockFile(lockFileName, true); + if (failIfDatabaseIsLocked) { + lockFile.lock(); + } else { + while (true) { + try { + lockFile.lock(); + break; + } catch (IOException e) { + LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + + " seconds for the database to be unlocked. Reason: " + e); + try { + Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); + } catch (InterruptedException e1) { + } + } + } + } + } + } + + PageFile getPageFile() { + this.pageFile.isLoaded(); + return this.pageFile; + } + + public boolean isFailIfDatabaseIsLocked() { + return failIfDatabaseIsLocked; + } + + public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { + this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; + } + + public int getJournalMaxFileLength() { + return journalMaxFileLength; + } + + public void setJournalMaxFileLength(int journalMaxFileLength) { + this.journalMaxFileLength = journalMaxFileLength; + } + + public int getJournalMaxWriteBatchSize() { + return journalMaxWriteBatchSize; + } + + public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { + this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; + } + + public boolean isEnableIndexWriteAsync() { + return enableIndexWriteAsync; + } + + public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { + this.enableIndexWriteAsync = enableIndexWriteAsync; + } + + @Override + public String toString() { + return "LegacyJobSchedulerStore:" + this.directory; + } +}
