http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index e0ef75b..45a544d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -274,6 +274,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void clearContext() { OperationContextImpl.clearContext(); } @@ -632,10 +633,12 @@ public class JournalStorageManager implements StorageManager { } } + @Override public OperationContext getContext() { return OperationContextImpl.getContext(executorFactory); } + @Override public void setContext(final OperationContext context) { OperationContextImpl.setContext(context); } @@ -644,30 +647,37 @@ public class JournalStorageManager implements StorageManager { return singleThreadExecutor; } + @Override public OperationContext newSingleThreadContext() { return newContext(singleThreadExecutor); } + @Override public OperationContext newContext(final Executor executor1) { return new OperationContextImpl(executor1); } + @Override public void afterCompleteOperations(final IOCallback run) { getContext().executeOnCompletion(run); } + @Override public long generateID() { return idGenerator.generateID(); } + @Override public long getCurrentID() { return idGenerator.getCurrentID(); } + @Override public LargeServerMessage createLargeMessage() { return new LargeServerMessageImpl(this); } + @Override public final void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes) throws Exception { @@ -686,6 +696,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) throws Exception { readLock(); try { @@ -729,6 +740,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception { readLock(); try { @@ -743,6 +755,7 @@ public class JournalStorageManager implements StorageManager { /** * We don't need messageID now but we are likely to need it we ever decide to support a database */ + @Override public void confirmPendingLargeMessage(long recordID) throws Exception { readLock(); try { @@ -753,6 +766,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storeMessage(final ServerMessage message) throws Exception { if (message.getMessageID() <= 0) { // Sanity check only... this shouldn't happen unless there is a bug @@ -776,6 +790,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception { readLock(); try { @@ -796,6 +811,7 @@ public class JournalStorageManager implements StorageManager { storageManagerLock.readLock().unlock(); } + @Override public void storeAcknowledge(final long queueID, final long messageID) throws Exception { readLock(); try { @@ -806,6 +822,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception { readLock(); try { @@ -818,6 +835,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deleteMessage(final long messageID) throws Exception { readLock(); try { @@ -832,6 +850,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception { ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID()); readLock(); @@ -843,6 +862,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception { readLock(); try { @@ -855,6 +875,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deleteDuplicateID(final long recordID) throws Exception { readLock(); try { @@ -867,6 +888,7 @@ public class JournalStorageManager implements StorageManager { // Transactional operations + @Override public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception { if (message.getMessageID() <= 0) { throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned(); @@ -887,6 +909,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception { readLock(); try { @@ -898,6 +921,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception { @@ -910,6 +934,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception { readLock(); try { @@ -920,6 +945,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception { readLock(); try { @@ -930,6 +956,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception { @@ -942,6 +969,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception { readLock(); try { @@ -954,16 +982,19 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception { long recordID = idGenerator.generateID(); position.setRecordID(recordID); messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COMPLETE, new CursorAckRecordEncoding(queueID, position)); } + @Override public void deletePageComplete(long ackID) throws Exception { messageJournal.appendDeleteRecord(ackID, false); } + @Override public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception { readLock(); try { @@ -974,10 +1005,12 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deleteCursorAcknowledge(long ackID) throws Exception { messageJournal.appendDeleteRecord(ackID, false); } + @Override public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception { readLock(); try { @@ -991,6 +1024,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deleteHeuristicCompletion(final long id) throws Exception { readLock(); try { @@ -1002,6 +1036,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deletePageTransactional(final long recordID) throws Exception { readLock(); try { @@ -1012,6 +1047,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception { ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID()); readLock(); @@ -1024,6 +1060,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void prepare(final long txID, final Xid xid) throws Exception { readLock(); try { @@ -1034,19 +1071,23 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void commit(final long txID) throws Exception { commit(txID, true); } + @Override public void commitBindings(final long txID) throws Exception { bindingsJournal.appendCommitRecord(txID, true); } + @Override public void rollbackBindings(final long txID) throws Exception { // no need to sync, it's going away anyways bindingsJournal.appendRollbackRecord(txID, false); } + @Override public void commit(final long txID, final boolean lineUpContext) throws Exception { readLock(); try { @@ -1067,6 +1108,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void rollback(final long txID) throws Exception { readLock(); try { @@ -1077,6 +1119,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storeDuplicateIDTransactional(final long txID, final SimpleString address, final byte[] duplID, @@ -1092,6 +1135,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void updateDuplicateIDTransactional(final long txID, final SimpleString address, final byte[] duplID, @@ -1107,6 +1151,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception { readLock(); try { @@ -1119,6 +1164,7 @@ public class JournalStorageManager implements StorageManager { // Other operations + @Override public void updateDeliveryCount(final MessageReference ref) throws Exception { // no need to store if it's the same value // otherwise the journal will get OME in case of lots of redeliveries @@ -1138,6 +1184,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception { deleteAddressSetting(addressSetting.getAddressMatch()); readLock(); @@ -1152,16 +1199,19 @@ public class JournalStorageManager implements StorageManager { } } + @Override public List<PersistedAddressSetting> recoverAddressSettings() throws Exception { ArrayList<PersistedAddressSetting> list = new ArrayList<PersistedAddressSetting>(mapPersistedAddressSettings.values()); return list; } + @Override public List<PersistedRoles> recoverPersistedRoles() throws Exception { ArrayList<PersistedRoles> list = new ArrayList<PersistedRoles>(mapPersistedRoles.values()); return list; } + @Override public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception { deleteSecurityRoles(persistedRoles.getAddressMatch()); @@ -1199,6 +1249,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deleteAddressSetting(SimpleString addressMatch) throws Exception { PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch); if (oldSetting != null) { @@ -1212,6 +1263,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deleteSecurityRoles(SimpleString addressMatch) throws Exception { PersistedRoles oldRoles = mapPersistedRoles.remove(addressMatch); if (oldRoles != null) { @@ -1607,6 +1659,7 @@ public class JournalStorageManager implements StorageManager { } // grouping handler operations + @Override public void addGrouping(final GroupBinding groupBinding) throws Exception { GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(), groupBinding.getGroupId(), groupBinding.getClusterName()); readLock(); @@ -1618,6 +1671,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deleteGrouping(long tx, final GroupBinding groupBinding) throws Exception { readLock(); try { @@ -1630,6 +1684,7 @@ public class JournalStorageManager implements StorageManager { // BindingsImpl operations + @Override public void addQueueBinding(final long tx, final Binding binding) throws Exception { Queue queue = (Queue) binding.getBindable(); @@ -1648,6 +1703,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deleteQueueBinding(long tx, final long queueBindingID) throws Exception { readLock(); try { @@ -1658,6 +1714,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public long storePageCounterInc(long txID, long queueID, int value) throws Exception { readLock(); try { @@ -1670,6 +1727,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public long storePageCounterInc(long queueID, int value) throws Exception { readLock(); try { @@ -1711,6 +1769,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deleteIncrementRecord(long txID, long recordID) throws Exception { readLock(); try { @@ -1721,6 +1780,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deletePageCounter(long txID, long recordID) throws Exception { readLock(); try { @@ -1731,6 +1791,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void deletePendingPageCounter(long txID, long recordID) throws Exception { readLock(); try { @@ -1741,6 +1802,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<GroupingInfo> groupingInfos) throws Exception { List<RecordInfo> records = new ArrayList<RecordInfo>(); @@ -1787,6 +1849,7 @@ public class JournalStorageManager implements StorageManager { return bindingsInfo; } + @Override public void lineUpContext() { readLock(); try { @@ -1800,6 +1863,7 @@ public class JournalStorageManager implements StorageManager { // ActiveMQComponent implementation // ------------------------------------------------------ + @Override public synchronized void start() throws Exception { if (started) { return; @@ -1827,6 +1891,7 @@ public class JournalStorageManager implements StorageManager { started = true; } + @Override public void stop() throws Exception { stop(false); } @@ -1839,6 +1904,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public synchronized void stop(boolean ioCriticalError) throws Exception { if (!started) { return; @@ -1888,6 +1954,7 @@ public class JournalStorageManager implements StorageManager { started = false; } + @Override public synchronized boolean isStarted() { return started; } @@ -1909,12 +1976,14 @@ public class JournalStorageManager implements StorageManager { } } + @Override public void beforePageRead() throws Exception { if (pageMaxConcurrentIO != null) { pageMaxConcurrentIO.acquire(); } } + @Override public void afterPageRead() throws Exception { if (pageMaxConcurrentIO != null) { pageMaxConcurrentIO.release(); @@ -1933,10 +2002,12 @@ public class JournalStorageManager implements StorageManager { // Public ----------------------------------------------------------------------------------- + @Override public Journal getMessageJournal() { return messageJournal; } + @Override public Journal getBindingsJournal() { return bindingsJournal; } @@ -1990,6 +2061,7 @@ public class JournalStorageManager implements StorageManager { } } Runnable deleteAction = new Runnable() { + @Override public void run() { try { readLock(); @@ -2030,6 +2102,7 @@ public class JournalStorageManager implements StorageManager { } } + @Override public SequentialFile createFileForLargeMessage(final long messageID, LargeMessageExtension extension) { return largeMessagesFactory.createSequentialFile(messageID + extension.getExtension()); } @@ -2303,37 +2376,47 @@ public class JournalStorageManager implements StorageManager { return DummyOperationContext.instance; } + @Override public void executeOnCompletion(final IOCallback runnable) { // There are no executeOnCompletion calls while using the DummyOperationContext // However we keep the code here for correctness runnable.done(); } + @Override public void replicationDone() { } + @Override public void replicationLineUp() { } + @Override public void storeLineUp() { } + @Override public void done() { } + @Override public void onError(final int errorCode, final String errorMessage) { } + @Override public void waitCompletion() { } + @Override public boolean waitCompletion(final long timeout) { return true; } + @Override public void pageSyncLineUp() { } + @Override public void pageSyncDone() { } } @@ -2353,14 +2436,17 @@ public class JournalStorageManager implements StorageManager { xid = XidCodecSupport.decodeXid(ActiveMQBuffers.wrappedBuffer(data)); } + @Override public void decode(final ActiveMQBuffer buffer) { throw new IllegalStateException("Non Supported Operation"); } + @Override public void encode(final ActiveMQBuffer buffer) { XidCodecSupport.encodeXid(xid, buffer); } + @Override public int getEncodeSize() { return XidCodecSupport.getXidEncodeLength(xid); } @@ -2385,16 +2471,19 @@ public class JournalStorageManager implements StorageManager { HeuristicCompletionEncoding() { } + @Override public void decode(final ActiveMQBuffer buffer) { xid = XidCodecSupport.decodeXid(buffer); isCommit = buffer.readBoolean(); } + @Override public void encode(final ActiveMQBuffer buffer) { XidCodecSupport.encodeXid(xid, buffer); buffer.writeBoolean(isCommit); } + @Override public int getEncodeSize() { return XidCodecSupport.getXidEncodeLength(xid) + DataConstants.SIZE_BOOLEAN; } @@ -2417,20 +2506,24 @@ public class JournalStorageManager implements StorageManager { public GroupingEncoding() { } + @Override public int getEncodeSize() { return SimpleString.sizeofString(groupId) + SimpleString.sizeofString(clusterName); } + @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeSimpleString(groupId); buffer.writeSimpleString(clusterName); } + @Override public void decode(final ActiveMQBuffer buffer) { groupId = buffer.readSimpleString(); clusterName = buffer.readSimpleString(); } + @Override public long getId() { return id; } @@ -2439,10 +2532,12 @@ public class JournalStorageManager implements StorageManager { this.id = id; } + @Override public SimpleString getGroupId() { return groupId; } + @Override public SimpleString getClusterName() { return clusterName; } @@ -2498,6 +2593,7 @@ public class JournalStorageManager implements StorageManager { this.autoCreated = autoCreated; } + @Override public long getId() { return id; } @@ -2506,30 +2602,37 @@ public class JournalStorageManager implements StorageManager { this.id = id; } + @Override public SimpleString getAddress() { return address; } + @Override public void replaceQueueName(SimpleString newName) { this.name = newName; } + @Override public SimpleString getFilterString() { return filterString; } + @Override public SimpleString getQueueName() { return name; } + @Override public SimpleString getUser() { return user; } + @Override public boolean isAutoCreated() { return autoCreated; } + @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); address = buffer.readSimpleString(); @@ -2551,6 +2654,7 @@ public class JournalStorageManager implements StorageManager { autoCreated = buffer.readBoolean(); } + @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeSimpleString(name); buffer.writeSimpleString(address); @@ -2559,6 +2663,7 @@ public class JournalStorageManager implements StorageManager { buffer.writeBoolean(autoCreated); } + @Override public int getEncodeSize() { return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) + SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN + @@ -2583,6 +2688,7 @@ public class JournalStorageManager implements StorageManager { /* (non-Javadoc) * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) */ + @Override public void decode(final ActiveMQBuffer buffer) { message.decodeHeadersAndProperties(buffer); } @@ -2590,6 +2696,7 @@ public class JournalStorageManager implements StorageManager { /* (non-Javadoc) * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) */ + @Override public void encode(final ActiveMQBuffer buffer) { message.encode(buffer); } @@ -2597,6 +2704,7 @@ public class JournalStorageManager implements StorageManager { /* (non-Javadoc) * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize() */ + @Override public int getEncodeSize() { return message.getEncodeSize(); } @@ -2617,6 +2725,7 @@ public class JournalStorageManager implements StorageManager { /* (non-Javadoc) * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) */ + @Override public void decode(final ActiveMQBuffer buffer) { largeMessageID = buffer.readLong(); } @@ -2624,6 +2733,7 @@ public class JournalStorageManager implements StorageManager { /* (non-Javadoc) * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) */ + @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeLong(largeMessageID); } @@ -2631,6 +2741,7 @@ public class JournalStorageManager implements StorageManager { /* (non-Javadoc) * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize() */ + @Override public int getEncodeSize() { return DataConstants.SIZE_LONG; } @@ -2658,16 +2769,19 @@ public class JournalStorageManager implements StorageManager { this.count = count; } + @Override public void decode(final ActiveMQBuffer buffer) { queueID = buffer.readLong(); count = buffer.readInt(); } + @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeLong(queueID); buffer.writeInt(count); } + @Override public int getEncodeSize() { return 8 + 4; } @@ -2692,14 +2806,17 @@ public class JournalStorageManager implements StorageManager { super(); } + @Override public void decode(final ActiveMQBuffer buffer) { queueID = buffer.readLong(); } + @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeLong(queueID); } + @Override public int getEncodeSize() { return 8; } @@ -2779,6 +2896,7 @@ public class JournalStorageManager implements StorageManager { this.recods = records; } + @Override public void decode(ActiveMQBuffer buffer) { this.pageTX = buffer.readLong(); this.recods = buffer.readInt(); @@ -2850,6 +2968,7 @@ public class JournalStorageManager implements StorageManager { public DuplicateIDEncoding() { } + @Override public void decode(final ActiveMQBuffer buffer) { address = buffer.readSimpleString(); @@ -2860,6 +2979,7 @@ public class JournalStorageManager implements StorageManager { buffer.readBytes(duplID); } + @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeSimpleString(address); @@ -2868,6 +2988,7 @@ public class JournalStorageManager implements StorageManager { buffer.writeBytes(duplID); } + @Override public int getEncodeSize() { return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length; } @@ -3011,14 +3132,17 @@ public class JournalStorageManager implements StorageManager { this.id = id; } + @Override public long getID() { return id; } + @Override public long getQueueID() { return queueID; } + @Override public long getPageID() { return pageID; } @@ -3062,15 +3186,18 @@ public class JournalStorageManager implements StorageManager { int value; + @Override public int getEncodeSize() { return DataConstants.SIZE_LONG + DataConstants.SIZE_INT; } + @Override public void encode(ActiveMQBuffer buffer) { buffer.writeLong(queueID); buffer.writeInt(value); } + @Override public void decode(ActiveMQBuffer buffer) { queueID = buffer.readLong(); value = buffer.readInt(); @@ -3098,16 +3225,19 @@ public class JournalStorageManager implements StorageManager { public PagePosition position; + @Override public int getEncodeSize() { return DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT; } + @Override public void encode(ActiveMQBuffer buffer) { buffer.writeLong(queueID); buffer.writeLong(position.getPageNr()); buffer.writeInt(position.getMessageNr()); } + @Override public void decode(ActiveMQBuffer buffer) { queueID = buffer.readLong(); long pageNR = buffer.readLong(); @@ -3125,6 +3255,7 @@ public class JournalStorageManager implements StorageManager { this.messages = messages; } + @Override public void failedTransaction(final long transactionID, final List<RecordInfo> records, final List<RecordInfo> recordsToDelete) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 719694d..906cbd3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -89,14 +89,17 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L /** * @param pendingRecordID */ + @Override public void setPendingRecordID(long pendingRecordID) { this.pendingRecordID = pendingRecordID; } + @Override public long getPendingRecordID() { return this.pendingRecordID; } + @Override public void setPaged() { paged = true; } @@ -150,6 +153,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L super.decodeHeadersAndProperties(buffer1); } + @Override public synchronized void incrementDelayDeletionCount() { delayDeletionCount.incrementAndGet(); try { @@ -160,6 +164,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } } + @Override public synchronized void decrementDelayDeletionCount() throws Exception { int count = delayDeletionCount.decrementAndGet(); @@ -233,6 +238,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L return memoryEstimate; } + @Override public synchronized void releaseResources() { if (file != null && file.isOpen()) { try { @@ -310,6 +316,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } } + @Override public SequentialFile getFile() throws ActiveMQException { validateFile(); return file; @@ -382,6 +389,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L private SequentialFile cFile; + @Override public void open() throws ActiveMQException { try { if (cFile != null && cFile.isOpen()) { @@ -395,6 +403,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } } + @Override public void close() throws ActiveMQException { try { if (cFile != null) { @@ -406,6 +415,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } } + @Override public int encode(final ByteBuffer bufferRead) throws ActiveMQException { try { return cFile.read(bufferRead); @@ -415,6 +425,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } } + @Override public int encode(final ActiveMQBuffer bufferOut, final int size) throws ActiveMQException { // This could maybe be optimized (maybe reading directly into bufferOut) ByteBuffer bufferRead = ByteBuffer.allocate(size); @@ -433,6 +444,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L /* (non-Javadoc) * @see org.apache.activemq.artemis.core.message.BodyEncoder#getLargeBodySize() */ + @Override public long getLargeBodySize() { if (bodySize < 0) { try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index fc49ada..caa71b8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -98,28 +98,34 @@ public class OperationContextImpl implements OperationContext { this.executor = executor; } + @Override public void pageSyncLineUp() { pageLineUp.incrementAndGet(); } + @Override public synchronized void pageSyncDone() { paged++; checkTasks(); } + @Override public void storeLineUp() { storeLineUp.incrementAndGet(); } + @Override public void replicationLineUp() { replicationLineUp.incrementAndGet(); } + @Override public synchronized void replicationDone() { replicated++; checkTasks(); } + @Override public void executeOnCompletion(final IOCallback completion) { if (errorCode != -1) { completion.onError(errorCode, errorMessage); @@ -163,6 +169,7 @@ public class OperationContextImpl implements OperationContext { } + @Override public synchronized void done() { stored++; checkTasks(); @@ -194,6 +201,7 @@ public class OperationContextImpl implements OperationContext { executorsPending.incrementAndGet(); try { executor.execute(new Runnable() { + @Override public void run() { try { // If any IO is done inside the callback, it needs to be done on a new context http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index 3eb14da..365954c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -76,6 +76,7 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe return "NullStorageLargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]"; } + @Override public ServerMessage copy() { // This is a simple copy, used only to avoid changing original properties return new NullStorageLargeServerMessage(this); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 289cb77..381222f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -396,9 +396,11 @@ public class NullStorageManager implements StorageManager { public void deleteCursorAcknowledge(long ackID) throws Exception { } + @Override public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception { } + @Override public void deletePageComplete(long ackID) throws Exception { } @@ -424,6 +426,7 @@ public class NullStorageManager implements StorageManager { public void deletePageCounter(final long txID, final long recordID) throws Exception { } + @Override public void deletePendingPageCounter(long txID, long recordID) throws Exception { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java index bfa8527..357decd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java @@ -41,32 +41,39 @@ public class AddressImpl implements Address { containsWildCard = address.contains(WildcardAddressManager.SINGLE_WORD) || address.contains(WildcardAddressManager.ANY_WORDS); } + @Override public SimpleString getAddress() { return address; } + @Override public SimpleString[] getAddressParts() { return addressParts; } + @Override public boolean containsWildCard() { return containsWildCard; } + @Override public List<Address> getLinkedAddresses() { return linkedAddresses; } + @Override public void addLinkedAddress(final Address address) { if (!linkedAddresses.contains(address)) { linkedAddresses.add(address); } } + @Override public void removeLinkedAddress(final Address actualAddress) { linkedAddresses.remove(actualAddress); } + @Override public boolean matches(final Address add) { if (containsWildCard == add.containsWildCard()) { return address.equals(add.getAddress()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 4719971..995e041 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -73,20 +73,24 @@ public final class BindingsImpl implements Bindings { this.name = name; } + @Override public void setMessageLoadBalancingType(final MessageLoadBalancingType messageLoadBalancingType) { this.messageLoadBalancingType = messageLoadBalancingType; } + @Override public Collection<Binding> getBindings() { return bindingsMap.values(); } + @Override public void unproposed(SimpleString groupID) { for (Binding binding : bindingsMap.values()) { binding.unproposed(groupID); } } + @Override public void addBinding(final Binding binding) { if (isTrace) { ActiveMQServerLogger.LOGGER.trace("addBinding(" + binding + ") being called"); @@ -122,6 +126,7 @@ public final class BindingsImpl implements Bindings { } + @Override public void removeBinding(final Binding binding) { if (binding.isExclusive()) { exclusiveBindings.remove(binding); @@ -147,6 +152,7 @@ public final class BindingsImpl implements Bindings { } } + @Override public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final RoutingContext context) throws Exception { @@ -227,6 +233,7 @@ public final class BindingsImpl implements Bindings { } } + @Override public void route(final ServerMessage message, final RoutingContext context) throws Exception { route(message, context, true); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java index 303b0fe..04f432d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java @@ -57,50 +57,62 @@ public class DivertBinding implements Binding { exclusive = divert.isExclusive(); } + @Override public long getID() { return id; } + @Override public Filter getFilter() { return filter; } + @Override public SimpleString getAddress() { return address; } + @Override public Bindable getBindable() { return divert; } + @Override public SimpleString getRoutingName() { return routingName; } + @Override public SimpleString getUniqueName() { return uniqueName; } + @Override public SimpleString getClusterName() { return uniqueName; } + @Override public boolean isExclusive() { return exclusive; } + @Override public boolean isHighAcceptPriority(final ServerMessage message) { return true; } + @Override public void route(final ServerMessage message, final RoutingContext context) throws Exception { divert.route(message, context); } + @Override public int getDistance() { return 0; } + @Override public BindingType getType() { return BindingType.DIVERT; } @@ -142,6 +154,7 @@ public class DivertBinding implements Binding { //noop } + @Override public void close() throws Exception { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java index d75dad6..9d97dee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -69,6 +69,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { this.persist = persist; } + @Override public void load(final List<Pair<byte[], Long>> theIds) throws Exception { int count = 0; @@ -108,6 +109,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } + @Override public void deleteFromCache(byte[] duplicateID) throws Exception { ByteArrayHolder bah = new ByteArrayHolder(duplicateID); @@ -129,10 +131,12 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } + @Override public boolean contains(final byte[] duplID) { return cache.get(new ByteArrayHolder(duplID)) != null; } + @Override public synchronized void addToCache(final byte[] duplID, final Transaction tx) throws Exception { long recordID = -1; @@ -158,6 +162,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } } + @Override public void load(final Transaction tx, final byte[] duplID) { tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID())); } @@ -212,6 +217,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } } + @Override public void clear() throws Exception { synchronized (this) { if (ids.size() > 0) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index cffd0c3..2a6d9c5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -49,46 +49,57 @@ public class LocalQueueBinding implements QueueBinding { clusterName = name.concat(nodeID); } + @Override public long getID() { return queue.getID(); } + @Override public Filter getFilter() { return filter; } + @Override public SimpleString getAddress() { return address; } + @Override public Bindable getBindable() { return queue; } + @Override public Queue getQueue() { return queue; } + @Override public SimpleString getRoutingName() { return name; } + @Override public SimpleString getUniqueName() { return name; } + @Override public SimpleString getClusterName() { return clusterName; } + @Override public boolean isExclusive() { return false; } + @Override public int getDistance() { return 0; } + @Override public boolean isHighAcceptPriority(final ServerMessage message) { // It's a high accept priority if the queue has at least one matching consumer @@ -100,10 +111,12 @@ public class LocalQueueBinding implements QueueBinding { queue.unproposed(groupID); } + @Override public void route(final ServerMessage message, final RoutingContext context) throws Exception { queue.route(message, context); } + @Override public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception { queue.routeWithAck(message, context); } @@ -112,14 +125,17 @@ public class LocalQueueBinding implements QueueBinding { return true; } + @Override public int consumerCount() { return queue.getConsumerCount(); } + @Override public BindingType getType() { return BindingType.LOCAL_QUEUE; } + @Override public void close() throws Exception { queue.close(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 8b2120d..6fcca35 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -171,6 +171,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // ActiveMQComponent implementation --------------------------------------- + @Override public synchronized void start() throws Exception { if (started) return; @@ -186,6 +187,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding started = true; } + @Override public synchronized void stop() throws Exception { started = false; @@ -205,12 +207,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding queueInfos.clear(); } + @Override public boolean isStarted() { return started; } // NotificationListener implementation ------------------------------------- + @Override public void onNotification(final Notification notification) { if (!(notification.getType() instanceof CoreNotificationType)) return; @@ -425,6 +429,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // Otherwise can have situation where createQueue comes in before failover, then failover occurs // and post office is activated but queue remains unactivated after failover so delivery never occurs // even though failover is complete + @Override public synchronized void addBinding(final Binding binding) throws Exception { addressManager.addBinding(binding); @@ -457,6 +462,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding managementService.sendNotification(new Notification(uid, CoreNotificationType.BINDING_ADDED, props)); } + @Override public synchronized Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception { addressSettingsRepository.clearCache(); @@ -530,6 +536,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return bindings != null && !bindings.getBindings().isEmpty(); } + @Override public Bindings getBindingsForAddress(final SimpleString address) throws Exception { Bindings bindings = addressManager.getBindingsForRoutingAddress(address); @@ -540,26 +547,32 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return bindings; } + @Override public Bindings lookupBindingsForAddress(final SimpleString address) throws Exception { return addressManager.getBindingsForRoutingAddress(address); } + @Override public Binding getBinding(final SimpleString name) { return addressManager.getBinding(name); } + @Override public Bindings getMatchingBindings(final SimpleString address) throws Exception { return addressManager.getMatchingBindings(address); } + @Override public Map<SimpleString, Binding> getAllBindings() { return addressManager.getBindings(); } + @Override public void route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception { route(message, queueCreator, (Transaction) null, direct); } + @Override public void route(final ServerMessage message, QueueCreator queueCreator, final Transaction tx, @@ -567,6 +580,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding route(message, queueCreator, new RoutingContextImpl(tx), direct); } + @Override public void route(final ServerMessage message, final QueueCreator queueCreator, final Transaction tx, @@ -575,6 +589,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates); } + @Override public void route(final ServerMessage message, final QueueCreator queueCreator, final RoutingContext context, @@ -582,6 +597,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding route(message, queueCreator, context, direct, true); } + @Override public void route(final ServerMessage message, final QueueCreator queueCreator, final RoutingContext context, @@ -706,6 +722,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + @Override public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception { @@ -739,6 +756,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding /** * The redistribution can't process the route right away as we may be dealing with a large message which will need to be processed on a different thread */ + @Override public Pair<RoutingContext, ServerMessage> redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception { @@ -762,6 +780,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return null; } + @Override public DuplicateIDCache getDuplicateIDCache(final SimpleString address) { DuplicateIDCache cache = duplicateIDCaches.get(address); @@ -782,14 +801,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return duplicateIDCaches; } + @Override public Object getNotificationLock() { return notificationLock; } + @Override public Set<SimpleString> getAddresses() { return addressManager.getAddresses(); } + @Override public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception { // We send direct to the queue so we can send it to the same queue that is bound to the notifications address - // this is crucial for ensuring @@ -946,6 +968,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } + @Override public void processRoute(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception { @@ -1047,10 +1070,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // This will use the same thread if there are no pending operations // avoiding a context switch on this case storageManager.afterCompleteOperations(new IOCallback() { + @Override public void onError(final int errorCode, final String errorMessage) { ActiveMQServerLogger.LOGGER.ioErrorAddingReferences(errorCode, errorMessage); } + @Override public void done() { addReferences(refs, direct); } @@ -1106,9 +1131,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding storageManager.afterCompleteOperations(new IOCallback() { + @Override public void onError(int errorCode, String errorMessage) { } + @Override public void done() { for (Queue queue : queues) { // in case of paging, we need to kick asynchronous delivery to try delivering @@ -1211,6 +1238,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding /** * The expiry scanner can't be started until the whole server has been started other wise you may get races */ + @Override public synchronized void startExpiryScanner() { if (reaperPeriod > 0) { if (reaperRunnable != null) @@ -1247,6 +1275,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding latch.countDown(); } + @Override public void run() { // The reaper thread should be finished case the PostOffice is gone // This is to avoid leaks on PostOffice between stops and starts @@ -1293,6 +1322,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding this.refs = refs; } + @Override public void afterCommit(final Transaction tx) { for (MessageReference ref : refs) { if (!ref.isAlreadyAcked()) { @@ -1301,6 +1331,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + @Override public void afterPrepare(final Transaction tx) { for (MessageReference ref : refs) { if (ref.isAlreadyAcked()) { @@ -1310,15 +1341,19 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + @Override public void afterRollback(final Transaction tx) { } + @Override public void beforeCommit(final Transaction tx) throws Exception { } + @Override public void beforePrepare(final Transaction tx) throws Exception { } + @Override public void beforeRollback(final Transaction tx) throws Exception { // Reverse the ref counts, and paging sizes @@ -1333,6 +1368,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + @Override public List<MessageReference> getRelatedMessageReferences() { return refs; } @@ -1343,6 +1379,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + @Override public Bindings createBindings(final SimpleString address) throws Exception { GroupingHandler groupingHandler = server.getGroupingHandler(); BindingsImpl bindings = new BindingsImpl(address, groupingHandler, pagingManager.getPageStore(address)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index c401825..77c36f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -57,6 +57,7 @@ public class SimpleAddressManager implements AddressManager { this.bindingsFactory = bindingsFactory; } + @Override public boolean addBinding(final Binding binding) throws Exception { if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null || pendingDeletes.contains(binding.getUniqueName())) { throw ActiveMQMessageBundle.BUNDLE.bindingAlreadyExists(binding); @@ -69,6 +70,7 @@ public class SimpleAddressManager implements AddressManager { return addMappingInternal(binding.getAddress(), binding); } + @Override public Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception { final Binding binding = nameMap.remove(uniqueName); @@ -99,18 +101,22 @@ public class SimpleAddressManager implements AddressManager { return binding; } + @Override public Bindings getBindingsForRoutingAddress(final SimpleString address) throws Exception { return mappings.get(address); } + @Override public Binding getBinding(final SimpleString bindableName) { return nameMap.get(bindableName); } + @Override public Map<SimpleString, Binding> getBindings() { return nameMap; } + @Override public Bindings getMatchingBindings(final SimpleString address) throws Exception { Address add = new AddressImpl(address); @@ -127,6 +133,7 @@ public class SimpleAddressManager implements AddressManager { return bindings; } + @Override public void clear() { nameMap.clear(); mappings.clear(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 904a8bc..c8d7838 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -185,6 +185,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { return channel; } + @Override public void handlePacket(final Packet packet) { byte type = packet.getType(); @@ -533,6 +534,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { } storageManager.afterCompleteOperations(new IOCallback() { + @Override public void onError(final int errorCode, final String errorMessage) { ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); @@ -546,6 +548,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { } + @Override public void done() { doConfirmAndResponse(confirmPacket, response, flush, closeChannel); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index 665ea66..b60804e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -66,6 +66,7 @@ public class ActiveMQPacketHandler implements ChannelHandler { this.connection = connection; } + @Override public void handlePacket(final Packet packet) { byte type = packet.getType(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index 6295ed6..270f7dc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -113,6 +113,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> { return null; } + @Override public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) { final Configuration config = server.getConfiguration(); @@ -153,10 +154,12 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> { sessionHandlers.put(name, handler); } + @Override public void removeHandler(final String name) { sessionHandlers.remove(name); } + @Override public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) { } @@ -213,6 +216,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> { this.rc = rc; } + @Override public void handlePacket(final Packet packet) { if (packet.getType() == PacketImpl.PING) { Ping ping = (Ping) packet; @@ -243,6 +247,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> { // may come from a channel itself // What could cause deadlocks entry.connectionExecutor.execute(new Runnable() { + @Override public void run() { if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V3)) { channel0.send(new ClusterTopologyChangeMessage_V3(topologyMember.getUniqueEventID(), nodeID, topologyMember.getBackupGroupName(), topologyMember.getScaleDownGroupName(), connectorPair, last)); @@ -270,6 +275,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> { // What could cause deadlocks try { entry.connectionExecutor.execute(new Runnable() { + @Override public void run() { if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2)) { channel0.send(new ClusterTopologyChangeMessage_V2(uniqueEventID, nodeID)); @@ -296,6 +302,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> { acceptorUsed.getClusterConnection().addClusterTopologyListener(listener); rc.addCloseListener(new CloseListener() { + @Override public void connectionClosed() { acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener); } @@ -305,6 +312,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> { // if not clustered, we send a single notification to the client containing the node-id where the server is connected to // This is done this way so Recovery discovery could also use the node-id for non-clustered setups entry.connectionExecutor.execute(new Runnable() { + @Override public void run() { String nodeId = server.getNodeID().toString(); Pair<TransportConfiguration, TransportConfiguration> emptyConfig = new Pair<TransportConfiguration, TransportConfiguration>(null, null); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java index bc656f4..2a07606 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java @@ -39,6 +39,7 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<I * @param outgoingInterceptors * @return */ + @Override public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index ac41fc0..2fe808f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -46,6 +46,7 @@ public final class CoreSessionCallback implements SessionCallback { this.channel = channel; } + @Override public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount); @@ -56,6 +57,7 @@ public final class CoreSessionCallback implements SessionCallback { return size; } + @Override public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, @@ -67,6 +69,7 @@ public final class CoreSessionCallback implements SessionCallback { return packet.getPacketSize(); } + @Override public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) { Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount); @@ -79,6 +82,7 @@ public final class CoreSessionCallback implements SessionCallback { return size; } + @Override public void sendProducerCreditsMessage(int credits, SimpleString address) { Packet packet = new SessionProducerCreditsMessage(credits, address); @@ -92,14 +96,17 @@ public final class CoreSessionCallback implements SessionCallback { channel.send(packet); } + @Override public void closed() { protocolManager.removeHandler(name); } + @Override public void addReadyListener(final ReadyListener listener) { channel.getConnection().getTransportConnection().addReadyListener(listener); } + @Override public void removeReadyListener(final ReadyListener listener) { channel.getConnection().getTransportConnection().removeReadyListener(listener); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/registry/JndiBindingRegistry.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/registry/JndiBindingRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/registry/JndiBindingRegistry.java index f81c4bd..d22fda8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/registry/JndiBindingRegistry.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/registry/JndiBindingRegistry.java @@ -35,6 +35,7 @@ public class JndiBindingRegistry implements BindingRegistry { this.context = new InitialContext(); } + @Override public Object lookup(String name) { try { if (context == null) { @@ -49,6 +50,7 @@ public class JndiBindingRegistry implements BindingRegistry { } } + @Override public boolean bind(String name, Object obj) { try { return bindToJndi(name, obj); @@ -58,6 +60,7 @@ public class JndiBindingRegistry implements BindingRegistry { } } + @Override public void unbind(String name) { try { if (context != null) { @@ -68,6 +71,7 @@ public class JndiBindingRegistry implements BindingRegistry { } } + @Override public void close() { try { if (context != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/registry/MapBindingRegistry.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/registry/MapBindingRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/registry/MapBindingRegistry.java index d06d267..2d1be92 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/registry/MapBindingRegistry.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/registry/MapBindingRegistry.java @@ -25,18 +25,22 @@ public class MapBindingRegistry implements BindingRegistry { protected ConcurrentMap<String, Object> registry = new ConcurrentHashMap<String, Object>(); + @Override public Object lookup(String name) { return registry.get(name); } + @Override public boolean bind(String name, Object obj) { return registry.putIfAbsent(name, obj) == null; } + @Override public void unbind(String name) { registry.remove(name); } + @Override public void close() { } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java index f6a00be..3c0818c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java @@ -92,14 +92,17 @@ public final class InVMAcceptor implements Acceptor { connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED, TransportConstants.DEFAULT_CONNECTIONS_ALLOWED, configuration); } + @Override public String getName() { return name; } + @Override public Map<String, Object> getConfiguration() { return configuration; } + @Override public ClusterConnection getClusterConnection() { return clusterConnection; } @@ -112,6 +115,7 @@ public final class InVMAcceptor implements Acceptor { return connections.size(); } + @Override public synchronized void start() throws Exception { if (started) { return; @@ -132,6 +136,7 @@ public final class InVMAcceptor implements Acceptor { paused = false; } + @Override public synchronized void stop() { if (!started) { return; @@ -166,6 +171,7 @@ public final class InVMAcceptor implements Acceptor { paused = false; } + @Override public synchronized boolean isStarted() { return started; } @@ -173,6 +179,7 @@ public final class InVMAcceptor implements Acceptor { /* * Stop accepting new connections */ + @Override public synchronized void pause() { if (!started || paused) { return; @@ -183,6 +190,7 @@ public final class InVMAcceptor implements Acceptor { paused = true; } + @Override public synchronized void setNotificationService(final NotificationService notificationService) { this.notificationService = notificationService; } @@ -231,10 +239,12 @@ public final class InVMAcceptor implements Acceptor { * * @return true */ + @Override public boolean isUnsecurable() { return true; } + @Override public void setDefaultActiveMQPrincipal(ActiveMQPrincipal defaultActiveMQPrincipal) { this.defaultActiveMQPrincipal = defaultActiveMQPrincipal; } @@ -248,6 +258,7 @@ public final class InVMAcceptor implements Acceptor { this.connector = connector; } + @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, final String protocol) { @@ -258,6 +269,7 @@ public final class InVMAcceptor implements Acceptor { listener.connectionCreated(component, connection, protocol); } + @Override public void connectionDestroyed(final Object connectionID) { InVMConnection connection = (InVMConnection) connections.remove(connectionID); @@ -267,6 +279,7 @@ public final class InVMAcceptor implements Acceptor { // Execute on different thread after all the packets are sent, to avoid deadlocks connection.getExecutor().execute(new Runnable() { + @Override public void run() { // Remove on the other side too connector.disconnect((String) connectionID); @@ -275,10 +288,12 @@ public final class InVMAcceptor implements Acceptor { } } + @Override public void connectionException(final Object connectionID, final ActiveMQException me) { listener.connectionException(connectionID, me); } + @Override public void connectionReadyForWrites(Object connectionID, boolean ready) { } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java index 49de7e9..e28ee3a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener public class InVMAcceptorFactory implements AcceptorFactory { + @Override public Acceptor createAcceptor(final String name, final ClusterConnection clusterConnection, final Map<String, Object> configuration, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index 59c319a..f498af0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -96,18 +96,22 @@ public class InVMConnection implements Connection { this.defaultActiveMQPrincipal = defaultActiveMQPrincipal; } + @Override public void forceClose() { // no op } + @Override public RemotingConnection getProtocolConnection() { return this.protocolConnection; } + @Override public void setProtocolConnection(RemotingConnection connection) { this.protocolConnection = connection; } + @Override public void close() { if (closing) { return; @@ -124,25 +128,31 @@ public class InVMConnection implements Connection { } } + @Override public ActiveMQBuffer createTransportBuffer(final int size) { return ActiveMQBuffers.dynamicBuffer(size); } + @Override public Object getID() { return id; } + @Override public void checkFlushBatchBuffer() { } + @Override public void write(final ActiveMQBuffer buffer) { write(buffer, false, false, null); } + @Override public void write(final ActiveMQBuffer buffer, final boolean flush, final boolean batch) { write(buffer, flush, batch, null); } + @Override public void write(final ActiveMQBuffer buffer, final boolean flush, final boolean batch, @@ -153,6 +163,7 @@ public class InVMConnection implements Connection { try { executor.execute(new Runnable() { + @Override public void run() { try { if (!closed) { @@ -183,6 +194,7 @@ public class InVMConnection implements Connection { if (flush && flushEnabled) { final CountDownLatch latch = new CountDownLatch(1); executor.execute(new Runnable() { + @Override public void run() { latch.countDown(); } @@ -204,10 +216,12 @@ public class InVMConnection implements Connection { } + @Override public String getRemoteAddress() { return "invm:" + serverID; } + @Override public String getLocalAddress() { return "invm:" + serverID; } @@ -216,9 +230,11 @@ public class InVMConnection implements Connection { return -1; } + @Override public void addReadyListener(ReadyListener listener) { } + @Override public void removeReadyListener(ReadyListener listener) { } @@ -227,6 +243,7 @@ public class InVMConnection implements Connection { return false; } + @Override public ActiveMQPrincipal getDefaultActiveMQPrincipal() { return defaultActiveMQPrincipal; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java index a6b819a..96f6667 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java @@ -112,6 +112,7 @@ public class InVMConnector extends AbstractConnector { return acceptor; } + @Override public synchronized void close() { if (!started) { return; @@ -124,10 +125,12 @@ public class InVMConnector extends AbstractConnector { started = false; } + @Override public boolean isStarted() { return started; } + @Override public Connection createConnection() { if (InVMConnector.failOnCreateConnection) { InVMConnector.incFailures(); @@ -155,6 +158,7 @@ public class InVMConnector extends AbstractConnector { } } + @Override public synchronized void start() { started = true; } @@ -185,6 +189,7 @@ public class InVMConnector extends AbstractConnector { return inVMConnection; } + @Override public boolean isEquivalent(Map<String, Object> configuration) { int serverId = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration); return id == serverId; @@ -192,6 +197,7 @@ public class InVMConnector extends AbstractConnector { private class Listener implements ConnectionLifeCycleListener { + @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, final String protocol) { @@ -202,6 +208,7 @@ public class InVMConnector extends AbstractConnector { listener.connectionCreated(component, connection, protocol); } + @Override public void connectionDestroyed(final Object connectionID) { if (connections.remove(connectionID) != null) { // Close the corresponding connection on the other side @@ -209,6 +216,7 @@ public class InVMConnector extends AbstractConnector { // Execute on different thread to avoid deadlocks closeExecutor.execute(new Runnable() { + @Override public void run() { listener.connectionDestroyed(connectionID); } @@ -216,15 +224,18 @@ public class InVMConnector extends AbstractConnector { } } + @Override public void connectionException(final Object connectionID, final ActiveMQException me) { // Execute on different thread to avoid deadlocks closeExecutor.execute(new Runnable() { + @Override public void run() { listener.connectionException(connectionID, me); } }); } + @Override public void connectionReadyForWrites(Object connectionID, boolean ready) { } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnectorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnectorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnectorFactory.java index c22e2a6..77ca86b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnectorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnectorFactory.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory; public class InVMConnectorFactory implements ConnectorFactory { + @Override public Connector createConnector(final Map<String, Object> configuration, final BufferHandler handler, final ConnectionLifeCycleListener listener, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HttpAcceptorHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HttpAcceptorHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HttpAcceptorHandler.java index 6e41505..bc5df25 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HttpAcceptorHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HttpAcceptorHandler.java @@ -135,6 +135,7 @@ public class HttpAcceptorHandler extends ChannelDuplexHandler { promise = channel.newPromise(); } + @Override public void run() { ResponseHolder responseHolder = null; do {