Repository: activemq Updated Branches: refs/heads/master 60b0c4f85 -> 7bdcca1bd
Revert "https://issues.apache.org/jira/browse/AMQ-6285" This reverts commit 60b0c4f85ada06875e09b1bc3fbefac0f9fb6156. Inadvertently committed a bunch of changes by mistake Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/db3f8b35 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/db3f8b35 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/db3f8b35 Branch: refs/heads/master Commit: db3f8b3554a1ecf5c9fcf917948d10e5fa8de28e Parents: 60b0c4f Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Mon May 9 19:05:43 2016 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Mon May 9 19:05:52 2016 +0000 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 95 ++++++++++++-------- 1 file changed, 57 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/db3f8b35/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index c1af2fe..f148971 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -111,6 +111,8 @@ import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET; + public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { protected BrokerService 
brokerService; @@ -469,12 +471,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe checkpointLock.writeLock().unlock(); } journal.close(); - synchronized(schedulerLock) { - if (scheduler != null) { - ThreadPoolUtils.shutdownGraceful(scheduler, -1); - scheduler = null; - } - } + ThreadPoolUtils.shutdownGraceful(scheduler, -1); // clear the cache and journalSize on shutdown of the store storeCache.clear(); journalSize.set(0); @@ -630,11 +627,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { long start = System.currentTimeMillis(); - Location producerAuditPosition = recoverProducerAudit(); - Location ackMessageFileLocation = recoverAckMessageFileMap(); + Location afterProducerAudit = recoverProducerAudit(); + Location afterAckMessageFile = recoverAckMessageFileMap(); Location lastIndoubtPosition = getRecoveryPosition(); - Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation); + if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) { + // valid checkpoint, possible recover from afterAckMessageFile + afterProducerAudit = null; + } + Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile); recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); if (recoveryPosition != null) { @@ -716,16 +717,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return TransactionIdConversion.convertToLocal(tx); } - private Location minimum(Location producerAuditPosition, - Location lastIndoubtPosition) { + private Location minimum(Location x, + Location y) { Location min = null; - if (producerAuditPosition != null) { - min = producerAuditPosition; - if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { - min = lastIndoubtPosition; + if (x != null) { + min = x; + if (y != null) { + int compare = y.compareTo(x); + if (compare < 0) { + min 
= y; + } } } else { - min = lastIndoubtPosition; + min = y; } return min; } @@ -740,7 +744,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); - return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); + return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation); } catch (Exception e) { LOG.warn("Cannot recover message audit", e); return journal.getNextLocation(null); @@ -758,7 +762,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); - return journal.getNextLocation(metadata.ackMessageFileMapLocation); + return getNextInitializedLocation(metadata.ackMessageFileMapLocation); } catch (Exception e) { LOG.warn("Cannot recover ackMessageFileMap", e); return journal.getNextLocation(null); @@ -986,13 +990,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Perhaps there were no transactions... if( metadata.lastUpdate!=null) { // Start replay at the record after the last one recorded in the index file. - return journal.getNextLocation(metadata.lastUpdate); + return getNextInitializedLocation(metadata.lastUpdate); } } // This loads the first position. 
return journal.getNextLocation(null); } + private Location getNextInitializedLocation(Location location) throws IOException { + Location mayNotBeInitialized = journal.getNextLocation(location); + if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) { + // need to init size and type to skip + return journal.getNextLocation(mayNotBeInitialized); + } else { + return mayNotBeInitialized; + } + } + protected void checkpointCleanup(final boolean cleanup) throws IOException { long start; this.indexLock.writeLock().lock(); @@ -1865,32 +1879,37 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe @Override public void run() { + + int journalToAdvance = -1; + Set<Integer> journalLogsReferenced = new HashSet<Integer>(); + // Lock index to capture the ackMessageFileMap data indexLock.writeLock().lock(); - // Map keys might not be sorted, find the earliest log file to forward acks - // from and move only those, future cycles can chip away at more as needed. - // We won't move files that are themselves rewritten on a previous compaction. - List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet()); - Collections.sort(journalFileIds); - int journalToAdvance = -1; - for (Integer journalFileId : journalFileIds) { - DataFile current = journal.getDataFileById(journalFileId); - if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { - journalToAdvance = journalFileId; - break; + try { + // Map keys might not be sorted, find the earliest log file to forward acks + // from and move only those, future cycles can chip away at more as needed. + // We won't move files that are themselves rewritten on a previous compaction. 
+ List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet()); + Collections.sort(journalFileIds); + for (Integer journalFileId : journalFileIds) { + DataFile current = journal.getDataFileById(journalFileId); + if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { + journalToAdvance = journalFileId; + break; + } } - } - // Check if we found one, or if we only found the current file being written to. - if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { - return; - } + // Check if we found one, or if we only found the current file being written to. + if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { + return; + } - Set<Integer> journalLogsReferenced = - new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance)); + journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); - indexLock.writeLock().unlock(); + } finally { + indexLock.writeLock().unlock(); + } try { // Background rewrite of the old acks