http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 73af6fd..0a608a1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -314,6 +314,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery throw new IllegalStateException("Please specify a load balancing policy class name on the session factory"); } AccessController.doPrivileged(new PrivilegedAction<Object>() { + @Override public Object run() { loadBalancingPolicy = (ConnectionLoadBalancingPolicy) ClassloadingUtil.newInstanceFromClassLoader(connectionLoadBalancingPolicyClassName); return null; @@ -500,6 +501,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery this(topology, useHA, null, transportConfigs); } + @Override public void resetToInitialConnectors() { synchronized (topologyArrayGuard) { receivedTopology = false; @@ -511,6 +513,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery /* * I'm not using isAllInVM here otherwsie BeanProperties would translate this as a property for the URL */ + @Override public boolean allInVM() { for (TransportConfiguration config : getStaticTransportConfigurations()) { if (!config.getFactoryClassName().contains("InVMConnectorFactory")) { @@ -597,6 +600,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + @Override public void start(Executor executor) throws Exception { initialise(); @@ -604,6 +608,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (executor != null) { executor.execute(new Runnable() { + @Override public void run() { try { connect(); @@ -618,10 +623,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + @Override public ClientProtocolManager newProtocolManager() { return getProtocolManagerFactory().newProtocolManager(); } + @Override public ClientProtocolManagerFactory getProtocolManagerFactory() { if (protocolManagerFactory == null) { // Default one in case it's null @@ -630,12 +637,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return protocolManagerFactory; } + @Override public ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) { this.protocolManagerFactory = protocolManagerFactory; protocolManagerFactory.setLocator(this); return this; } + @Override public void disableFinalizeCheck() { finalizeCheck = false; } @@ -670,15 +679,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return connect(true); } + @Override public ServerLocatorImpl setAfterConnectionInternalListener(AfterConnectInternalListener listener) { this.afterConnectListener = listener; return this; } + @Override public AfterConnectInternalListener getAfterConnectInternalListener() { return afterConnectListener; } + @Override public ClientSessionFactory createSessionFactory(String nodeID) throws Exception { TopologyMember topologyMember = topology.getMember(nodeID); @@ -705,6 +717,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return null; } + @Override public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception { assertOpen(); @@ -730,6 +743,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + @Override public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration, int reconnectAttempts, boolean failoverOnInitialConnection) throws Exception { @@ -770,6 +784,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + @Override public ClientSessionFactory createSessionFactory() throws ActiveMQException { assertOpen(); @@ -902,327 +917,393 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return fromInterceptors(outgoingInterceptors); } + @Override public boolean isCacheLargeMessagesClient() { return cacheLargeMessagesClient; } + @Override public ServerLocatorImpl setCacheLargeMessagesClient(final boolean cached) { cacheLargeMessagesClient = cached; return this; } + @Override public long getClientFailureCheckPeriod() { return clientFailureCheckPeriod; } + @Override public ServerLocatorImpl setClientFailureCheckPeriod(final long clientFailureCheckPeriod) { checkWrite(); this.clientFailureCheckPeriod = clientFailureCheckPeriod; return this; } + @Override public long getConnectionTTL() { return connectionTTL; } + @Override public ServerLocatorImpl setConnectionTTL(final long connectionTTL) { checkWrite(); this.connectionTTL = connectionTTL; return this; } + @Override public long getCallTimeout() { return callTimeout; } + @Override public ServerLocatorImpl setCallTimeout(final long callTimeout) { checkWrite(); this.callTimeout = callTimeout; return this; } + @Override public long getCallFailoverTimeout() { return callFailoverTimeout; } + @Override public ServerLocatorImpl setCallFailoverTimeout(long callFailoverTimeout) { checkWrite(); this.callFailoverTimeout = callFailoverTimeout; return this; } + @Override public int getMinLargeMessageSize() { return minLargeMessageSize; } + @Override public ServerLocatorImpl setMinLargeMessageSize(final int minLargeMessageSize) { checkWrite(); this.minLargeMessageSize = minLargeMessageSize; return this; } + @Override public int getConsumerWindowSize() { return consumerWindowSize; } + @Override public ServerLocatorImpl setConsumerWindowSize(final int consumerWindowSize) { checkWrite(); this.consumerWindowSize = consumerWindowSize; return this; } + @Override public int getConsumerMaxRate() { return consumerMaxRate; } + @Override public ServerLocatorImpl setConsumerMaxRate(final int consumerMaxRate) { checkWrite(); this.consumerMaxRate = consumerMaxRate; return this; } + @Override public int getConfirmationWindowSize() { return confirmationWindowSize; } + @Override public ServerLocatorImpl setConfirmationWindowSize(final int confirmationWindowSize) { checkWrite(); this.confirmationWindowSize = confirmationWindowSize; return this; } + @Override public int getProducerWindowSize() { return producerWindowSize; } + @Override public ServerLocatorImpl setProducerWindowSize(final int producerWindowSize) { checkWrite(); this.producerWindowSize = producerWindowSize; return this; } + @Override public int getProducerMaxRate() { return producerMaxRate; } + @Override public ServerLocatorImpl setProducerMaxRate(final int producerMaxRate) { checkWrite(); this.producerMaxRate = producerMaxRate; return this; } + @Override public boolean isBlockOnAcknowledge() { return blockOnAcknowledge; } + @Override public ServerLocatorImpl setBlockOnAcknowledge(final boolean blockOnAcknowledge) { checkWrite(); this.blockOnAcknowledge = blockOnAcknowledge; return this; } + @Override public boolean isBlockOnDurableSend() { return blockOnDurableSend; } + @Override public ServerLocatorImpl setBlockOnDurableSend(final boolean blockOnDurableSend) { checkWrite(); this.blockOnDurableSend = blockOnDurableSend; return this; } + @Override public boolean isBlockOnNonDurableSend() { return blockOnNonDurableSend; } + @Override public ServerLocatorImpl setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) { checkWrite(); this.blockOnNonDurableSend = blockOnNonDurableSend; return this; } + @Override public boolean isAutoGroup() { return autoGroup; } + @Override public ServerLocatorImpl setAutoGroup(final boolean autoGroup) { checkWrite(); this.autoGroup = autoGroup; return this; } + @Override public boolean isPreAcknowledge() { return preAcknowledge; } + @Override public ServerLocatorImpl setPreAcknowledge(final boolean preAcknowledge) { checkWrite(); this.preAcknowledge = preAcknowledge; return this; } + @Override public int getAckBatchSize() { return ackBatchSize; } + @Override public ServerLocatorImpl setAckBatchSize(final int ackBatchSize) { checkWrite(); this.ackBatchSize = ackBatchSize; return this; } + @Override public boolean isUseGlobalPools() { return useGlobalPools; } + @Override public ServerLocatorImpl setUseGlobalPools(final boolean useGlobalPools) { checkWrite(); this.useGlobalPools = useGlobalPools; return this; } + @Override public int getScheduledThreadPoolMaxSize() { return scheduledThreadPoolMaxSize; } + @Override public ServerLocatorImpl setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) { checkWrite(); this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize; return this; } + @Override public int getThreadPoolMaxSize() { return threadPoolMaxSize; } + @Override public ServerLocatorImpl setThreadPoolMaxSize(final int threadPoolMaxSize) { checkWrite(); this.threadPoolMaxSize = threadPoolMaxSize; return this; } + @Override public long getRetryInterval() { return retryInterval; } + @Override public ServerLocatorImpl setRetryInterval(final long retryInterval) { checkWrite(); this.retryInterval = retryInterval; return this; } + @Override public long getMaxRetryInterval() { return maxRetryInterval; } + @Override public ServerLocatorImpl setMaxRetryInterval(final long retryInterval) { checkWrite(); maxRetryInterval = retryInterval; return this; } + @Override public double getRetryIntervalMultiplier() { return retryIntervalMultiplier; } + @Override public ServerLocatorImpl setRetryIntervalMultiplier(final double retryIntervalMultiplier) { checkWrite(); this.retryIntervalMultiplier = retryIntervalMultiplier; return this; } + @Override public int getReconnectAttempts() { return reconnectAttempts; } + @Override public ServerLocatorImpl setReconnectAttempts(final int reconnectAttempts) { checkWrite(); this.reconnectAttempts = reconnectAttempts; return this; } + @Override public ServerLocatorImpl setInitialConnectAttempts(int initialConnectAttempts) { checkWrite(); this.initialConnectAttempts = initialConnectAttempts; return this; } + @Override public int getInitialConnectAttempts() { return initialConnectAttempts; } + @Override public boolean isFailoverOnInitialConnection() { return this.failoverOnInitialConnection; } + @Override public ServerLocatorImpl setFailoverOnInitialConnection(final boolean failover) { checkWrite(); this.failoverOnInitialConnection = failover; return this; } + @Override public String getConnectionLoadBalancingPolicyClassName() { return connectionLoadBalancingPolicyClassName; } + @Override public ServerLocatorImpl setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName) { checkWrite(); connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName; return this; } + @Override public TransportConfiguration[] getStaticTransportConfigurations() { if (initialConnectors == null) return new TransportConfiguration[]{}; return Arrays.copyOf(initialConnectors, initialConnectors.length); } + @Override public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration() { return discoveryGroupConfiguration; } + @Override public ServerLocatorImpl addIncomingInterceptor(final Interceptor interceptor) { incomingInterceptors.add(interceptor); return this; } + @Override public ServerLocatorImpl addOutgoingInterceptor(final Interceptor interceptor) { outgoingInterceptors.add(interceptor); return this; } + @Override public boolean removeIncomingInterceptor(final Interceptor interceptor) { return incomingInterceptors.remove(interceptor); } + @Override public boolean removeOutgoingInterceptor(final Interceptor interceptor) { return outgoingInterceptors.remove(interceptor); } + @Override public int getInitialMessagePacketSize() { return initialMessagePacketSize; } + @Override public ServerLocatorImpl setInitialMessagePacketSize(final int size) { checkWrite(); initialMessagePacketSize = size; return this; } + @Override public ServerLocatorImpl setGroupID(final String groupID) { checkWrite(); this.groupID = groupID; return this; } + @Override public String getGroupID() { return groupID; } + @Override public boolean isCompressLargeMessage() { return compressLargeMessage; } + @Override public ServerLocatorImpl setCompressLargeMessage(boolean avoid) { this.compressLargeMessage = avoid; return this; @@ -1242,33 +1323,40 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return initialConnectors.length; } + @Override public ServerLocatorImpl setIdentity(String identity) { this.identity = identity; return this; } + @Override public ServerLocatorImpl setNodeID(String nodeID) { this.nodeID = nodeID; return this; } + @Override public String getNodeID() { return nodeID; } + @Override public ServerLocatorImpl setClusterConnection(boolean clusterConnection) { this.clusterConnection = clusterConnection; return this; } + @Override public boolean isClusterConnection() { return clusterConnection; } + @Override public TransportConfiguration getClusterTransportConfiguration() { return clusterTransportConfiguration; } + @Override public ServerLocatorImpl setClusterTransportConfiguration(TransportConfiguration tc) { this.clusterTransportConfiguration = tc; return this; @@ -1283,10 +1371,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery super.finalize(); } + @Override public void cleanup() { doClose(false); } + @Override public void close() { doClose(true); } @@ -1428,6 +1518,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } + @Override public void notifyNodeUp(long uniqueEventID, final String nodeID, final String backupGroupName, @@ -1498,6 +1589,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + @Override public synchronized void connectorsChanged(List<DiscoveryEntry> newConnectors) { if (receivedTopology) { return; @@ -1522,6 +1614,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery // to trigger the node notification to form the cluster. Runnable connectRunnable = new Runnable() { + @Override public void run() { try { connect(); @@ -1540,6 +1633,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + @Override public void factoryClosed(final ClientSessionFactory factory) { boolean isEmpty; synchronized (factories) { @@ -1557,6 +1651,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + @Override public Topology getTopology() { return topology; } @@ -1566,11 +1661,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return getNumInitialConnectors() > 0 || getDiscoveryGroupConfiguration() != null; } + @Override public ServerLocatorImpl addClusterTopologyListener(final ClusterTopologyListener listener) { topology.addClusterTopologyListener(listener); return this; } + @Override public void removeClusterTopologyListener(final ClusterTopologyListener listener) { topology.removeClusterTopologyListener(listener); } @@ -1799,6 +1896,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + @Override public boolean isClosed() { synchronized (stateGuard) { return state != STATE.INITIALIZED; @@ -1835,6 +1933,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return; } AccessController.doPrivileged(new PrivilegedAction<Object>() { + @Override public Object run() { String[] arrayInterceptor = interceptorList.split(",");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java index 0f945aa..d7522b7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java @@ -81,6 +81,7 @@ public interface ServerLocatorInternal extends ServerLocator { ServerLocatorInternal setClusterTransportConfiguration(TransportConfiguration tc); + @Override Topology getTopology(); ClientProtocolManager newProtocolManager(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java index 83c327e..35bfb5d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java @@ -58,6 +58,7 @@ public final class Topology { private static final class DirectExecutor implements Executor { + @Override public void execute(final Runnable runnable) { runnable.run(); } @@ -246,6 +247,7 @@ public final class Topology { if (copy.size() > 0) { executor.execute(new Runnable() { + @Override public void run() { for (ClusterTopologyListener listener : copy) { if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { @@ -311,6 +313,7 @@ public final class Topology { final ArrayList<ClusterTopologyListener> copy = copyListeners(); executor.execute(new Runnable() { + @Override public void run() { for (ClusterTopologyListener listener : copy) { if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { @@ -335,6 +338,7 @@ public final class Topology { } executor.execute(new Runnable() { + @Override public void run() { int count = 0; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java index 10704c6..1696efc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java @@ -99,6 +99,7 @@ public final class TopologyMemberImpl implements TopologyMember { return connector; } + @Override public boolean isMember(RemotingConnection connection) { TransportConfiguration connectorConfig = connection.getTransportConnection() != null ? connection.getTransportConnection().getConnectorConfig() : null; @@ -106,6 +107,7 @@ public final class TopologyMemberImpl implements TopologyMember { } + @Override public boolean isMember(TransportConfiguration configuration) { if (getConnector().getA() != null && getConnector().getA().isSameParams(configuration) || getConnector().getB() != null && getConnector().getB().isSameParams(configuration)) { return true; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java index 5e8fe77..4a91f65 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java @@ -95,6 +95,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { this.notificationService = service; } + @Override public synchronized void start() throws Exception { if (started) { return; @@ -132,6 +133,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { runnable.run(); } + @Override public void stop() { synchronized (this) { if (!started) { @@ -180,6 +182,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { } } + @Override public boolean isStarted() { return started; } @@ -245,6 +248,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { class DiscoveryRunnable implements Runnable { + @Override public void run() { byte[] data = null; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java index eae2de3..1d82bee 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java @@ -170,6 +170,7 @@ public abstract class MessageImpl implements MessageInternal { // Message implementation ---------------------------------------- + @Override public int getEncodeSize() { int headersPropsSize = getHeadersAndPropertiesEncodeSize(); @@ -180,6 +181,7 @@ public abstract class MessageImpl implements MessageInternal { return DataConstants.SIZE_INT + bodySize + DataConstants.SIZE_INT + headersPropsSize; } + @Override public int getHeadersAndPropertiesEncodeSize() { return DataConstants.SIZE_LONG + // Message ID DataConstants.SIZE_BYTE + // user id null? @@ -193,6 +195,7 @@ public abstract class MessageImpl implements MessageInternal { /* PropertySize and Properties */properties.getEncodeSize(); } + @Override public void encodeHeadersAndProperties(final ActiveMQBuffer buffer) { buffer.writeLong(messageID); buffer.writeNullableSimpleString(address); @@ -211,6 +214,7 @@ public abstract class MessageImpl implements MessageInternal { properties.encode(buffer); } + @Override public void decodeHeadersAndProperties(final ActiveMQBuffer buffer) { messageID = buffer.readLong(); address = buffer.readNullableSimpleString(); @@ -242,6 +246,7 @@ public abstract class MessageImpl implements MessageInternal { properties = msg.getTypedProperties(); } + @Override public ActiveMQBuffer getBodyBuffer() { if (bodyBuffer == null) { bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this); @@ -250,12 +255,14 @@ public abstract class MessageImpl implements MessageInternal { return bodyBuffer; } + @Override public Message writeBodyBufferBytes(byte[] bytes) { getBodyBuffer().writeBytes(bytes); return this; } + @Override public Message writeBodyBufferString(String string) { getBodyBuffer().writeString(string); @@ -266,6 +273,7 @@ public abstract class MessageImpl implements MessageInternal { // no op on regular messages } + @Override public synchronized ActiveMQBuffer getBodyBufferCopy() { // Must copy buffer before sending it @@ -276,14 +284,17 @@ public abstract class MessageImpl implements MessageInternal { return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, newBuffer, null); } + @Override public long getMessageID() { return messageID; } + @Override public UUID getUserID() { return userID; } + @Override public MessageImpl setUserID(final UUID userID) { this.userID = userID; return this; @@ -293,6 +304,7 @@ public abstract class MessageImpl implements MessageInternal { * this doesn't need to be synchronized as setAddress is protecting the buffer, * not the address */ + @Override public SimpleString getAddress() { return address; } @@ -302,6 +314,7 @@ public abstract class MessageImpl implements MessageInternal { * This synchronization can probably be removed since setAddress is always called from a single thread. * However I will keep it as it's harmless and it's been well tested */ + @Override public Message setAddress(final SimpleString address) { // This is protecting the buffer synchronized (this) { @@ -315,6 +328,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public byte getType() { return type; } @@ -323,10 +337,12 @@ public abstract class MessageImpl implements MessageInternal { this.type = type; } + @Override public boolean isDurable() { return durable; } + @Override public MessageImpl setDurable(final boolean durable) { if (this.durable != durable) { this.durable = durable; @@ -336,10 +352,12 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public long getExpiration() { return expiration; } + @Override public MessageImpl setExpiration(final long expiration) { if (this.expiration != expiration) { this.expiration = expiration; @@ -349,10 +367,12 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public long getTimestamp() { return timestamp; } + @Override public MessageImpl setTimestamp(final long timestamp) { if (this.timestamp != timestamp) { this.timestamp = timestamp; @@ -362,10 +382,12 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public byte getPriority() { return priority; } + @Override public MessageImpl setPriority(final byte priority) { if (this.priority != priority) { this.priority = priority; @@ -375,6 +397,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public boolean isExpired() { if (expiration == 0) { return false; @@ -383,6 +406,7 @@ public abstract class MessageImpl implements MessageInternal { return System.currentTimeMillis() - expiration >= 0; } + @Override public Map<String, Object> toMap() { Map<String, Object> map = new HashMap<String, Object>(); @@ -402,12 +426,14 @@ public abstract class MessageImpl implements MessageInternal { return map; } + @Override public void decodeFromBuffer(final ActiveMQBuffer buffer) { this.buffer = buffer; decode(); } + @Override public void bodyChanged() { // If the body is changed we must copy the buffer otherwise can affect the previously sent message // which might be in the Netty write queue @@ -418,6 +444,7 @@ public abstract class MessageImpl implements MessageInternal { endOfBodyPosition = -1; } + @Override public synchronized void checkCopy() { if (!copied) { forceCopy(); @@ -426,14 +453,17 @@ public abstract class MessageImpl implements MessageInternal { } } + @Override public synchronized void resetCopied() { copied = false; } + @Override public int getEndOfMessagePosition() { return endOfMessagePosition; } + @Override public int getEndOfBodyPosition() { if (endOfBodyPosition < 0) { endOfBodyPosition = buffer.writerIndex(); @@ -467,6 +497,7 @@ public abstract class MessageImpl implements MessageInternal { buff.readerIndex(start + length); } + @Override public synchronized ActiveMQBuffer getEncodedBuffer() { ActiveMQBuffer buff = encodeToBuffer(); @@ -486,6 +517,7 @@ public abstract class MessageImpl implements MessageInternal { } } + @Override public void setAddressTransient(final SimpleString address) { this.address = address; } @@ -493,6 +525,7 @@ public abstract class MessageImpl implements MessageInternal { // Properties // --------------------------------------------------------------------------------------- + @Override public Message putBooleanProperty(final SimpleString key, final boolean value) { properties.putBooleanProperty(key, value); @@ -501,6 +534,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putByteProperty(final SimpleString key, final byte value) { properties.putByteProperty(key, value); @@ -509,6 +543,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putBytesProperty(final SimpleString key, final byte[] value) { properties.putBytesProperty(key, value); @@ -533,6 +568,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putShortProperty(final SimpleString key, final short value) { properties.putShortProperty(key, value); bufferValid = false; @@ -540,6 +576,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putIntProperty(final SimpleString key, final int value) { properties.putIntProperty(key, value); bufferValid = false; @@ -547,6 +584,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putLongProperty(final SimpleString key, final long value) { properties.putLongProperty(key, value); bufferValid = false; @@ -554,6 +592,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putFloatProperty(final SimpleString key, final float value) { properties.putFloatProperty(key, value); @@ -562,6 +601,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putDoubleProperty(final SimpleString key, final double value) { properties.putDoubleProperty(key, value); @@ -570,6 +610,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putStringProperty(final SimpleString key, final SimpleString value) { properties.putSimpleStringProperty(key, value); @@ -578,6 +619,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putObjectProperty(final SimpleString key, final Object value) throws ActiveMQPropertyConversionException { TypedProperties.setObjectProperty(key, value, properties); @@ -586,6 +628,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException { putObjectProperty(new SimpleString(key), value); @@ -594,6 +637,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putBooleanProperty(final String key, final boolean value) { properties.putBooleanProperty(new SimpleString(key), value); @@ -602,6 +646,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putByteProperty(final String key, final byte value) { properties.putByteProperty(new SimpleString(key), value); @@ -610,6 +655,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putBytesProperty(final String key, final byte[] value) { properties.putBytesProperty(new SimpleString(key), value); @@ -618,6 +664,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putShortProperty(final String key, final short value) { properties.putShortProperty(new SimpleString(key), value); @@ -626,6 +673,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putIntProperty(final String key, final int value) { properties.putIntProperty(new SimpleString(key), value); @@ -634,6 +682,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putLongProperty(final String key, final long value) { properties.putLongProperty(new SimpleString(key), value); @@ -642,6 +691,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putFloatProperty(final String key, final float value) { properties.putFloatProperty(new SimpleString(key), value); @@ -650,6 +700,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putDoubleProperty(final String key, final double value) { properties.putDoubleProperty(new SimpleString(key), value); @@ -658,6 +709,7 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Message putStringProperty(final String key, final String value) { properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value)); @@ -674,74 +726,92 @@ public abstract class MessageImpl implements MessageInternal { return this; } + @Override public Object getObjectProperty(final SimpleString key) { return properties.getProperty(key); } + @Override public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException { return properties.getBooleanProperty(key); } + @Override public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException { return properties.getBooleanProperty(new SimpleString(key)); } + @Override public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException { return properties.getByteProperty(key); } + @Override public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException { return properties.getByteProperty(new SimpleString(key)); } + @Override public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException { return properties.getBytesProperty(key); } + @Override public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException { return getBytesProperty(new SimpleString(key)); } + @Override public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException { return properties.getDoubleProperty(key); } + @Override public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException { return properties.getDoubleProperty(new SimpleString(key)); } + @Override public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException { return properties.getIntProperty(key); } + @Override public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException { return properties.getIntProperty(new SimpleString(key)); } + @Override public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException { return properties.getLongProperty(key); } + @Override public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException { return properties.getLongProperty(new SimpleString(key)); } + @Override public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException { return properties.getShortProperty(key); } + @Override public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException { return properties.getShortProperty(new SimpleString(key)); } + @Override public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException { return properties.getFloatProperty(key); } + @Override public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException { return properties.getFloatProperty(new SimpleString(key)); } + @Override public String getStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException { SimpleString str = getSimpleStringProperty(key); @@ -753,54 +823,66 @@ public abstract class MessageImpl implements MessageInternal { } } + @Override public String getStringProperty(final String key) throws ActiveMQPropertyConversionException { return getStringProperty(new SimpleString(key)); } + @Override public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException { return properties.getSimpleStringProperty(key); } + @Override public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException { return properties.getSimpleStringProperty(new SimpleString(key)); } + @Override public Object getObjectProperty(final String key) { return properties.getProperty(new SimpleString(key)); } + @Override public Object removeProperty(final SimpleString key) { bufferValid = false; return properties.removeProperty(key); } + @Override public Object removeProperty(final String key) { bufferValid = false; return properties.removeProperty(new SimpleString(key)); } + @Override public boolean containsProperty(final SimpleString key) { return properties.containsProperty(key); } + @Override public boolean containsProperty(final String key) { return properties.containsProperty(new SimpleString(key)); } + @Override public Set<SimpleString> getPropertyNames() { return properties.getPropertyNames(); } + @Override public ActiveMQBuffer getWholeBuffer() { return buffer; } + @Override public BodyEncoder getBodyEncoder() throws ActiveMQException { return new DecodingContext(); } + @Override public TypedProperties getTypedProperties() { return this.properties; } @@ -956,21 +1038,26 @@ public abstract class MessageImpl implements MessageInternal { public DecodingContext() { } + @Override public void open() { } + @Override public void close() { } + @Override public long getLargeBodySize() { return buffer.writerIndex(); } + @Override public int encode(final ByteBuffer bufferRead) throws ActiveMQException { ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead); return encode(buffer, bufferRead.capacity()); } + @Override public int encode(final ActiveMQBuffer bufferOut, final int size) { bufferOut.writeBytes(getWholeBuffer(), lastPos, size); lastPos += size; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index b45fd5a..0dbda12 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -109,14 +109,17 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { public ActiveMQClientProtocolManager() { } + @Override public String getName() { return ActiveMQClient.DEFAULT_CORE_PROTOCOL; } + @Override public void setSessionFactory(ClientSessionFactory factory) { this.factoryInternal = (ClientSessionFactoryInternal) factory; } + @Override public ClientSessionFactory getSessionFactory() { return this.factoryInternal; } @@ -126,6 +129,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { pipeline.addLast("activemq-decoder", new ActiveMQFrameDecoder2()); } + @Override public boolean waitOnLatch(long milliseconds) throws InterruptedException { return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS); } @@ -139,6 +143,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } } + @Override public RemotingConnection getCurrentConnection() { return connection; } @@ -152,6 +157,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } } + @Override public Lock lockSessionCreation() { try { Lock localFailoverLock = factoryInternal.lockFailover(); @@ -179,6 +185,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } } + @Override public void stop() { alive = false; @@ -196,6 +203,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } + @Override public boolean isAlive() { return alive; } @@ -351,6 +359,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { return new ActiveMQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); } + @Override public boolean cleanupBeforeFailover(ActiveMQException cause) { boolean needToInterrupt; @@ -401,6 +410,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { return message.isOkToFailover(); } + @Override public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, @@ -435,6 +445,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { this.conn = conn; } + @Override public void handlePacket(final Packet packet) { final byte type = packet.getType(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java index 24727c9..5675fff 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java @@ -45,6 +45,7 @@ public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManag return factory; } + @Override public ClientProtocolManager newProtocolManager() { return new ActiveMQClientProtocolManager(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 1bac653..05ae474 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -122,6 +122,7 @@ public class ActiveMQSessionContext extends SessionContext { return name; } + @Override public void resetName(String name) { this.name = name; } @@ -153,6 +154,7 @@ public class ActiveMQSessionContext extends SessionContext { } private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() { + @Override public void commandConfirmed(final Packet packet) { if (packet.getType() == PacketImpl.SESS_SEND) { SessionSendMessage ssm = (SessionSendMessage) packet; @@ -195,6 +197,7 @@ public class ActiveMQSessionContext extends SessionContext { sessionChannel.unlock(); } + @Override public void cleanup() { sessionChannel.close(); @@ -208,11 +211,13 @@ public class ActiveMQSessionContext extends SessionContext { // nothing to be done here... Flow control here is done on the core side } + @Override public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { sessionChannel.setCommandConfirmationHandler(confirmationHandler); this.sendAckHandler = handler; } + @Override public void createSharedQueue(SimpleString address, SimpleString queueName, SimpleString filterString, @@ -220,10 +225,12 @@ public class ActiveMQSessionContext extends SessionContext { sessionChannel.sendBlocking(new CreateSharedQueueMessage(address, queueName, filterString, durable, true), PacketImpl.NULL_RESPONSE); } + @Override public void deleteQueue(final SimpleString queueName) throws ActiveMQException { sessionChannel.sendBlocking(new SessionDeleteQueueMessage(queueName), PacketImpl.NULL_RESPONSE); } + @Override public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); SessionQueueQueryResponseMessage_V2 response = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2); @@ -231,6 +238,7 @@ public class ActiveMQSessionContext extends SessionContext { return response.toQueueQuery(); } + @Override public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, int windowSize, @@ -254,10 +262,12 @@ public class ActiveMQSessionContext extends SessionContext { return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); } + @Override public int getServerVersion() { return serverVersion; } + @Override public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2); @@ -269,39 +279,48 @@ public class ActiveMQSessionContext extends SessionContext { sessionChannel.sendBlocking(new SessionConsumerCloseMessage(getConsumerID(consumer)), PacketImpl.NULL_RESPONSE); } + @Override public void sendConsumerCredits(final ClientConsumer consumer, final int credits) { sessionChannel.send(new SessionConsumerFlowCreditMessage(getConsumerID(consumer), credits)); } + @Override public void forceDelivery(final ClientConsumer consumer, final long sequence) throws ActiveMQException { SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(getConsumerID(consumer), sequence); sessionChannel.send(request); } + @Override public void simpleCommit() throws ActiveMQException { sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE); } + @Override public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException { sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), PacketImpl.NULL_RESPONSE); } + @Override public void sessionStart() throws ActiveMQException { sessionChannel.send(new PacketImpl(PacketImpl.SESS_START)); } + @Override public void sessionStop() throws ActiveMQException { sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP), PacketImpl.NULL_RESPONSE); } + @Override public void addSessionMetadata(String key, String data) throws ActiveMQException { sessionChannel.sendBlocking(new SessionAddMetaDataMessageV2(key, data), PacketImpl.NULL_RESPONSE); } + @Override public void addUniqueMetaData(String key, String data) throws ActiveMQException { sessionChannel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data), PacketImpl.NULL_RESPONSE); } + @Override public void xaCommit(Xid xid, boolean onePhase) throws XAException, ActiveMQException { SessionXACommitMessage packet = new SessionXACommitMessage(xid, onePhase); SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); @@ -315,6 +334,7 @@ public class ActiveMQSessionContext extends SessionContext { } } + @Override public void xaEnd(Xid xid, int flags) throws XAException, ActiveMQException { Packet packet; if (flags == XAResource.TMSUSPEND) { @@ -337,6 +357,7 @@ public class ActiveMQSessionContext extends SessionContext { } } + @Override public void sendProducerCreditsMessage(final int credits, final SimpleString address) { sessionChannel.send(new SessionRequestProducerCreditsMessage(credits, address)); } @@ -346,6 +367,7 @@ public class ActiveMQSessionContext extends SessionContext { * * @return */ + @Override public boolean supportsLargeMessage() { return true; } @@ -355,6 +377,7 @@ public class ActiveMQSessionContext extends SessionContext { return msgI.getEncodeSize(); } + @Override public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, @@ -399,6 +422,7 @@ public class ActiveMQSessionContext extends SessionContext { return chunkPacket.getPacketSize(); } + @Override public void sendACK(boolean individual, boolean block, final ClientConsumer consumer, @@ -419,16 +443,19 @@ public class ActiveMQSessionContext extends SessionContext { } } + @Override public void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException { SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID()); sessionChannel.send(messagePacket); } + @Override public void sessionClose() throws ActiveMQException { sessionChannel.sendBlocking(new SessionCloseMessage(), PacketImpl.NULL_RESPONSE); } + @Override public void xaForget(Xid xid) throws XAException, ActiveMQException { SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(new SessionXAForgetMessage(xid), PacketImpl.SESS_XA_RESP); @@ -437,6 +464,7 @@ public class ActiveMQSessionContext extends SessionContext { } } + @Override public int xaPrepare(Xid xid) throws XAException, ActiveMQException { SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid); @@ -450,6 +478,7 @@ public class ActiveMQSessionContext extends SessionContext { } } + @Override public Xid[] xaScan() throws ActiveMQException { SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS), PacketImpl.SESS_XA_INDOUBT_XIDS_RESP); @@ -460,6 +489,7 @@ public class ActiveMQSessionContext extends SessionContext { return xidArray; } + @Override public void xaRollback(Xid xid, boolean wasStarted) throws ActiveMQException, XAException { SessionXARollbackMessage packet = new SessionXARollbackMessage(xid); @@ -470,6 +500,7 @@ public class ActiveMQSessionContext extends SessionContext { } } + @Override public void xaStart(Xid xid, int flags) throws XAException, ActiveMQException { Packet packet; if (flags == XAResource.TMJOIN) { @@ -494,18 +525,21 @@ public class ActiveMQSessionContext extends SessionContext { } } + @Override public boolean configureTransactionTimeout(int seconds) throws ActiveMQException { SessionXASetTimeoutResponseMessage response = (SessionXASetTimeoutResponseMessage) sessionChannel.sendBlocking(new SessionXASetTimeoutMessage(seconds), PacketImpl.SESS_XA_SET_TIMEOUT_RESP); return response.isOK(); } + @Override public int recoverSessionTimeout() throws ActiveMQException { SessionXAGetTimeoutResponseMessage response = (SessionXAGetTimeoutResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT), PacketImpl.SESS_XA_GET_TIMEOUT_RESP); return response.getTimeoutSeconds(); } + @Override public void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString, @@ -546,6 +580,7 @@ public class ActiveMQSessionContext extends SessionContext { } + @Override public void recreateSession(final String username, final String password, final int minLargeMessageSize, @@ -624,10 +659,12 @@ public class ActiveMQSessionContext extends SessionContext { } } + @Override public void xaFailed(Xid xid) throws ActiveMQException { sendPacketWithoutLock(sessionChannel, new SessionXAAfterFailedMessage(xid)); } + @Override public void restartSession() throws ActiveMQException { sendPacketWithoutLock(sessionChannel, new PacketImpl(PacketImpl.SESS_START)); } @@ -693,6 +730,7 @@ public class ActiveMQSessionContext extends SessionContext { class ClientSessionPacketHandler implements ChannelHandler { + @Override public void handlePacket(final Packet packet) { byte type = packet.getType(); @@ -755,6 +793,7 @@ public class ActiveMQSessionContext extends SessionContext { protected ClassLoader lookupTCCL() { return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { + @Override public ClassLoader run() { return Thread.currentThread().getContextClassLoader(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 57ed1e8..b152156 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -139,6 +139,7 @@ public final class ChannelImpl implements Channel { this.interceptors = interceptors; } + @Override public boolean supports(final byte packetType) { int version = connection.getClientVersion(); @@ -160,26 +161,32 @@ public final class ChannelImpl implements Channel { } } + @Override public long getID() { return id; } + @Override public int getLastConfirmedCommandID() { return lastConfirmedCommandID.get(); } + @Override public Lock getLock() { return lock; } + @Override public int getConfirmationWindowSize() { return confWindowSize; } + @Override public void returnBlocking() { returnBlocking(null); } + @Override public void returnBlocking(Throwable cause) { lock.lock(); @@ -193,18 +200,22 @@ public final class ChannelImpl implements Channel { } } + @Override public boolean sendAndFlush(final Packet packet) { return send(packet, true, false); } + @Override public boolean send(final Packet packet) { return send(packet, false, false); } + @Override public boolean sendBatched(final Packet packet) { return send(packet, false, true); } + @Override public void setTransferring(boolean transferring) { this.transferring = transferring; } @@ -273,6 +284,7 @@ public final class ChannelImpl implements Channel { * and the client could eventually retry another call, but the server could then answer a previous command issuing a class-cast-exception. * The expectedPacket will be used to filter out undesirable packets that would belong to previous calls. */ + @Override public Packet sendBlocking(final Packet packet, byte expectedPacket) throws ActiveMQException { String interceptionResult = invokeInterceptors(packet, interceptors, connection); @@ -408,6 +420,7 @@ public final class ChannelImpl implements Channel { return null; } + @Override public void setCommandConfirmationHandler(final CommandConfirmationHandler handler) { if (confWindowSize < 0) { final String msg = "You can't set confirmationHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information."; @@ -416,14 +429,17 @@ public final class ChannelImpl implements Channel { commandConfirmationHandler = handler; } + @Override public void setHandler(final ChannelHandler handler) { this.handler = handler; } + @Override public ChannelHandler getHandler() { return handler; } + @Override public void close() { if (closed) { return; @@ -439,6 +455,7 @@ public final class ChannelImpl implements Channel { closed = true; } + @Override public void transferConnection(final CoreRemotingConnection newConnection) { // Needs to synchronize on the connection to make sure no packets from // the old connection get processed after transfer has occurred @@ -457,6 +474,7 @@ public final class ChannelImpl implements Channel { } } + @Override public void replayCommands(final int otherLastConfirmedCommandID) { if (resendCache != null) { if (isTrace) { @@ -470,6 +488,7 @@ public final class ChannelImpl implements Channel { } } + @Override public void lock() { lock.lock(); @@ -478,6 +497,7 @@ public final class ChannelImpl implements Channel { lock.unlock(); } + @Override public void unlock() { lock.lock(); @@ -488,11 +508,13 @@ public final class ChannelImpl implements Channel { lock.unlock(); } + @Override public CoreRemotingConnection getConnection() { return connection; } // Needs to be synchronized since can be called by remoting service timer thread too for timeout flush + @Override public synchronized void flushConfirmations() { if (resendCache != null && receivedBytes != 0) { receivedBytes = 0; @@ -505,6 +527,7 @@ public final class ChannelImpl implements Channel { } } + @Override public void confirm(final Packet packet) { if (resendCache != null && packet.isRequiresConfirmations()) { lastConfirmedCommandID.incrementAndGet(); @@ -523,6 +546,7 @@ public final class ChannelImpl implements Channel { } } + @Override public void clearCommands() { if (resendCache != null) { lastConfirmedCommandID.set(-1); @@ -533,6 +557,7 @@ public final class ChannelImpl implements Channel { } } + @Override public void handlePacket(final Packet packet) { if (packet.getType() == PacketImpl.PACKETS_CONFIRMED) { if (resendCache != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index fffdec1..9aa8d3c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -253,18 +253,22 @@ public class PacketImpl implements Packet { // Public -------------------------------------------------------- + @Override public byte getType() { return type; } + @Override public long getChannelID() { return channelID; } + @Override public void setChannelID(final long channelID) { this.channelID = channelID; } + @Override public ActiveMQBuffer encode(final RemotingConnection connection) { ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE); @@ -286,6 +290,7 @@ public class PacketImpl implements Packet { return buffer; } + @Override public void decode(final ActiveMQBuffer buffer) { channelID = buffer.readLong(); @@ -294,6 +299,7 @@ public class PacketImpl implements Packet { size = buffer.readerIndex(); } + @Override public int getPacketSize() { if (size == -1) { throw new IllegalStateException("Packet hasn't been encoded/decoded yet"); @@ -302,6 +308,7 @@ public class PacketImpl implements Packet { return size; } + @Override public boolean isResponse() { return false; } @@ -312,6 +319,7 @@ public class PacketImpl implements Packet { public void decodeRest(final ActiveMQBuffer buffer) { } + @Override public boolean isRequiresConfirmations() { return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index 6868713..a19c3f3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -151,6 +151,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement /** * @return the clientVersion */ + @Override public int getClientVersion() { return clientVersion; } @@ -158,10 +159,12 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement /** * @param clientVersion the clientVersion to set */ + @Override public void setClientVersion(int clientVersion) { this.clientVersion = clientVersion; } + @Override public synchronized Channel getChannel(final long channelID, final int confWindowSize) { Channel channel = channels.get(channelID); @@ -174,14 +177,17 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement return channel; } + @Override public synchronized boolean removeChannel(final long channelID) { return channels.remove(channelID) != null; } + @Override public synchronized void putChannel(final long channelID, final Channel channel) { channels.put(channelID, channel); } + @Override public void fail(final ActiveMQException me, String scaleDownTargetNodeID) { synchronized (failLock) { if (destroyed) { @@ -212,6 +218,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement } } + @Override public void destroy() { synchronized (failLock) { if (destroyed) { @@ -226,10 +233,12 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement callClosingListeners(); } + @Override public void disconnect(final boolean criticalError) { disconnect(null, criticalError); } + @Override public void disconnect(String scaleDownNodeID, final boolean criticalError) { Channel channel0 = getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1); @@ -266,10 +275,12 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement channel0.sendAndFlush(disconnect); } + @Override public long generateChannelID() { return idGenerator.generateID(); } + @Override public synchronized void syncIDGeneratorSequence(final long id) { if (!idGeneratorSynced) { idGenerator = new SimpleIDGenerator(id); @@ -278,22 +289,27 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement } } + @Override public long getIDGeneratorSequence() { return idGenerator.getCurrentID(); } + @Override public Object getTransferLock() { return transferLock; } + @Override public boolean isClient() { return client; } + @Override public boolean isDestroyed() { return destroyed; } + @Override public long getBlockingCallTimeout() { return blockingCallTimeout; } @@ -305,6 +321,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement //We flush any confirmations on the connection - this prevents idle bridges for example //sitting there with many unacked messages + @Override public void flush() { synchronized (transferLock) { for (Channel channel : channels.values()) { @@ -313,12 +330,14 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement } } + @Override public ActiveMQPrincipal getDefaultActiveMQPrincipal() { return getTransportConnection().getDefaultActiveMQPrincipal(); } // Buffer Handler implementation // ---------------------------------------------------- + @Override public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) { try { final Packet packet = packetDecoder.decode(buffer); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java index 160e9bc..75bc879 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java @@ -30,10 +30,12 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI this.message = message; } + @Override public Message getMessage() { return message; } + @Override public String toString() { return this.getParentString() + ",message=" + message + "]"; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java index c98ac0a..8c1b153 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java @@ -88,6 +88,7 @@ public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryRespon return result; } + @Override public ClientSession.QueueQuery toQueueQuery() { return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreationEnabled()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/TransportConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/TransportConfigurationUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/TransportConfigurationUtil.java index 244e138..60b8336 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/TransportConfigurationUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/TransportConfigurationUtil.java @@ -63,6 +63,7 @@ public class TransportConfigurationUtil { private static Object instantiateObject(final String className) { return AccessController.doPrivileged(new PrivilegedAction<Object>() { + @Override public Object run() { try { return ClassloadingUtil.newInstanceFromClassLoader(className); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index e55fe97..2c1f1c8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -96,6 +96,7 @@ public class NettyConnection implements Connection { } // Connection implementation ---------------------------- + @Override public void forceClose() { if (channel != null) { try { @@ -116,14 +117,17 @@ public class NettyConnection implements Connection { return channel; } + @Override public RemotingConnection getProtocolConnection() { return protocolConnection; } + @Override public void setProtocolConnection(RemotingConnection protocolConnection) { this.protocolConnection = protocolConnection; } + @Override public void close() { if (closed) { return; @@ -150,16 +154,19 @@ public class NettyConnection implements Connection { listener.connectionDestroyed(getID()); } + @Override public ActiveMQBuffer createTransportBuffer(final int size) { return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true); } + @Override public Object getID() { // TODO: Think of it return channel.hashCode(); } // This is called periodically to flush the batch buffer + @Override public void checkFlushBatchBuffer() { if (!batchingEnabled) { return; @@ -179,14 +186,17 @@ public class NettyConnection implements Connection { } } + @Override public void write(final ActiveMQBuffer buffer) { write(buffer, false, false); } + @Override public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) { write(buffer, flush, batched, null); } + @Override public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched, @@ -291,6 +301,7 @@ public class NettyConnection implements Connection { } } + @Override public String getRemoteAddress() { SocketAddress address = channel.remoteAddress(); if (address == null) { @@ -299,6 +310,7 @@ public class NettyConnection implements Connection { return address.toString(); } + @Override public String getLocalAddress() { SocketAddress address = channel.localAddress(); if (address == null) { @@ -311,15 +323,18 @@ public class NettyConnection implements Connection { return directDeliver; } + @Override public void addReadyListener(final ReadyListener listener) { readyListeners.add(listener); } + @Override public void removeReadyListener(final ReadyListener listener) { readyListeners.remove(listener); } //never allow this + @Override public ActiveMQPrincipal getDefaultActiveMQPrincipal() { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index decac89..60d913c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -346,6 +346,7 @@ public class NettyConnector extends AbstractConnector { "]"; } + @Override public synchronized void start() { if (channelClazz != null) { return; @@ -454,6 +455,7 @@ public class NettyConnector extends AbstractConnector { } bootstrap.handler(new ChannelInitializer<Channel>() { + @Override public void initChannel(Channel channel) throws Exception { final ChannelPipeline pipeline = channel.pipeline(); if (sslEnabled && !useServlet) { @@ -528,6 +530,7 @@ public class NettyConnector extends AbstractConnector { ActiveMQClientLogger.LOGGER.debug("Started Netty Connector version " + TransportConstants.NETTY_VERSION); } + @Override public synchronized void close() { if (channelClazz == null) { return; @@ -559,10 +562,12 @@ public class NettyConnector extends AbstractConnector { connections.clear(); } + @Override public boolean isStarted() { return channelClazz != null; } + @Override public Connection createConnection() { if (channelClazz == null) { return null; @@ -866,6 +871,7 @@ public class NettyConnector extends AbstractConnector { private java.util.concurrent.Future<?> future; + @Override public synchronized void run() { if (closed) { return; @@ -895,6 +901,7 @@ public class NettyConnector extends AbstractConnector { private class Listener implements ConnectionLifeCycleListener { + @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, final String protocol) { @@ -903,10 +910,12 @@ public class NettyConnector extends AbstractConnector { } } + @Override public void connectionDestroyed(final Object connectionID) { if (connections.remove(connectionID) != null) { // Execute on different thread to avoid deadlocks closeExecutor.execute(new Runnable() { + @Override public void run() { listener.connectionDestroyed(connectionID); } @@ -914,15 +923,18 @@ public class NettyConnector extends AbstractConnector { } } + @Override public void connectionException(final Object connectionID, final ActiveMQException me) { // Execute on different thread to avoid deadlocks closeExecutor.execute(new Runnable() { + @Override public void run() { listener.connectionException(connectionID, me); } }); } + @Override public void connectionReadyForWrites(Object connectionID, boolean ready) { listener.connectionReadyForWrites(connectionID, ready); } @@ -933,6 +945,7 @@ public class NettyConnector extends AbstractConnector { private boolean cancelled; + @Override public synchronized void run() { if (!cancelled) { for (Connection connection : connections.values()) { @@ -946,6 +959,7 @@ public class NettyConnector extends AbstractConnector { } } + @Override public boolean isEquivalent(Map<String, Object> configuration) { //here we only check host and port because these two parameters //is sufficient to determine the target host @@ -976,6 +990,7 @@ public class NettyConnector extends AbstractConnector { return result; } + @Override public void finalize() throws Throwable { close(); super.finalize(); @@ -992,6 +1007,7 @@ public class NettyConnector extends AbstractConnector { private static ClassLoader getThisClassLoader() { return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { + @Override public ClassLoader run() { return ClientSessionFactoryImpl.class.getClassLoader(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnectorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnectorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnectorFactory.java index 99c6a56..a7c5f0e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnectorFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnectorFactory.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory; public class NettyConnectorFactory implements ConnectorFactory { + @Override public Connector createConnector(final Map<String, Object> configuration, final BufferHandler handler, final ConnectionLifeCycleListener listener, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/ssl/SSLSupport.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/ssl/SSLSupport.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/ssl/SSLSupport.java index 3593294..092b1c7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/ssl/SSLSupport.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/ssl/SSLSupport.java @@ -162,6 +162,7 @@ public class SSLSupport { */ private static URL findResource(final String resourceName) { return AccessController.doPrivileged(new PrivilegedAction<URL>() { + @Override public URL run() { return ClassloadingUtil.findResource(resourceName); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/transaction/impl/XidImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/transaction/impl/XidImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/transaction/impl/XidImpl.java index f29bf32..324797a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/transaction/impl/XidImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/transaction/impl/XidImpl.java @@ -91,14 +91,17 @@ public class XidImpl implements Xid, Serializable { // Xid implementation ------------------------------------------------------------------ + @Override public byte[] getBranchQualifier() { return branchQualifier; } + @Override public int getFormatId() { return formatId; } + @Override public byte[] getGlobalTransactionId() { return globalTransactionId; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/version/impl/VersionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/version/impl/VersionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/version/impl/VersionImpl.java index 0394a11..c8437e1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/version/impl/VersionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/version/impl/VersionImpl.java @@ -60,30 +60,37 @@ public class VersionImpl implements Version, Serializable { // Version implementation ------------------------------------------ + @Override public String getFullVersion() { return versionName; } + @Override public String getVersionName() { return versionName; } + @Override public int getMajorVersion() { return majorVersion; } + @Override public int getMinorVersion() { return minorVersion; } + @Override public int getMicroVersion() { return microVersion; } + @Override public int getIncrementingVersion() { return incrementingVersion; } + @Override public boolean isCompatible(int version) { for (int element : compatibleVersionList) { if (element == version) {