http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 9f6488e..3c0dab7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -120,6 +120,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { // QueueControlMBean implementation ------------------------------ + @Override public String getName() { clearIO(); try { @@ -130,12 +131,14 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public String getAddress() { checkStarted(); return address; } + @Override public String getFilter() { checkStarted(); @@ -150,6 +153,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public boolean isDurable() { checkStarted(); @@ -162,6 +166,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public boolean isTemporary() { checkStarted(); @@ -174,6 +179,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public long getMessageCount() { checkStarted(); @@ -186,6 +192,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public int getConsumerCount() { checkStarted(); @@ -198,6 +205,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public int getDeliveringCount() { checkStarted(); @@ -210,6 +218,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public long getMessagesAdded() { checkStarted(); @@ -222,6 +231,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public long getMessagesAcknowledged() { checkStarted(); @@ -234,6 +244,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public long getID() { checkStarted(); @@ -246,6 +257,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public long getScheduledCount() { checkStarted(); @@ -258,6 +270,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public String getDeadLetterAddress() { checkStarted(); @@ -275,6 +288,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public String getExpiryAddress() { checkStarted(); @@ -294,6 +308,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public Map<String, Object>[] listScheduledMessages() throws Exception { checkStarted(); @@ -307,6 +322,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public String listScheduledMessagesAsJSON() throws Exception { checkStarted(); @@ -333,6 +349,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { return messages; } + @Override public Map<String, Map<String, Object>[]> listDeliveringMessages() { checkStarted(); @@ -353,6 +370,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } + @Override public String listDeliveringMessagesAsJSON() throws Exception { checkStarted(); @@ -365,6 +383,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public Map<String, Object>[] listMessages(final String filterStr) throws Exception { checkStarted(); @@ -396,6 +415,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public String listMessagesAsJSON(final String filter) throws Exception { checkStarted(); @@ -435,10 +455,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } + @Override public String getFirstMessageAsJSON() throws Exception { return toJSON(getFirstMessage()).toString(); } + @Override public Long getFirstMessageTimestamp() throws Exception { Map<String, Object>[] _message = getFirstMessage(); if (_message == null || _message.length == 0 || _message[0] == null) { @@ -451,6 +473,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { return (Long) message.get("timestamp"); } + @Override public Long getFirstMessageAge() throws Exception { Long firstMessageTimestamp = getFirstMessageTimestamp(); if (firstMessageTimestamp == null) { @@ -460,6 +483,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { return now - firstMessageTimestamp.longValue(); } + @Override public long countMessages(final String filterStr) throws Exception { checkStarted(); @@ -491,6 +515,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public boolean removeMessage(final long messageID) throws Exception { checkStarted(); @@ -506,10 +531,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public int removeMessages(final String filterStr) throws Exception { return removeMessages(FLUSH_LIMIT, filterStr); } + @Override public int removeMessages(final int flushLimit, final String filterStr) throws Exception { checkStarted(); @@ -524,6 +551,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public boolean expireMessage(final long messageID) throws Exception { checkStarted(); @@ -536,6 +564,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public int expireMessages(final String filterStr) throws Exception { checkStarted(); @@ -552,6 +581,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public boolean retryMessage(final long messageID) throws Exception { checkStarted(); @@ -577,6 +607,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public int retryMessages() throws Exception { checkStarted(); clearIO(); @@ -589,10 +620,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception { return moveMessage(messageID, otherQueueName, false); } + @Override public boolean moveMessage(final long messageID, final String otherQueueName, final boolean rejectDuplicates) throws Exception { @@ -614,10 +647,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } + @Override public int moveMessages(final String filterStr, final String otherQueueName) throws Exception { return moveMessages(filterStr, otherQueueName, false); } + @Override public int moveMessages(final int flushLimit, final String filterStr, final String otherQueueName, @@ -644,12 +679,14 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } + @Override public int moveMessages(final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception { return moveMessages(FLUSH_LIMIT, filterStr, otherQueueName, rejectDuplicates); } + @Override public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception { checkStarted(); @@ -664,6 +701,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception { checkStarted(); @@ -676,6 +714,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public int changeMessagesPriority(final String filterStr, final int newPriority) throws Exception { checkStarted(); @@ -693,6 +732,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception { checkStarted(); @@ -708,6 +748,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public String listMessageCounter() { checkStarted(); @@ -723,6 +764,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public void resetMessageCounter() { checkStarted(); @@ -735,6 +777,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public String listMessageCounterAsHTML() { checkStarted(); @@ -747,6 +790,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public String listMessageCounterHistory() throws Exception { checkStarted(); @@ -759,6 +803,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public String listMessageCounterHistoryAsHTML() { checkStarted(); @@ -771,6 +816,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public void pause() { checkStarted(); @@ -783,6 +829,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public void resume() { checkStarted(); @@ -795,6 +842,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public boolean isPaused() throws Exception { checkStarted(); @@ -807,6 +855,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override public void flushExecutor() { checkStarted(); @@ -858,6 +907,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { return MBeanInfoHelper.getMBeanOperationsInfo(QueueControl.class); } + @Override public void resetMessagesAdded() throws Exception { checkStarted(); @@ -871,6 +921,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } + @Override public void resetMessagesAcknowledged() throws Exception { checkStarted();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/MessageCounter.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/MessageCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/MessageCounter.java index dce7e5e..f3ea998 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/MessageCounter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/MessageCounter.java @@ -104,6 +104,7 @@ public class MessageCounter { } private final Runnable onTimeExecutor = new Runnable() { + @Override public void run() { long latestMessagesAdded = serverQueue.getMessagesAdded(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterManagerImpl.java index c81aa65..ff268c9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterManagerImpl.java @@ -58,6 +58,7 @@ public class MessageCounterManagerImpl implements MessageCounterManager { this.scheduledThreadPool = scheduledThreadPool; } + @Override public synchronized void start() { if (started) { return; @@ -71,6 +72,7 @@ public class MessageCounterManagerImpl implements MessageCounterManager { started = true; } + @Override public synchronized void stop() { if (!started) { return; @@ -81,10 +83,12 @@ public class MessageCounterManagerImpl implements MessageCounterManager { started = false; } + @Override public synchronized void clear() { messageCounters.clear(); } + @Override public synchronized void reschedule(final long newPeriod) { boolean wasStarted = started; @@ -99,24 +103,29 @@ public class MessageCounterManagerImpl implements MessageCounterManager { } } + @Override public long getSamplePeriod() { return period; } + @Override public int getMaxDayCount() { return maxDayCount; } + @Override public void setMaxDayCount(final int count) { maxDayCount = count; } + @Override public void registerMessageCounter(final String name, final MessageCounter counter) { synchronized (messageCounters) { messageCounters.put(name, counter); } } + @Override public MessageCounter unregisterMessageCounter(final String name) { synchronized (messageCounters) { return messageCounters.remove(name); @@ -129,6 +138,7 @@ public class MessageCounterManagerImpl implements MessageCounterManager { } } + @Override public void resetAllCounters() { synchronized (messageCounters) { Iterator<MessageCounter> iter = messageCounters.values().iterator(); @@ -141,6 +151,7 @@ public class MessageCounterManagerImpl implements MessageCounterManager { } } + @Override public void resetAllCounterHistories() { synchronized (messageCounters) { Iterator<MessageCounter> iter = messageCounters.values().iterator(); @@ -159,6 +170,7 @@ public class MessageCounterManagerImpl implements MessageCounterManager { private Future<?> future; + @Override public synchronized void run() { if (closed) { return; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java index 0c6cf91..2c82974 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java @@ -32,6 +32,7 @@ public interface PageCache extends SoftValueHashMap.ValueCache { /** * @return whether this cache is still being updated */ + @Override boolean isLive(); /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index cfbe8e8..f132723 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -48,10 +48,12 @@ public class PagedReferenceImpl implements PagedReference { private boolean alreadyAcked; + @Override public ServerMessage getMessage() { return getPagedMessage().getMessage(); } + @Override public synchronized PagedMessage getPagedMessage() { PagedMessage returnMessage = message != null ? message.get() : null; @@ -67,6 +69,7 @@ public class PagedReferenceImpl implements PagedReference { return returnMessage; } + @Override public PagePosition getPosition() { return position; } @@ -86,14 +89,17 @@ public class PagedReferenceImpl implements PagedReference { this.subscription = subscription; } + @Override public boolean isPaged() { return true; } + @Override public void setPersistedCount(int count) { this.persistedCount = count; } + @Override public int getPersistedCount() { return persistedCount; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java index f964d42..5efe6d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java @@ -64,22 +64,27 @@ class PageCacheImpl implements PageCache { } } + @Override public long getPageId() { return page.getPageId(); } + @Override public void lock() { lock.writeLock().lock(); } + @Override public void unlock() { lock.writeLock().unlock(); } + @Override public void setMessages(final PagedMessage[] messages) { this.messages = messages; } + @Override public int getNumberOfMessages() { lock.readLock().lock(); try { @@ -90,6 +95,7 @@ class PageCacheImpl implements PageCache { } } + @Override public void close() { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 437cd96..bd8dda5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -88,6 +88,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { // Public -------------------------------------------------------- + @Override public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent) { if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { ActiveMQServerLogger.LOGGER.trace(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter, new Exception("trace")); @@ -102,10 +103,12 @@ public class PageCursorProviderImpl implements PageCursorProvider { return activeCursor; } + @Override public synchronized PageSubscription getSubscription(long cursorID) { return activeCursors.get(cursorID); } + @Override public PagedMessage getMessage(final PagePosition pos) { PageCache cache = getPageCache(pos.getPageNr()); @@ -117,12 +120,14 @@ public class PageCursorProviderImpl implements PageCursorProvider { return cache.getMessage(pos.getMessageNr()); } + @Override public PagedReference newReference(final PagePosition pos, final PagedMessage msg, final PageSubscription subscription) { return new PagedReferenceImpl(pos, msg, subscription); } + @Override public PageCache getPageCache(final long pageId) { try { boolean needToRead = false; @@ -183,16 +188,19 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } + @Override public void addPageCache(PageCache cache) { synchronized (softCache) { softCache.put(cache.getPageId(), cache); } } + @Override public void setCacheMaxSize(final int size) { softCache.setMaxElements(size); } + @Override public int getCacheSize() { synchronized (softCache) { return softCache.size(); @@ -205,6 +213,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } + @Override public void processReload() throws Exception { Collection<PageSubscription> cursorList = this.activeCursors.values(); for (PageSubscription cursor : cursorList) { @@ -231,6 +240,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } + @Override public void stop() { for (PageSubscription cursor : activeCursors.values()) { cursor.stop(); @@ -249,6 +259,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } + @Override public void flushExecutors() { for (PageSubscription cursor : activeCursors.values()) { cursor.flushExecutors(); @@ -256,6 +267,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { waitForFuture(); } + @Override public void close(PageSubscription cursor) { activeCursors.remove(cursor.getId()); @@ -274,6 +286,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { scheduledCleanup.incrementAndGet(); executor.execute(new Runnable() { + @Override public void run() { storageManager.setContext(storageManager.newSingleThreadContext()); try { @@ -293,6 +306,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { * Hence the PagingStore will be holding a write lock, meaning no messages are going to be paged at this time. * So, we shouldn't lock anything after this method, to avoid dead locks between the writeLock and any synchronization with the CursorProvider. */ + @Override public void onPageModeCleared() { ArrayList<PageSubscription> subscriptions = cloneSubscriptions(); @@ -314,14 +328,17 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } + @Override public void disableCleanup() { this.cleanupEnabled = false; } + @Override public void resumeCleanup() { this.cleanupEnabled = true; } + @Override public void cleanup() { ArrayList<Page> depagedPages = new ArrayList<Page>(); @@ -524,6 +541,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } + @Override public void printDebug() { System.out.println("Debug information for PageCursorProviderImpl:"); for (PageCache cache : softCache.values()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java index 51dda9d..fbf3bd6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java @@ -53,6 +53,7 @@ public class PagePositionImpl implements PagePosition { /** * @return the recordID */ + @Override public long getRecordID() { return recordID; } @@ -60,6 +61,7 @@ public class PagePositionImpl implements PagePosition { /** * @param recordID the recordID to set */ + @Override public void setRecordID(long recordID) { this.recordID = recordID; } @@ -67,6 +69,7 @@ public class PagePositionImpl implements PagePosition { /** * @return the pageNr */ + @Override public long getPageNr() { return pageNr; } @@ -74,6 +77,7 @@ public class PagePositionImpl implements PagePosition { /** * @return the messageNr */ + @Override public int getMessageNr() { return messageNr; } @@ -97,10 +101,12 @@ public class PagePositionImpl implements PagePosition { } } + @Override public PagePosition nextMessage() { return new PagePositionImpl(this.pageNr, this.messageNr + 1); } + @Override public PagePosition nextPage() { return new PagePositionImpl(this.pageNr + 1, 0); } @@ -140,6 +146,7 @@ public class PagePositionImpl implements PagePosition { * There is a rule for finalizing it where I'm establishing a counter, and that rule won't work without this method defined. * So, please don't remove it unless you had to remove that test for any weird reason.. it's here for a purpose! */ + @Override protected void finalize() { } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java index 5a0d94f..c879564 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java @@ -73,6 +73,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { private LinkedList<Pair<Long, Integer>> loadList; private final Runnable cleanupCheck = new Runnable() { + @Override public void run() { cleanup(); } @@ -131,6 +132,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { * * @param pageID */ + @Override public void cleanupNonTXCounters(final long pageID) throws Exception { Pair<Long, AtomicInteger> pendingInfo; synchronized (this) { @@ -186,6 +188,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { * @param recordID1 * @param add */ + @Override public void applyIncrementOnTX(Transaction tx, long recordID1, int add) { CounterOperations oper = (CounterOperations) tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC); @@ -198,6 +201,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { oper.operations.add(new ItemOper(this, recordID1, add)); } + @Override public synchronized void loadValue(final long recordID1, final long value1) { if (this.subscription != null) { // it could be null on testcases... which is ok @@ -215,6 +219,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { } + @Override public void delete() throws Exception { Transaction tx = new TransactionImpl(storage); @@ -223,6 +228,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { tx.commit(); } + @Override public void delete(Transaction tx) throws Exception { // always lock the StorageManager first. storage.readLock(); @@ -248,6 +254,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { } } + @Override public void loadInc(long id, int add) { if (loadList == null) { loadList = new LinkedList<Pair<Long, Integer>>(); @@ -256,6 +263,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { loadList.add(new Pair<Long, Integer>(id, add)); } + @Override public void processReload() { if (loadList != null) { if (subscription != null) { @@ -272,6 +280,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { } } + @Override public synchronized void addInc(long id, int variance) { value.addAndGet(variance); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 2bb7cd9..08bcdfc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -113,26 +113,32 @@ final class PageSubscriptionImpl implements PageSubscription { // Public -------------------------------------------------------- + @Override public PagingStore getPagingStore() { return pageStore; } + @Override public Queue getQueue() { return queue; } + @Override public boolean isPaging() { return pageStore.isPaging(); } + @Override public void setQueue(Queue queue) { this.queue = queue; } + @Override public void disableAutoCleanup() { autoCleanup = false; } + @Override public void enableAutoCleanup() { autoCleanup = true; } @@ -141,6 +147,7 @@ final class PageSubscriptionImpl implements PageSubscription { return cursorProvider; } + @Override public void notEmpty() { synchronized (consumedPages) { this.empty = false; @@ -148,6 +155,7 @@ final class PageSubscriptionImpl implements PageSubscription { } + @Override public void bookmark(PagePosition position) throws Exception { PageCursorInfo cursorInfo = getPageInfo(position); @@ -158,6 +166,7 @@ final class PageSubscriptionImpl implements PageSubscription { confirmPosition(position); } + @Override public long getMessageCount() { if (empty) { return 0; @@ -167,6 +176,7 @@ final class PageSubscriptionImpl implements PageSubscription { } } + @Override public PageSubscriptionCounter getCounter() { return counter; } @@ -178,6 +188,7 @@ final class PageSubscriptionImpl implements PageSubscription { * TX) we may have big holes on the page streaming, and we will need to ignore such pages on the * cursor/subscription. */ + @Override public void reloadPageCompletion(PagePosition position) { PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null); info.setCompleteInfo(position); @@ -186,6 +197,7 @@ final class PageSubscriptionImpl implements PageSubscription { } } + @Override public void scheduleCleanupCheck() { if (autoCleanup) { if (scheduledCleanupCount.get() > 2) { @@ -195,6 +207,7 @@ final class PageSubscriptionImpl implements PageSubscription { scheduledCleanupCount.incrementAndGet(); executor.execute(new Runnable() { + @Override public void run() { try { cleanupEntries(false); @@ -210,6 +223,7 @@ final class PageSubscriptionImpl implements PageSubscription { } } + @Override public void onPageModeCleared(Transaction tx) throws Exception { if (counter != null) { // this could be null on testcases @@ -221,6 +235,7 @@ final class PageSubscriptionImpl implements PageSubscription { /** * It will cleanup all the records for completed pages */ + @Override public void cleanupEntries(final boolean completeDelete) throws Exception { if (completeDelete) { counter.delete(); @@ -298,6 +313,7 @@ final class PageSubscriptionImpl implements PageSubscription { public void afterCommit(final Transaction tx1) { executor.execute(new Runnable() { + @Override public void run() { if (!completeDelete) { cursorProvider.scheduleCleanup(); @@ -390,6 +406,7 @@ final class PageSubscriptionImpl implements PageSubscription { return new PagePositionImpl(pageStore.getFirstPage(), -1); } + @Override public void confirmPosition(final Transaction tx, final PagePosition position) throws Exception { // if the cursor is persistent if (persistent) { @@ -399,6 +416,7 @@ final class PageSubscriptionImpl implements PageSubscription { } + @Override public void ackTx(final Transaction tx, final PagedReference reference) throws Exception { confirmPosition(tx, reference.getPosition()); @@ -418,6 +436,7 @@ final class PageSubscriptionImpl implements PageSubscription { tx.commit(); } + @Override public boolean contains(PagedReference ref) throws Exception { // We first verify if the message was routed to this queue boolean routed = false; @@ -437,6 +456,7 @@ final class PageSubscriptionImpl implements PageSubscription { } } + @Override public void confirmPosition(final PagePosition position) throws Exception { // if we are dealing with a persistent cursor if (persistent) { @@ -482,6 +502,7 @@ final class PageSubscriptionImpl implements PageSubscription { } + @Override public void addPendingDelivery(final PagePosition position) { getPageInfo(position).incrementPendingTX(); } @@ -514,6 +535,7 @@ final class PageSubscriptionImpl implements PageSubscription { /** * Theres no need to synchronize this method as it's only called from journal load on startup */ + @Override public void reloadACK(final PagePosition position) { if (recoveredACK == null) { recoveredACK = new LinkedList<PagePosition>(); @@ -533,6 +555,7 @@ final class PageSubscriptionImpl implements PageSubscription { processACK(position); } + @Override public void lateDeliveryRollback(PagePosition position) { PageCursorInfo cursorInfo = processACK(position); cursorInfo.decrementPendingTX(); @@ -559,6 +582,7 @@ final class PageSubscriptionImpl implements PageSubscription { /** * All the data associated with the cursor should go away here */ + @Override public void destroy() throws Exception { final long tx = store.generateID(); try { @@ -602,10 +626,12 @@ final class PageSubscriptionImpl implements PageSubscription { return cursorId; } + @Override public boolean isPersistent() { return persistent; } + @Override public void processReload() throws Exception { if (recoveredACK != null) { if (isTrace) { @@ -640,6 +666,7 @@ final class PageSubscriptionImpl implements PageSubscription { } } + @Override public void flushExecutors() { FutureLatch future = new FutureLatch(); executor.execute(future); @@ -648,10 +675,12 @@ final class PageSubscriptionImpl implements PageSubscription { } } + @Override public void stop() { flushExecutors(); } + @Override public void printDebug() { printDebug(toString()); } @@ -663,6 +692,7 @@ final class PageSubscriptionImpl implements PageSubscription { } } + @Override public void onDeletePage(Page deletedPage) throws Exception { PageCursorInfo info; synchronized (consumedPages) { @@ -693,10 +723,12 @@ final class PageSubscriptionImpl implements PageSubscription { } } + @Override public Executor getExecutor() { return executor; } + @Override public void reloadPageInfo(long pageNr) { getPageInfo(pageNr, true); } @@ -1052,12 +1084,14 @@ final class PageSubscriptionImpl implements PageSubscription { public CursorIterator() { } + @Override public void redeliver(PagePosition reference) { synchronized (redeliveries) { redeliveries.add(reference); } } + @Override public void repeat() { if (isredelivery) { synchronized (redeliveries) { @@ -1206,6 +1240,7 @@ final class PageSubscriptionImpl implements PageSubscription { * QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well. * It would be a rare race condition but I would prefer avoiding that scenario */ + @Override public synchronized boolean hasNext() { // if an unbehaved program called hasNext twice before next, we only cache it once. if (cachedNext != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 1bf9f2a..dca0091 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -291,6 +291,7 @@ public final class Page implements Comparable<Page> { return "Page::pageID=" + this.pageId + ", file=" + this.file; } + @Override public int compareTo(Page otherPage) { return otherPage.getPageId() - this.pageId; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java index a8d24c7..614f201 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java @@ -43,6 +43,7 @@ final class PageSyncTimer { private final long timeSync; private final Runnable runnable = new Runnable() { + @Override public void run() { tick(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java index f6990d6..85c4489 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java @@ -70,18 +70,22 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { // Public -------------------------------------------------------- + @Override public long getRecordID() { return recordID; } + @Override public void setRecordID(final long recordID) { this.recordID = recordID; } + @Override public long getTransactionID() { return transactionID; } + @Override public void onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) { int sizeAfterUpdate = numberOfMessages.addAndGet(-update); if (sizeAfterUpdate == 0 && storageManager != null) { @@ -96,17 +100,20 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { } } + @Override public void increment(final int durableSize, final int nonDurableSize) { numberOfPersistentMessages.addAndGet(durableSize); numberOfMessages.addAndGet(durableSize + nonDurableSize); } + @Override public int getNumberOfMessages() { return numberOfMessages.get(); } // EncodingSupport implementation + @Override public synchronized void decode(final ActiveMQBuffer buffer) { transactionID = buffer.readLong(); numberOfMessages.set(buffer.readInt()); @@ -114,15 +121,18 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { committed = true; } + @Override public synchronized void encode(final ActiveMQBuffer buffer) { buffer.writeLong(transactionID); buffer.writeInt(numberOfPersistentMessages.get()); } + @Override public synchronized int getEncodeSize() { return DataConstants.SIZE_LONG + DataConstants.SIZE_INT; } + @Override public synchronized void commit() { if (lateDeliveries != null) { // This is to make sure deliveries that were touched before the commit arrived will be delivered @@ -135,6 +145,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { lateDeliveries = null; } + @Override public void store(final StorageManager storageManager, PagingManager pagingManager, final Transaction tx) throws Exception { @@ -146,12 +157,14 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { * (non-Javadoc) * @see org.apache.activemq.artemis.core.paging.PageTransactionInfo#storeUpdate(org.apache.activemq.artemis.core.persistence.StorageManager, org.apache.activemq.artemis.core.transaction.Transaction, int) */ + @Override public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx) throws Exception { internalUpdatePageManager(storageManager, pagingManager, tx, 1); } + @Override public void reloadUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, @@ -184,18 +197,22 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { return pgtxUpdate; } + @Override public boolean isCommit() { return committed; } + @Override public void setCommitted(final boolean committed) { this.committed = committed; } + @Override public boolean isRollback() { return rolledback; } + @Override public synchronized void rollback() { rolledback = true; committed = false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java index 9b5b0fd..b4e6f38 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java @@ -57,10 +57,12 @@ public class PagedMessageImpl implements PagedMessage { public PagedMessageImpl() { } + @Override public ServerMessage getMessage() { return message; } + @Override public void initMessage(StorageManager storage) { if (largeMessageLazyData != null) { LargeServerMessage lgMessage = storage.createLargeMessage(); @@ -73,16 +75,19 @@ public class PagedMessageImpl implements PagedMessage { } } + @Override public long getTransactionID() { return transactionID; } + @Override public long[] getQueueIDs() { return queueIDs; } // EncodingSupport implementation -------------------------------- + @Override public void decode(final ActiveMQBuffer buffer) { transactionID = buffer.readLong(); @@ -112,6 +117,7 @@ public class PagedMessageImpl implements PagedMessage { } } + @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeLong(transactionID); @@ -128,6 +134,7 @@ public class PagedMessageImpl implements PagedMessage { } } + @Override public int getEncodeSize() { return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() + DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 15f01f6..ce43c34 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -81,6 +81,7 @@ public final class PagingManagerImpl implements PagingManager { } } + @Override public void disableCleanup() { if (!cleanupEnabled) { return; @@ -98,6 +99,7 @@ public final class PagingManagerImpl implements PagingManager { } } + @Override public void resumeCleanup() { if (cleanupEnabled) { return; @@ -115,11 +117,13 @@ public final class PagingManagerImpl implements PagingManager { } } + @Override public SimpleString[] getStoreNames() { Set<SimpleString> names = stores.keySet(); return names.toArray(new SimpleString[names.size()]); } + @Override public void reloadStores() throws Exception { lock(); try { @@ -143,6 +147,7 @@ public final class PagingManagerImpl implements PagingManager { } + @Override public void deletePageStore(final SimpleString storeName) throws Exception { syncLock.readLock().lock(); try { @@ -159,6 +164,7 @@ public final class PagingManagerImpl implements PagingManager { /** * stores is a ConcurrentHashMap, so we don't need to synchronize this method */ + @Override public PagingStore getPageStore(final SimpleString storeName) throws Exception { PagingStore store = stores.get(storeName); @@ -168,6 +174,7 @@ public final class PagingManagerImpl implements PagingManager { return newStore(storeName); } + @Override public void addTransaction(final PageTransactionInfo pageTransaction) { if (isTrace) { ActiveMQServerLogger.LOGGER.trace("Adding pageTransaction " + pageTransaction.getTransactionID()); @@ -175,6 +182,7 @@ public final class PagingManagerImpl implements PagingManager { transactions.put(pageTransaction.getTransactionID(), pageTransaction); } + @Override public void removeTransaction(final long id) { if (isTrace) { ActiveMQServerLogger.LOGGER.trace("Removing pageTransaction " + id); @@ -182,6 +190,7 @@ public final class PagingManagerImpl implements PagingManager { transactions.remove(id); } + @Override public PageTransactionInfo getTransaction(final long id) { if (isTrace) { ActiveMQServerLogger.LOGGER.trace("looking up pageTX = " + id); @@ -218,6 +227,7 @@ public final class PagingManagerImpl implements PagingManager { } } + @Override public synchronized void stop() throws Exception { if (!started) { return; @@ -238,6 +248,7 @@ public final class PagingManagerImpl implements PagingManager { } } + @Override public void processReload() throws Exception { for (PagingStore store : stores.values()) { store.processReload(); @@ -263,10 +274,12 @@ public final class PagingManagerImpl implements PagingManager { } } + @Override public void unlock() { syncLock.writeLock().unlock(); } + @Override public void lock() { syncLock.writeLock().lock(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java index ce42860..b6a14f3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java @@ -87,14 +87,17 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { // Public -------------------------------------------------------- + @Override public void stop() { } + @Override public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) { return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional); } + @Override public synchronized SequentialFileFactory newFileFactory(final SimpleString address) throws Exception { String guid = UUIDGenerator.getInstance().generateStringUUID(); @@ -120,10 +123,12 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { return factory; } + @Override public void setPagingManager(final PagingManager pagingManager) { this.pagingManager = pagingManager; } + @Override public List<PagingStore> reloadStores(final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception { File[] files = directory.listFiles(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 5824664..f224fd1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -179,6 +179,7 @@ public class PagingStoreImpl implements PagingStore { /** * @param addressSettings */ + @Override public void applySetting(final AddressSettings addressSettings) { maxSize = addressSettings.getMaxSizeBytes(); @@ -210,38 +211,47 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public void unlock() { lock.writeLock().unlock(); } + @Override public PageCursorProvider getCursorProvider() { return cursorProvider; } + @Override public long getFirstPage() { return firstPageId; } + @Override public SimpleString getAddress() { return address; } + @Override public long getAddressSize() { return sizeInBytes.get(); } + @Override public long getMaxSize() { return maxSize; } + @Override public AddressFullMessagePolicy getAddressFullMessagePolicy() { return addressFullMessagePolicy; } + @Override public long getPageSizeBytes() { return pageSize; } + @Override public File getFolder() { SequentialFileFactory factoryUsed = this.fileFactory; if (factoryUsed != null) { @@ -252,6 +262,7 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public boolean isPaging() { lock.readLock().lock(); @@ -272,18 +283,22 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public int getNumberOfPages() { return numberOfPages; } + @Override public int getCurrentWritingPage() { return currentPageId; } + @Override public SimpleString getStoreName() { return storeName; } + @Override public void sync() throws Exception { if (syncTimer != null) { syncTimer.addSync(storageManager.getContext()); @@ -294,6 +309,7 @@ public class PagingStoreImpl implements PagingStore { } + @Override public void ioSync() throws Exception { lock.readLock().lock(); @@ -307,10 +323,12 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public void processReload() throws Exception { cursorProvider.processReload(); } + @Override public PagingManager getPagingManager() { return pagingManager; } @@ -336,6 +354,7 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public void flushExecutors() { cursorProvider.flushExecutors(); @@ -427,6 +446,7 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public void stopPaging() { lock.writeLock().lock(); try { @@ -438,6 +458,7 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public boolean startPaging() { if (!running) { return false; @@ -490,16 +511,19 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public Page getCurrentPage() { return currentPage; } + @Override public boolean checkPageFileExists(final int pageNumber) { String fileName = createFileName(pageNumber); SequentialFile file = fileFactory.createSequentialFile(fileName); return file.exists(); } + @Override public Page createPage(final int pageNumber) throws Exception { String fileName = createFileName(pageNumber); @@ -521,6 +545,7 @@ public class PagingStoreImpl implements PagingStore { return page; } + @Override public void forceAnotherPage() throws Exception { openNewPage(); } @@ -537,6 +562,7 @@ public class PagingStoreImpl implements PagingStore { * externally is used only on tests, and that's why this method is part of the Testable Interface * </p> */ + @Override public Page depage() throws Exception { lock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging try { @@ -600,6 +626,7 @@ public class PagingStoreImpl implements PagingStore { private class MemoryFreedRunnablesExecutor implements Runnable { + @Override public void run() { Runnable runnable; @@ -621,6 +648,7 @@ public class PagingStoreImpl implements PagingStore { this.runnable = runnable; } + @Override public synchronized void run() { if (!ran) { runnable.run(); @@ -630,6 +658,7 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public boolean checkMemory(final Runnable runWhenAvailable) { if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1) { if (sizeInBytes.get() > maxSize) { @@ -664,6 +693,7 @@ public class PagingStoreImpl implements PagingStore { return true; } + @Override public void addSize(final int size) { if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { if (maxSize != -1) { @@ -817,6 +847,7 @@ public class PagingStoreImpl implements PagingStore { /** * This method will disable cleanup of pages. No page will be deleted after this call. */ + @Override public void disableCleanup() { getCursorProvider().disableCleanup(); } @@ -824,6 +855,7 @@ public class PagingStoreImpl implements PagingStore { /** * This method will re-enable cleanup of pages. Notice that it will also start cleanup threads. */ + @Override public void enableCleanup() { getCursorProvider().resumeCleanup(); } @@ -913,6 +945,7 @@ public class PagingStoreImpl implements PagingStore { this.pagingManager = pagingManager; } + @Override public void afterCommit(final Transaction tx) { // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the // transaction until all the messages were added to the queue @@ -923,15 +956,18 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public void afterPrepare(final Transaction tx) { } + @Override public void afterRollback(final Transaction tx) { if (pageTransaction != null) { pageTransaction.rollback(); } } + @Override public void beforeCommit(final Transaction tx) throws Exception { syncStore(); storePageTX(tx); @@ -946,6 +982,7 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public void beforePrepare(final Transaction tx) throws Exception { syncStore(); storePageTX(tx); @@ -959,6 +996,7 @@ public class PagingStoreImpl implements PagingStore { } } + @Override public void beforeRollback(final Transaction tx) throws Exception { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java index a8bb7cb..346bb40 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java @@ -100,6 +100,7 @@ public final class BatchingIDGenerator implements IDGenerator { } + @Override public long generateID() { long id = counter.getAndIncrement(); @@ -109,6 +110,7 @@ public final class BatchingIDGenerator implements IDGenerator { return id; } + @Override public long getCurrentID() { return counter.get(); } @@ -178,14 +180,17 @@ public final class BatchingIDGenerator implements IDGenerator { IDCounterEncoding() { } + @Override public void decode(final ActiveMQBuffer buffer) { id = buffer.readLong(); } + @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeLong(id); } + @Override public int getEncodeSize() { return DataConstants.SIZE_LONG; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index d8b6fa9..14bee91 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -143,20 +143,24 @@ public final class DescribeJournal { JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() { + @Override public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception { out.println("operation@UpdateTX;txID=" + transactionID + "," + describeRecord(recordInfo)); checkRecordCounter(recordInfo); } + @Override public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception { out.println("operation@Update;" + describeRecord(recordInfo)); checkRecordCounter(recordInfo); } + @Override public void onReadRollbackRecord(final long transactionID) throws Exception { out.println("operation@Rollback;txID=" + transactionID); } + @Override public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception { @@ -164,26 +168,32 @@ public final class DescribeJournal { ",extraData=" + encode(extraData) + ", xid=" + toXid(extraData)); } + @Override public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception { out.println("operation@DeleteRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo)); } + @Override public void onReadDeleteRecord(final long recordID) throws Exception { out.println("operation@DeleteRecord;recordID=" + recordID); } + @Override public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception { out.println("operation@Commit;txID=" + transactionID + ",numberOfRecords=" + numberOfRecords); } + @Override public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception { out.println("operation@AddRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo)); } + @Override public void onReadAddRecord(final RecordInfo recordInfo) throws Exception { out.println("operation@AddRecord;" + describeRecord(recordInfo)); } + @Override public void markAsDataFile(final JournalFile file1) { } @@ -252,6 +262,7 @@ public final class DescribeJournal { Map<Long, Integer> preparedMessageRefCount = new HashMap<Long, Integer>(); journal.load(records, preparedTransactions, new TransactionFailureCallback() { + @Override public void failedTransaction(long transactionID, List<RecordInfo> records1, List<RecordInfo> recordsToDelete) {