Repository: activemq Updated Branches: refs/heads/master b4e35fe8a -> 4d6cc4b46
https://issues.apache.org/jira/browse/AMQ-6303 Properly setting typeCode value for new journal files used for ack compaction Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4d6cc4b4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4d6cc4b4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4d6cc4b4 Branch: refs/heads/master Commit: 4d6cc4b46007c7bcec10ade080e822d83261273f Parents: b4e35fe Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Mon May 23 18:09:01 2016 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Mon May 23 18:42:29 2016 +0000 ---------------------------------------------------------------------- .../apache/activemq/store/kahadb/MessageDatabase.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4d6cc4b4/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 208a52b..e3918ab 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1375,12 +1375,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException { final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); - if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) { - // Mark the current journal file as a compacted file so that gc checks can skip - // over logs that are smaller compaction type logs. - DataFile current = journal.getDataFileById(location.getDataFileId()); - current.setTypeCode(command.getRewriteType()); + // Mark the current journal file as a compacted file so that gc checks can skip + // over logs that are smaller compaction type logs. + DataFile current = journal.getDataFileById(location.getDataFileId()); + current.setTypeCode(command.getRewriteType()); + + if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) { // Move offset so that next location read jumps to next file. location.setOffset(journalMaxFileLength); } @@ -1971,6 +1972,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead); DataFile forwardsFile = journal.reserveDataFile(); + forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE); LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile); Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>(); @@ -1978,7 +1980,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); compactionMarker.setSourceDataFileId(journalToRead); - compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE); + compactionMarker.setRewriteType(forwardsFile.getTypeCode()); ByteSequence payload = toByteSequence(compactionMarker); appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);