http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HttpKeepAliveRunnable.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HttpKeepAliveRunnable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HttpKeepAliveRunnable.java index 712aa0e..651e395 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HttpKeepAliveRunnable.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/HttpKeepAliveRunnable.java @@ -31,6 +31,7 @@ public class HttpKeepAliveRunnable implements Runnable { private Future<?> future; + @Override public synchronized void run() { if (closed) { return;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 9905ca1..94cd7aa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -244,6 +244,7 @@ public class NettyAcceptor implements Acceptor { connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED, TransportConstants.DEFAULT_CONNECTIONS_ALLOWED, configuration); } + @Override public synchronized void start() throws Exception { if (channelClazz != null) { // Already started @@ -411,6 +412,7 @@ public class NettyAcceptor implements Acceptor { } } + @Override public String getName() { return name; } @@ -440,10 +442,12 @@ public class NettyAcceptor implements Acceptor { } } + @Override public Map<String, Object> getConfiguration() { return this.configuration; } + @Override public synchronized void stop() { if (channelClazz == null) { return; @@ -514,10 +518,12 @@ public class NettyAcceptor implements Acceptor { paused = false; } + @Override public boolean isStarted() { return channelClazz != null; } + @Override public synchronized void pause() { if (paused) { return; @@ -544,6 +550,7 @@ public class NettyAcceptor implements Acceptor { paused = true; } + @Override public void setNotificationService(final NotificationService notificationService) { this.notificationService = notificationService; } @@ -553,6 +560,7 @@ public class NettyAcceptor implements Acceptor { * * @param defaultActiveMQPrincipal */ + @Override public void setDefaultActiveMQPrincipal(ActiveMQPrincipal defaultActiveMQPrincipal) { throw new IllegalStateException("unsecure connections not allowed"); } @@ -562,6 +570,7 @@ public class NettyAcceptor implements Acceptor { * * @return */ + @Override public boolean isUnsecurable() { return false; } @@ -599,6 +608,7 @@ public class NettyAcceptor implements Acceptor { super(group, handler, listener); } + @Override public NettyServerConnection createConnection(final ChannelHandlerContext ctx, String protocol, boolean httpEnabled) throws Exception { @@ -613,6 +623,7 @@ public class NettyAcceptor implements Acceptor { SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); if (sslHandler != null) { sslHandler.handshakeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>() { + @Override public void operationComplete(final io.netty.util.concurrent.Future<Channel> future) throws Exception { if (future.isSuccess()) { active = true; @@ -639,6 +650,7 @@ public class NettyAcceptor implements Acceptor { private class Listener implements ConnectionLifeCycleListener { + @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, final String protocol) { @@ -649,12 +661,14 @@ public class NettyAcceptor implements Acceptor { listener.connectionCreated(component, connection, protocol); } + @Override public void connectionDestroyed(final Object connectionID) { if (connections.remove(connectionID) != null) { listener.connectionDestroyed(connectionID); } } + @Override public void connectionException(final Object connectionID, final ActiveMQException me) { // Execute on different thread to avoid deadlocks new Thread() { @@ -666,6 +680,7 @@ public class NettyAcceptor implements Acceptor { } + @Override public void connectionReadyForWrites(final Object connectionID, boolean ready) { NettyServerConnection conn = connections.get(connectionID); @@ -679,6 +694,7 @@ public class NettyAcceptor implements Acceptor { private boolean cancelled; + @Override public synchronized void run() { if (!cancelled) { for (Connection connection : connections.values()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java index 92b79f8..880522f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener public class NettyAcceptorFactory implements AcceptorFactory { + @Override public Acceptor createAcceptor(final String name, final ClusterConnection connection, final Map<String, Object> configuration, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index b2bca01..30f6ac4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -183,6 +183,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors(configuration.getOutgoingInterceptorClassNames())); } + @Override public synchronized void start() throws Exception { if (started) { return; @@ -278,6 +279,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle started = true; } + @Override public synchronized void startAcceptors() throws Exception { if (isStarted()) { for (Acceptor a : acceptors.values()) { @@ -286,6 +288,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } } + @Override public synchronized void allowInvmSecurityOverride(ActiveMQPrincipal principal) { defaultInvmSecurityPrincipal = principal; for (Acceptor acceptor : acceptors.values()) { @@ -295,6 +298,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } } + @Override public synchronized void pauseAcceptors() { if (!started) return; @@ -309,6 +313,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } } + @Override public synchronized void freeze(final String scaleDownNodeID, final CoreRemotingConnection connectionToKeepOpen) { if (!started) return; @@ -334,6 +339,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } } + @Override public void stop(final boolean criticalError) throws Exception { if (!started) { return; @@ -410,6 +416,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle return acceptors.get(name); } + @Override public boolean isStarted() { return started; } @@ -427,6 +434,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } } + @Override public RemotingConnection removeConnection(final Object remotingConnectionID) { ConnectionEntry entry = connections.remove(remotingConnectionID); @@ -442,6 +450,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } } + @Override public synchronized Set<RemotingConnection> getConnections() { Set<RemotingConnection> conns = new HashSet<RemotingConnection>(connections.size()); @@ -452,6 +461,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle return conns; } + @Override public synchronized ReusableLatch getConnectionCountLatch() { return connectionCountLatch; } @@ -462,6 +472,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle return protocolMap.get(protocol); } + @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, final String protocol) { @@ -485,6 +496,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle connectionCountLatch.countUp(); } + @Override public void connectionDestroyed(final Object connectionID) { if (isTrace) { @@ -519,6 +531,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } } + @Override public void connectionException(final Object connectionID, final ActiveMQException me) { // We DO NOT call fail on connection exception, otherwise in event of real connection failure, the // connection will be failed, the session will be closed and won't be able to reconnect @@ -530,6 +543,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle // Connections should only fail when TTL is exceeded } + @Override public void connectionReadyForWrites(final Object connectionID, final boolean ready) { } @@ -598,6 +612,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle private final class DelegatingBufferHandler implements BufferHandler { + @Override public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) { ConnectionEntry conn = connections.get(connectionID); @@ -668,6 +683,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle if (flush) { flushExecutor.execute(new Runnable() { + @Override public void run() { try { // this is using a different thread http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index ab33e19..036be12 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -72,6 +72,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean) */ + @Override public void appendAddRecord(final long id, final byte recordType, final byte[] record, @@ -79,6 +80,7 @@ public class ReplicatedJournal implements Journal { this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync); } + @Override public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, @@ -98,6 +100,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendAddRecord(long, byte, org.apache.activemq.artemis.core.journal.EncodingSupport, boolean) */ + @Override public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, @@ -118,6 +121,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendAddRecordTransactional(long, long, byte, byte[]) */ + @Override public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, @@ -133,6 +137,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendAddRecordTransactional(long, long, byte, org.apache.activemq.artemis.core.journal.EncodingSupport) */ + @Override public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, @@ -150,6 +155,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendCommitRecord(long, boolean) */ + @Override public void appendCommitRecord(final long txID, final boolean sync) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("AppendCommit " + txID); @@ -158,6 +164,7 @@ public class ReplicatedJournal implements Journal { localJournal.appendCommitRecord(txID, sync); } + @Override public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("AppendCommit " + txID); @@ -166,6 +173,7 @@ public class ReplicatedJournal implements Journal { localJournal.appendCommitRecord(txID, sync, callback); } + @Override public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, @@ -210,6 +218,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecordTransactional(long, long, byte[]) */ + @Override public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception { this.appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record)); } @@ -221,6 +230,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecordTransactional(long, long, org.apache.activemq.artemis.core.journal.EncodingSupport) */ + @Override public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception { @@ -237,6 +247,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecordTransactional(long, long) */ + @Override public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("AppendDelete (noencoding) txID=" + txID + " id=" + id); @@ -252,6 +263,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendPrepareRecord(long, byte[], boolean) */ + @Override public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception { this.appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync); } @@ -263,6 +275,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendPrepareRecord(long, org.apache.activemq.artemis.core.journal.EncodingSupport, boolean) */ + @Override public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception { @@ -291,6 +304,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendRollbackRecord(long, boolean) */ + @Override public void appendRollbackRecord(final long txID, final boolean sync) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("AppendRollback " + txID); @@ -299,6 +313,7 @@ public class ReplicatedJournal implements Journal { localJournal.appendRollbackRecord(txID, sync); } + @Override public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("AppendRollback " + txID); @@ -315,6 +330,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean) */ + @Override public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, @@ -363,6 +379,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, byte[]) */ + @Override public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType, @@ -378,6 +395,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, org.apache.activemq.artemis.core.journal.EncodingSupport) */ + @Override public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType, @@ -396,6 +414,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#load(java.util.List, java.util.List, org.apache.activemq.artemis.core.journal.TransactionFailureCallback) */ + @Override public JournalLoadInformation load(final List<RecordInfo> committedRecords, final List<PreparedTransactionInfo> preparedTransactions, final TransactionFailureCallback transactionFailure) throws Exception { @@ -407,6 +426,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.journal.Journal#load(org.apache.activemq.artemis.core.journal.LoaderCallback) */ + @Override public JournalLoadInformation load(final LoaderCallback reloadManager) throws Exception { return localJournal.load(reloadManager); } @@ -415,6 +435,7 @@ public class ReplicatedJournal implements Journal { * @param pages * @see org.apache.activemq.artemis.core.journal.Journal#perfBlast(int) */ + @Override public void perfBlast(final int pages) { localJournal.perfBlast(pages); } @@ -423,6 +444,7 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.server.ActiveMQComponent#start() */ + @Override public void start() throws Exception { localJournal.start(); } @@ -431,14 +453,17 @@ public class ReplicatedJournal implements Journal { * @throws Exception * @see org.apache.activemq.artemis.core.server.ActiveMQComponent#stop() */ + @Override public void stop() throws Exception { localJournal.stop(); } + @Override public int getAlignment() throws Exception { return localJournal.getAlignment(); } + @Override public boolean isStarted() { return localJournal.isStarted(); } @@ -448,18 +473,22 @@ public class ReplicatedJournal implements Journal { return localJournal.loadInternalOnly(); } + @Override public int getNumberOfRecords() { return localJournal.getNumberOfRecords(); } + @Override public void runDirectJournalBlast() throws Exception { localJournal.runDirectJournalBlast(); } + @Override public int getUserVersion() { return localJournal.getUserVersion(); } + @Override public void lineUpContext(IOCompletion callback) { ((OperationContext) callback).replicationLineUp(); localJournal.lineUpContext(callback); @@ -500,6 +529,7 @@ public class ReplicatedJournal implements Journal { throw new UnsupportedOperationException(); } + @Override public int getFileSize() { return localJournal.getFileSize(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index c79e572..2caaa8d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -239,10 +239,12 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon activation.remoteFailOver(packet.isFinalMessage()); } + @Override public boolean isStarted() { return started; } + @Override public synchronized void start() throws Exception { Configuration config = server.getConfiguration(); try { @@ -272,6 +274,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } } + @Override public synchronized void stop() throws Exception { if (!started) { return; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index d276474..4759cfa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -255,6 +255,7 @@ public final class ReplicationManager implements ActiveMQComponent { enabled = true; } + @Override public synchronized void stop() throws Exception { if (!started) { return; @@ -390,12 +391,14 @@ public final class ReplicationManager implements ActiveMQComponent { connectionFailed(me, failedOver); } + @Override public void beforeReconnect(final ActiveMQException me) { } } private final class ResponseHandler implements ChannelHandler { + @Override public void handlePacket(final Packet packet) { if (packet.getType() == PacketImpl.REPLICATION_RESPONSE || packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) { replicated(); @@ -414,12 +417,15 @@ public final class ReplicationManager implements ActiveMQComponent { static final NullEncoding instance = new NullEncoding(); + @Override public void decode(final ActiveMQBuffer buffer) { } + @Override public void encode(final ActiveMQBuffer buffer) { } + @Override public int getEncodeSize() { return 0; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java index 3a20952..514ec8d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java @@ -98,10 +98,12 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC return securityEnabled; } + @Override public void stop() { securityRepository.unRegisterListener(this); } + @Override public void authenticate(final String user, final String password, X509Certificate[] certificates) throws Exception { if (securityEnabled) { @@ -145,6 +147,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC } } + @Override public void check(final SimpleString address, final CheckType checkType, final SecurityAuth session) throws Exception { @@ -203,6 +206,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC } } + @Override public void onChange() { invalidateCache(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index e6e1ee9..2a16ed2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage; public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessage { + @Override void addBytes(byte[] bytes) throws Exception; void setPendingRecordID(long pendingRecordID); @@ -37,8 +38,10 @@ public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessag /** * Close the files if opened */ + @Override void releaseResources(); + @Override void deleteFile() throws Exception; void incrementDelayDeletionCount(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MemoryManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MemoryManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MemoryManager.java index 7eed499..9b96c9f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MemoryManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MemoryManager.java @@ -50,10 +50,12 @@ public class MemoryManager implements ActiveMQComponent { return low; } + @Override public synchronized boolean isStarted() { return started; } + @Override public synchronized void start() { ActiveMQServerLogger.LOGGER.debug("Starting MemoryManager with MEASURE_INTERVAL: " + measureInterval + " FREE_MEMORY_PERCENT: " + @@ -73,6 +75,7 @@ public class MemoryManager implements ActiveMQComponent { thread.start(); } + @Override public synchronized void stop() { if (!started) { // Already stopped @@ -92,6 +95,7 @@ public class MemoryManager implements ActiveMQComponent { private class MemoryRunnable implements Runnable { + @Override public void run() { while (true) { try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java index 73613bd..331851d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java @@ -65,10 +65,12 @@ public abstract class NodeManager implements ActiveMQComponent { // -------------------------------------------------------------------- + @Override public synchronized void start() throws Exception { isStarted = true; } + @Override public boolean isStarted() { return isStarted; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index ab6dd0d..6026887 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -36,6 +36,7 @@ public interface ServerSession extends SecurityAuth { Object getConnectionID(); + @Override RemotingConnection getRemotingConnection(); boolean removeConsumer(long consumerID) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java index bc4a0ee..81a3184 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java @@ -72,6 +72,7 @@ public class BackupManager implements ActiveMQComponent { * Start the backup manager if not already started. This entails deploying a backup connector based on a cluster * configuration, informing the cluster manager so that it can add it to its topology and announce itself to the cluster. * */ + @Override public synchronized void start() { if (started) return; @@ -94,6 +95,7 @@ public class BackupManager implements ActiveMQComponent { /* * stop all the connectors * */ + @Override public synchronized void stop() { if (!started) return; @@ -218,6 +220,7 @@ public class BackupManager implements ActiveMQComponent { //this has to be done in a separate thread executor.execute(new Runnable() { + @Override public void run() { if (stopping) return; @@ -255,6 +258,7 @@ public class BackupManager implements ActiveMQComponent { ActiveMQServerLogger.LOGGER.errorAnnouncingBackup(); scheduledExecutor.schedule(new Runnable() { + @Override public void run() { announceBackup(); } @@ -288,6 +292,7 @@ public class BackupManager implements ActiveMQComponent { closeLocator(backupServerLocator); } executor.execute(new Runnable() { + @Override public void run() { synchronized (BackupConnector.this) { closeLocator(backupServerLocator); @@ -325,6 +330,7 @@ public class BackupManager implements ActiveMQComponent { this.tcConfigs = tcConfigs; } + @Override public ServerLocatorInternal createServerLocator(Topology topology) { if (tcConfigs != null && tcConfigs.length > 0) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { @@ -361,6 +367,7 @@ public class BackupManager implements ActiveMQComponent { this.discoveryGroupConfiguration = discoveryGroupConfiguration; } + @Override public ServerLocatorInternal createServerLocator(Topology topology) { return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java index 65d19d6..c6a9b73 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java @@ -48,6 +48,7 @@ public interface Bridge extends Consumer, ActiveMQComponent { * To be called when the server sent a disconnect to the client. * Basically this is for cluster bridges being disconnected */ + @Override void disconnect(); boolean isConnected(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java index c213ff9..0956bc5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java @@ -152,6 +152,7 @@ public class ClusterControl implements AutoCloseable { /** * close this cluster control and its resources */ + @Override public void close() { sessionFactory.close(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index e23e18a..1a6271d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -252,6 +252,7 @@ public final class ClusterManager implements ActiveMQComponent { } } + @Override public synchronized void start() throws Exception { if (state == State.STARTED) { return; @@ -298,6 +299,7 @@ public final class ClusterManager implements ActiveMQComponent { } } + @Override public void stop() throws Exception { haManager.stop(); synchronized (this) { @@ -351,6 +353,7 @@ public final class ClusterManager implements ActiveMQComponent { } } + @Override public boolean isStarted() { return state == State.STARTED; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedHAManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedHAManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedHAManager.java index d35bccb..0959f72 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedHAManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedHAManager.java @@ -50,6 +50,7 @@ public class ColocatedHAManager implements HAManager { /** * starts the HA manager. */ + @Override public void start() { if (started) return; @@ -62,6 +63,7 @@ public class ColocatedHAManager implements HAManager { /** * stop any backups */ + @Override public void stop() { for (ActiveMQServer activeMQServer : backupServers.values()) { try { @@ -103,6 +105,7 @@ public class ColocatedHAManager implements HAManager { * * @return the backups */ + @Override public Map<String, ActiveMQServer> getBackupServers() { return backupServers; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java index c32b446..d7f15b0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java @@ -99,6 +99,7 @@ public class ReplicaPolicy extends BackupPolicy { /* * these 2 methods are the same, leaving both as the second is correct but the first is needed until more refactoring is done * */ + @Override public String getBackupGroupName() { return groupName; } @@ -111,10 +112,12 @@ public class ReplicaPolicy extends BackupPolicy { this.groupName = groupName; } + @Override public boolean isRestartBackup() { return restartBackup; } + @Override public void setRestartBackup(boolean restartBackup) { this.restartBackup = restartBackup; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java index 295a862..e627eb9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java @@ -123,6 +123,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> { /* * these 2 methods are the same, leaving both as the second is correct but the first is needed until more refactoring is done * */ + @Override public String getBackupGroupName() { return groupName; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 22f58a5..2ddcaeb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -237,10 +237,12 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled factory.cleanup(); } + @Override public void setNotificationService(final NotificationService notificationService) { this.notificationService = notificationService; } + @Override public synchronized void start() throws Exception { if (started) { return; @@ -260,6 +262,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + @Override public String debug() { return toString(); } @@ -297,6 +300,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + @Override public void flushExecutor() { // Wait for any create objects runnable to complete FutureLatch future = new FutureLatch(); @@ -310,8 +314,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + @Override public void disconnect() { executor.execute(new Runnable() { + @Override public void run() { if (session != null) { try { @@ -326,6 +332,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled }); } + @Override public boolean isConnected() { return session != null; } @@ -338,6 +345,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return executor; } + @Override public void stop() throws Exception { if (stopping) { return; @@ -368,6 +376,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + @Override public void pause() throws Exception { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug("Bridge " + this.name + " being paused"); @@ -388,11 +397,13 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + @Override public void resume() throws Exception { queue.addConsumer(BridgeImpl.this); queue.deliverAsync(); } + @Override public boolean isStarted() { return started; } @@ -401,25 +412,30 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled executor.execute(new ConnectRunnable(this)); } + @Override public SimpleString getName() { return name; } + @Override public Queue getQueue() { return queue; } + @Override public Filter getFilter() { return filter; } // SendAcknowledgementHandler implementation --------------------- + @Override public SimpleString getForwardingAddress() { return forwardingAddress; } // For testing only + @Override public RemotingConnection getForwardingConnection() { if (session == null) { return null; @@ -431,6 +447,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled // Consumer implementation --------------------------------------- + @Override public void sendAcknowledged(final Message message) { if (active) { try { @@ -480,6 +497,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + @Override public HandleStatus handle(final MessageReference ref) throws Exception { if (filter != null && !filter.match(ref.getMessage())) { return HandleStatus.NO_MATCH; @@ -562,14 +580,17 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled // FailureListener implementation -------------------------------- + @Override public void proceedDeliver(MessageReference ref) { // no op } + @Override public void connectionFailed(final ActiveMQException me, boolean failedOver) { connectionFailed(me, failedOver, null); } + @Override public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) { ActiveMQServerLogger.LOGGER.bridgeConnectionFailed(failedOver); @@ -625,6 +646,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled scheduleRetryConnect(); } + @Override public void beforeReconnect(final ActiveMQException exception) { // log.warn(name + "::Connection failed before reconnect ", exception); // fail(false); @@ -634,6 +656,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled final MessageReference ref, final LargeServerMessage message) { executor.execute(new Runnable() { + @Override public void run() { try { producer.send(dest, message); @@ -994,6 +1017,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled this.bridge = bridge; } + @Override public void run() { if (bridge.isStarted()) executor.execute(new ConnectRunnable(bridge)); @@ -1008,6 +1032,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled bridge = bridge2; } + @Override public void run() { bridge.connect(); } @@ -1015,6 +1040,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private class StopRunnable implements Runnable { + @Override public void run() { try { ActiveMQServerLogger.LOGGER.debug("stopping bridge " + BridgeImpl.this); @@ -1070,6 +1096,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private class PauseRunnable implements Runnable { + @Override public void run() { try { queue.removeConsumer(BridgeImpl.this); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BroadcastGroupImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BroadcastGroupImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BroadcastGroupImpl.java index d69355c..7a5dbc4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BroadcastGroupImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BroadcastGroupImpl.java @@ -88,10 +88,12 @@ public class BroadcastGroupImpl implements BroadcastGroup, Runnable { uniqueID = UUIDGenerator.getInstance().generateStringUUID(); } + @Override public void setNotificationService(final NotificationService notificationService) { this.notificationService = notificationService; } + @Override public synchronized void start() throws Exception { if (started) { return; @@ -111,6 +113,7 @@ public class BroadcastGroupImpl implements BroadcastGroup, Runnable { activate(); } + @Override public synchronized void stop() { if (!started) { return; @@ -143,22 +146,27 @@ public class BroadcastGroupImpl implements BroadcastGroup, Runnable { } + @Override public synchronized boolean isStarted() { return started; } + @Override public String getName() { return name; } + @Override public synchronized void addConnector(final TransportConfiguration tcConfig) { connectors.add(tcConfig); } + @Override public synchronized void removeConnector(final TransportConfiguration tcConfig) { connectors.remove(tcConfig); } + @Override public synchronized int size() { return connectors.size(); } @@ -169,6 +177,7 @@ public class BroadcastGroupImpl implements BroadcastGroup, Runnable { } } + @Override public synchronized void broadcastConnectors() throws Exception { ActiveMQBuffer buff = ActiveMQBuffers.dynamicBuffer(4096); @@ -187,6 +196,7 @@ public class BroadcastGroupImpl implements BroadcastGroup, Runnable { endpoint.broadcast(data); } + @Override public void run() { if (!started) { return; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index 4db67fc..3963b02 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -359,6 +359,7 @@ public class ClusterConnectionBridge extends BridgeImpl { } } + @Override protected boolean isPlainCoreBridge() { return false; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index d290c6e..12ca5e3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -366,6 +366,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn this.manager = manager; } + @Override public void start() throws Exception { synchronized (this) { if (started) { @@ -379,6 +380,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } + @Override public void flushExecutor() { FutureLatch future = new FutureLatch(); executor.execute(future); @@ -388,6 +390,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } } + @Override public void stop() throws Exception { if (!started) { return; @@ -424,6 +427,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn managementService.sendNotification(notification); } executor.execute(new Runnable() { + @Override public void run() { synchronized (ClusterConnectionImpl.this) { closeLocator(serverLocator); @@ -448,18 +452,22 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn return topology.getMember(manager.getNodeId()); } + @Override public void addClusterTopologyListener(final ClusterTopologyListener listener) { topology.addClusterTopologyListener(listener); } + @Override public void removeClusterTopologyListener(final ClusterTopologyListener listener) { topology.removeClusterTopologyListener(listener); } + @Override public Topology getTopology() { return topology; } + @Override public void nodeAnnounced(final long uniqueEventID, final String nodeID, final String backupGroupName, @@ -507,22 +515,27 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn // localMember.getConnector().b); } + @Override public boolean isStarted() { return started; } + @Override public SimpleString getName() { return name; } + @Override public String getNodeID() { return nodeManager.getNodeId().toString(); } + @Override public ActiveMQServer getServer() { return server; } + @Override public boolean isNodeActive(String nodeId) { MessageFlowRecord rec = records.get(nodeId); if (rec == null) { @@ -531,6 +544,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn return rec.getBridge().isConnected(); } + @Override public Map<String, String> getNodes() { synchronized (recordsGuard) { Map<String, String> nodes = new HashMap<String, String>(); @@ -611,12 +625,14 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn addClusterTopologyListener(this); } + @Override public TransportConfiguration getConnector() { return connector; } // ClusterTopologyListener implementation ------------------------------------------------------------------ + @Override public void nodeDown(final long eventUID, final String nodeID) { /* * we dont do anything when a node down is received. The bridges will take care themselves when they should disconnect @@ -709,6 +725,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } } + @Override public synchronized void informClusterOfBackup() { String nodeID = server.getNodeID().toString(); @@ -850,10 +867,12 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn "]"; } + @Override public void serverDisconnected() { this.disconnected = true; } + @Override public String getAddress() { return address.toString(); } @@ -893,6 +912,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn return queue; } + @Override public int getMaxHops() { return maxHops; } @@ -901,6 +921,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn * we should only ever close a record when the node itself has gone down or in the case of scale down where we know * the node is being completely destroyed and in this case we will migrate to another server/Bridge. * */ + @Override public void close() throws Exception { if (isTrace) { ActiveMQServerLogger.LOGGER.trace("Stopping bridge " + bridge); @@ -917,6 +938,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn bridge.stop(); bridge.getExecutor().execute(new Runnable() { + @Override public void run() { try { if (disconnected) { @@ -933,10 +955,12 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn }); } + @Override public boolean isClosed() { return isClosed; } + @Override public void reset() throws Exception { resetBindings(); } @@ -945,10 +969,12 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn this.bridge = bridge; } + @Override public Bridge getBridge() { return bridge; } + @Override public synchronized void onMessage(final ClientMessage message) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug("ClusterCommunication::Flow record on " + clusterConnector + " Receiving message " + message); @@ -1117,6 +1143,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } } + @Override public synchronized void disconnectBindings() throws Exception { ActiveMQServerLogger.LOGGER.debug(ClusterConnectionImpl.this + " disconnect bindings"); reset = false; @@ -1364,6 +1391,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn "]"; } + @Override public String describe() { StringWriter str = new StringWriter(); PrintWriter out = new PrintWriter(str); @@ -1393,6 +1421,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn this.tcConfigs = tcConfigs; } + @Override public ServerLocatorInternal createServerLocator() { if (tcConfigs != null && tcConfigs.length > 0) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { @@ -1420,6 +1449,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn this.dg = dg; } + @Override public ServerLocatorInternal createServerLocator() { return new ServerLocatorImpl(topology, true, dg); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index d6abf22..7d24dc6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -74,14 +74,17 @@ public class Redistributor implements Consumer { this.batchSize = batchSize; } + @Override public Filter getFilter() { return null; } + @Override public String debug() { return toString(); } + @Override public String toManagementString() { return "Redistributor[" + queue.getName() + "/" + queue.getID() + "]"; } @@ -126,6 +129,7 @@ public class Redistributor implements Consumer { } } + @Override public synchronized HandleStatus handle(final MessageReference reference) throws Exception { if (!active) { return HandleStatus.BUSY; @@ -153,6 +157,7 @@ public class Redistributor implements Consumer { else { active = false; executor.execute(new Runnable() { + @Override public void run() { try { routingInfo.getB().finishCopy(); @@ -187,6 +192,7 @@ public class Redistributor implements Consumer { return HandleStatus.HANDLED; } + @Override public void proceedDeliver(MessageReference ref) { // no op } @@ -194,6 +200,7 @@ public class Redistributor implements Consumer { private void internalExecute(final Runnable runnable) { pendingRuns.countUp(); executor.execute(new Runnable() { + @Override public void run() { try { runnable.run(); @@ -214,10 +221,12 @@ public class Redistributor implements Consumer { storageManager.afterCompleteOperations(new IOCallback() { + @Override public void onError(final int errorCode, final String errorMessage) { ActiveMQServerLogger.LOGGER.ioErrorRedistributing(errorCode, errorMessage); } + @Override public void done() { execPrompter(); } @@ -243,6 +252,7 @@ public class Redistributor implements Consumer { private class Prompter implements Runnable { + @Override public void run() { synchronized (Redistributor.this) { active = true; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java index 0a0030f..883d36e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java @@ -91,50 +91,62 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { this.distance = distance; } + @Override public long getID() { return id; } + @Override public SimpleString getAddress() { return address; } + @Override public Bindable getBindable() { return storeAndForwardQueue; } + @Override public Queue getQueue() { return storeAndForwardQueue; } + @Override public SimpleString getRoutingName() { return routingName; } + @Override public SimpleString getUniqueName() { return uniqueName; } + @Override public SimpleString getClusterName() { return uniqueName; } + @Override public boolean isExclusive() { return false; } + @Override public BindingType getType() { return BindingType.REMOTE_QUEUE; } + @Override public Filter getFilter() { return queueFilter; } + @Override public int getDistance() { return distance; } + @Override public synchronized boolean isHighAcceptPriority(final ServerMessage message) { if (consumerCount == 0) { return false; @@ -158,6 +170,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { public void unproposed(SimpleString groupID) { } + @Override public void route(final ServerMessage message, final RoutingContext context) { addRouteContextToMessage(message); @@ -183,6 +196,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { } } + @Override public synchronized void addConsumer(final SimpleString filterString) throws Exception { if (filterString != null) { // There can actually be many consumers on the same queue with the same filter, so we need to maintain a ref @@ -203,6 +217,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { consumerCount++; } + @Override public synchronized void removeConsumer(final SimpleString filterString) throws Exception { if (filterString != null) { Integer i = filterCounts.get(filterString); @@ -231,6 +246,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { filters.clear(); } + @Override public synchronized int consumerCount() { return consumerCount; } @@ -289,6 +305,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { return filters; } + @Override public void close() throws Exception { storeAndForwardQueue.close(); } @@ -324,6 +341,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { } } + @Override public long getRemoteQueueID() { return remoteQueueID; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/BooleanVote.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/BooleanVote.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/BooleanVote.java index 553f4ce..69c8068 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/BooleanVote.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/BooleanVote.java @@ -36,6 +36,7 @@ public final class BooleanVote extends Vote<Boolean> { return false; } + @Override public Boolean getVote() { return vote; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/GroupHandlingAbstract.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/GroupHandlingAbstract.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/GroupHandlingAbstract.java index c3165c6..3c32c92 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/GroupHandlingAbstract.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/GroupHandlingAbstract.java @@ -51,12 +51,14 @@ public abstract class GroupHandlingAbstract implements GroupingHandler { this.address = address; } + @Override public void addListener(final UnproposalListener listener) { if (executor == null) { listeners.add(listener); } else { executor.execute(new Runnable() { + @Override public void run() { listeners.add(listener); } @@ -67,6 +69,7 @@ public abstract class GroupHandlingAbstract implements GroupingHandler { protected void fireUnproposed(final SimpleString groupID) { Runnable runnable = new Runnable() { + @Override public void run() { for (UnproposalListener listener : listeners) { listener.unproposed(groupID); @@ -82,6 +85,7 @@ public abstract class GroupHandlingAbstract implements GroupingHandler { } } + @Override public void forceRemove(SimpleString groupid, SimpleString clusterName) throws Exception { remove(groupid, clusterName); sendUnproposal(groupid, clusterName, 0); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java index 4162776..09a8fc3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java @@ -97,10 +97,12 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { this.groupTimeout = groupTimeout; } + @Override public SimpleString getName() { return name; } + @Override public Response propose(final Proposal proposal) throws Exception { OperationContext originalCtx = storageManager.getContext(); @@ -157,11 +159,13 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { } } + @Override public void resendPending() throws Exception { // this only make sense on RemoteGroupingHandler. // this is a no-op on the local one } + @Override public void proposed(final Response response) throws Exception { } @@ -170,6 +174,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { remove(groupid, clusterName); } + @Override public void sendProposalResponse(final Response response, final int distance) throws Exception { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID, response.getGroupId()); @@ -182,11 +187,13 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { managementService.sendNotification(notification); } + @Override public Response receive(final Proposal proposal, final int distance) throws Exception { ActiveMQServerLogger.LOGGER.trace("received proposal " + proposal); return propose(proposal); } + @Override public void addGroupBinding(final GroupBinding groupBinding) { map.put(groupBinding.getGroupId(), groupBinding); List<GroupBinding> newList = new ArrayList<GroupBinding>(); @@ -197,6 +204,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { newList.add(groupBinding); } + @Override public Response getProposal(final SimpleString fullID, final boolean touchTime) { GroupBinding original = map.get(fullID); @@ -271,6 +279,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { } } + @Override public void onNotification(final Notification notification) { if (!(notification.getType() instanceof CoreNotificationType)) return; @@ -315,6 +324,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { } } + @Override public synchronized void start() throws Exception { if (started) return; @@ -335,6 +345,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { started = true; } + @Override public synchronized void stop() throws Exception { started = false; if (reaperFuture != null) { @@ -343,6 +354,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { } } + @Override public boolean isStarted() { return started; } @@ -392,6 +404,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { final GroupIdReaper reaper = new GroupIdReaper(); + @Override public void run() { executor.execute(reaper); } @@ -400,6 +413,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { private final class GroupIdReaper implements Runnable { + @Override public void run() { // The reaper thread should be finished case the PostOffice is gone // This is to avoid leaks on PostOffice between stops and starts http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/RemoteGroupingHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/RemoteGroupingHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/RemoteGroupingHandler.java index 5545aff..57763d5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/RemoteGroupingHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/RemoteGroupingHandler.java @@ -84,6 +84,7 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract { this(null, managementService, name, address, timeout, groupTimeout); } + @Override public SimpleString getName() { return name; } @@ -105,6 +106,7 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract { return started; } + @Override public void resendPending() throws Exception { // In case the RESET wasn't sent yet to the remote node, we may eventually miss a node send, // on that case the cluster-reset information will ask the group to resend any pending information @@ -121,6 +123,7 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract { } } + @Override public Response propose(final Proposal proposal) throws Exception { // return it from the cache first Response response = responses.get(proposal.getGroupId()); @@ -205,6 +208,7 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract { return new Notification(null, CoreNotificationType.PROPOSAL, props); } + @Override public Response getProposal(final SimpleString fullID, boolean touchTime) { Response response = responses.get(fullID); @@ -231,6 +235,7 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract { sendUnproposal(groupid, clusterName, distance); } + @Override public void proposed(final Response response) throws Exception { try { lock.lock(); @@ -250,6 +255,7 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract { } } + @Override public Response receive(final Proposal proposal, final int distance) throws Exception { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID, proposal.getGroupId()); @@ -262,14 +268,17 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract { return null; } + @Override public void sendProposalResponse(final Response response, final int distance) throws Exception { // NO-OP } + @Override public void addGroupBinding(final GroupBinding groupBinding) { // NO-OP } + @Override public void onNotification(final Notification notification) { if (!(notification.getType() instanceof CoreNotificationType)) return;