http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 66d5cbb..1a67dff 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -227,10 +227,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C connectionReadyForWrites = true; } + @Override public void disableFinalizeCheck() { finalizeCheck = false; } + @Override public Lock lockFailover() { newFailoverLock.lock(); return newFailoverLock; @@ -244,6 +246,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } } + @Override public void connect(final int initialConnectAttempts, final boolean failoverOnInitialConnection) throws ActiveMQException { // Get the connection @@ -259,10 +262,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } + @Override public TransportConfiguration getConnectorConfiguration() { return connectorConfig; } + @Override public void setBackupConnector(final TransportConfiguration live, final TransportConfiguration backUp) { Connector localConnector = connector; @@ -290,10 +295,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } } + @Override public Object getBackupConnector() { return backupConfig; } + @Override public ClientSession createSession(final String username, final String password, final boolean xa, @@ -304,35 +311,42 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C return createSessionInternal(username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize); } + @Override public ClientSession createSession(final boolean autoCommitSends, final boolean autoCommitAcks, final int ackBatchSize) throws ActiveMQException { return createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks, serverLocator.isPreAcknowledge(), ackBatchSize); } + @Override public ClientSession createXASession() throws ActiveMQException { return createSessionInternal(null, null, true, false, false, serverLocator.isPreAcknowledge(), serverLocator.getAckBatchSize()); } + @Override public ClientSession createTransactedSession() throws ActiveMQException { return createSessionInternal(null, null, false, false, false, serverLocator.isPreAcknowledge(), serverLocator.getAckBatchSize()); } + @Override public ClientSession createSession() throws ActiveMQException { return createSessionInternal(null, null, false, true, true, serverLocator.isPreAcknowledge(), serverLocator.getAckBatchSize()); } + @Override public ClientSession createSession(final boolean autoCommitSends, final boolean autoCommitAcks) throws ActiveMQException { return createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks, serverLocator.isPreAcknowledge(), serverLocator.getAckBatchSize()); } + @Override public ClientSession createSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks) throws ActiveMQException { return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, serverLocator.isPreAcknowledge(), serverLocator.getAckBatchSize()); } + @Override public ClientSession createSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks, @@ -342,11 +356,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C // ConnectionLifeCycleListener implementation -------------------------------------------------- + @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, final String protocol) { } + @Override public void connectionDestroyed(final Object connectionID) { // The exception has to be created in the same thread where it's being called // as to avoid a different stack trace cause @@ -355,6 +371,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C // It has to use the same executor as the disconnect message is being sent through closeExecutor.execute(new Runnable() { + @Override public void run() { handleConnectionFailure(connectionID, ex); } @@ -362,18 +379,21 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } + @Override public void connectionException(final Object connectionID, final ActiveMQException me) { handleConnectionFailure(connectionID, me); } // Must be synchronized to prevent it happening concurrently with failover which can lead to // inconsistencies + @Override public void removeSession(final ClientSessionInternal session, final boolean failingOver) { synchronized (sessions) { sessions.remove(session); } } + @Override public void connectionReadyForWrites(final Object connectionID, final boolean ready) { synchronized (connectionReadyLock) { if (connectionReadyForWrites != ready) { @@ -385,31 +405,38 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } } + @Override public synchronized int numConnections() { return connection != null ? 1 : 0; } + @Override public int numSessions() { return sessions.size(); } + @Override public void addFailureListener(final SessionFailureListener listener) { listeners.add(listener); } + @Override public boolean removeFailureListener(final SessionFailureListener listener) { return listeners.remove(listener); } + @Override public ClientSessionFactoryImpl addFailoverListener(FailoverEventListener listener) { failoverListeners.add(listener); return this; } + @Override public boolean removeFailoverListener(FailoverEventListener listener) { return failoverListeners.remove(listener); } + @Override public void causeExit() { clientProtocolManager.stop(); } @@ -447,6 +474,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C checkCloseConnection(); } + @Override public void close() { if (closed) { return; @@ -456,6 +484,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C serverLocator.factoryClosed(this); } + @Override public void cleanup() { if (closed) { return; @@ -464,6 +493,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C interruptConnectAndCloseAllSessions(false); } + @Override public boolean isClosed() { return closed || serverLocator.isClosed(); } @@ -856,6 +886,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } } + @Override public RemotingConnection getConnection() { if (closed) throw new IllegalStateException("ClientSessionFactory is closed!"); @@ -948,6 +979,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C // else... we will try to instantiate a new one return AccessController.doPrivileged(new PrivilegedAction<ConnectorFactory>() { + @Override public ConnectorFactory run() { return (ConnectorFactory) ClassloadingUtil.newInstanceFromClassLoader(connectorFactoryClassName); } @@ -966,6 +998,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C // Must be executed on new thread since cannot block the Netty thread for a long time and fail // can cause reconnect loop + @Override public void run() { try { CLOSE_RUNNABLES.add(this); @@ -990,10 +1023,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } + @Override public void setReconnectAttempts(final int attempts) { reconnectAttempts = attempts; } + @Override public Object getConnector() { return connector; } @@ -1117,6 +1152,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private class DelegatingBufferHandler implements BufferHandler { + @Override public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) { RemotingConnection theConn = connection; @@ -1162,6 +1198,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C pingRunnable = new WeakReference<PingRunnable>(runnable); } + @Override public void run() { PingRunnable runnable = pingRunnable.get(); @@ -1180,6 +1217,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private long lastCheck = System.currentTimeMillis(); + @Override public synchronized void run() { if (cancelled || stopPingingAfterOne && !first) { return; @@ -1200,6 +1238,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C threadPool.execute(new Runnable() { // Must be executed on different thread + @Override public void run() { connection.fail(me); }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 0a24022..2942b8c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -234,28 +234,33 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // ClientSession implementation // ----------------------------------------------------------------- + @Override public void createQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException { internalCreateQueue(address, queueName, null, false, false); } + @Override public void createQueue(final SimpleString address, final SimpleString queueName, final boolean durable) throws ActiveMQException { internalCreateQueue(address, queueName, null, durable, false); } + @Override public void createQueue(final String address, final String queueName, final boolean durable) throws ActiveMQException { createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), durable); } + @Override public void createSharedQueue(SimpleString address, SimpleString queueName, boolean durable) throws ActiveMQException { createSharedQueue(address, queueName, null, durable); } + @Override public void createSharedQueue(SimpleString address, SimpleString queueName, SimpleString filterString, @@ -273,6 +278,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } + @Override public void createQueue(final SimpleString address, final SimpleString queueName, final SimpleString filterString, @@ -280,6 +286,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi internalCreateQueue(address, queueName, filterString, durable, false); } + @Override public void createQueue(final String address, final String queueName, final String filterString, @@ -287,26 +294,31 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filterString), durable); } + @Override public void createTemporaryQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException { internalCreateQueue(address, queueName, null, false, true); } + @Override public void createTemporaryQueue(final String address, final String queueName) throws ActiveMQException { internalCreateQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true); } + @Override public void createTemporaryQueue(final SimpleString address, final SimpleString queueName, final SimpleString filter) throws ActiveMQException { internalCreateQueue(address, queueName, filter, false, true); } + @Override public void createTemporaryQueue(final String address, final String queueName, final String filter) throws ActiveMQException { internalCreateQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true); } + @Override public void deleteQueue(final SimpleString queueName) throws ActiveMQException { checkClosed(); @@ -319,10 +331,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void deleteQueue(final String queueName) throws ActiveMQException { deleteQueue(SimpleString.toSimpleString(queueName)); } + @Override public QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { checkClosed(); @@ -336,50 +350,60 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } + @Override public AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { checkClosed(); return sessionContext.addressQuery(address); } + @Override public ClientConsumer createConsumer(final SimpleString queueName) throws ActiveMQException { return createConsumer(queueName, null, false); } + @Override public ClientConsumer createConsumer(final String queueName) throws ActiveMQException { return createConsumer(SimpleString.toSimpleString(queueName)); } + @Override public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString) throws ActiveMQException { return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, false); } + @Override public void createQueue(final String address, final String queueName) throws ActiveMQException { createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName)); } + @Override public ClientConsumer createConsumer(final String queueName, final String filterString) throws ActiveMQException { return createConsumer(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filterString)); } + @Override public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString, final boolean browseOnly) throws ActiveMQException { return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, browseOnly); } + @Override public ClientConsumer createConsumer(final SimpleString queueName, final boolean browseOnly) throws ActiveMQException { return createConsumer(queueName, null, consumerWindowSize, consumerMaxRate, browseOnly); } + @Override public ClientConsumer createConsumer(final String queueName, final String filterString, final boolean browseOnly) throws ActiveMQException { return createConsumer(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filterString), browseOnly); } + @Override public ClientConsumer createConsumer(final String queueName, final boolean browseOnly) throws ActiveMQException { return createConsumer(SimpleString.toSimpleString(queueName), null, browseOnly); } @@ -394,6 +418,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi * the client during that period, so failover won't occur. If we want direct consumers we need to * rethink how they work. */ + @Override public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString, final int windowSize, @@ -402,6 +427,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly); } + @Override public ClientConsumer createConsumer(final String queueName, final String filterString, final int windowSize, @@ -410,18 +436,22 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return createConsumer(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filterString), windowSize, maxRate, browseOnly); } + @Override public ClientProducer createProducer() throws ActiveMQException { return createProducer((SimpleString) null); } + @Override public ClientProducer createProducer(final SimpleString address) throws ActiveMQException { return createProducer(address, producerMaxRate); } + @Override public ClientProducer createProducer(final String address) throws ActiveMQException { return createProducer(SimpleString.toSimpleString(address)); } + @Override public ClientProducer createProducer(final SimpleString address, final int maxRate) throws ActiveMQException { return internalCreateProducer(address, maxRate); } @@ -430,6 +460,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return createProducer(SimpleString.toSimpleString(address), rate); } + @Override public XAResource getXAResource() { return this; } @@ -444,6 +475,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi throw ActiveMQClientMessageBundle.BUNDLE.txOutcomeUnknown(); } + @Override public void commit() throws ActiveMQException { checkClosed(); @@ -490,14 +522,17 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi workDone = false; } + @Override public boolean isRollbackOnly() { return rollbackOnly; } + @Override public void rollback() throws ActiveMQException { rollback(false); } + @Override public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQException { if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { ActiveMQClientLogger.LOGGER.trace("calling rollback(isLastMessageAsDelivered=" + isLastMessageAsDelivered + ")"); @@ -532,10 +567,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi rollbackOnly = false; } + @Override public void markRollbackOnly() { rollbackOnly = true; } + @Override public ClientMessage createMessage(final byte type, final boolean durable, final long expiration, @@ -544,34 +581,42 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize); } + @Override public ClientMessage createMessage(final byte type, final boolean durable) { return this.createMessage(type, durable, 0, System.currentTimeMillis(), (byte) 4); } + @Override public ClientMessage createMessage(final boolean durable) { return this.createMessage((byte) 0, durable); } + @Override public boolean isClosed() { return closed; } + @Override public boolean isAutoCommitSends() { return autoCommitSends; } + @Override public boolean isAutoCommitAcks() { return autoCommitAcks; } + @Override public boolean isBlockOnAcknowledge() { return blockOnAcknowledge; } + @Override public boolean isXA() { return xa; } + @Override public void resetIfNeeded() throws ActiveMQException { if (rollbackOnly) { ActiveMQClientLogger.LOGGER.resettingSessionAfterFailure(); @@ -579,6 +624,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public ClientSessionImpl start() throws ActiveMQException { checkClosed(); @@ -595,6 +641,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return this; } + @Override public void stop() throws ActiveMQException { stop(true); } @@ -613,26 +660,32 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void addFailureListener(final SessionFailureListener listener) { sessionFactory.addFailureListener(listener); } + @Override public boolean removeFailureListener(final SessionFailureListener listener) { return sessionFactory.removeFailureListener(listener); } + @Override public void addFailoverListener(FailoverEventListener listener) { sessionFactory.addFailoverListener(listener); } + @Override public boolean removeFailoverListener(FailoverEventListener listener) { return sessionFactory.removeFailoverListener(listener); } + @Override public int getVersion() { return sessionContext.getServerVersion(); } + @Override public boolean isClosing() { return inClose; } @@ -650,10 +703,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // ClientSessionInternal implementation // ------------------------------------------------------------ + @Override public int getMinLargeMessageSize() { return minLargeMessageSize; } + @Override public boolean isCompressLargeMessages() { return compressLargeMessages; } @@ -661,10 +716,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi /** * @return the cacheLargeMessageClient */ + @Override public boolean isCacheLargeMessageClient() { return cacheLargeMessageClient; } + @Override public String getName() { return name; } @@ -672,6 +729,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi /** * Acknowledges all messages received by the consumer so far. */ + @Override public void acknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException { // if we're pre-acknowledging then we don't need to do anything if (preAcknowledge) { @@ -692,6 +750,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException { // if we're pre-acknowledging then we don't need to do anything if (preAcknowledge) { @@ -710,6 +769,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void expire(final ClientConsumer consumer, final Message message) throws ActiveMQException { checkClosed(); @@ -719,30 +779,35 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void addConsumer(final ClientConsumerInternal consumer) { synchronized (consumers) { consumers.put(consumer.getConsumerContext(), consumer); } } + @Override public void addProducer(final ClientProducerInternal producer) { synchronized (producers) { producers.add(producer); } } + @Override public void removeConsumer(final ClientConsumerInternal consumer) throws ActiveMQException { synchronized (consumers) { consumers.remove(consumer.getConsumerContext()); } } + @Override public void removeProducer(final ClientProducerInternal producer) { synchronized (producers) { producers.remove(producer); } } + @Override public void handleReceiveMessage(final ConsumerContext consumerID, final ClientMessageInternal message) throws Exception { ClientConsumerInternal consumer = getConsumer(consumerID); @@ -752,6 +817,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void handleReceiveLargeMessage(final ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception { @@ -762,6 +828,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void handleReceiveContinuation(final ConsumerContext consumerID, byte[] chunk, int flowControlSize, @@ -792,6 +859,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void close() throws ActiveMQException { if (closed) { ActiveMQClientLogger.LOGGER.debug("Session was already closed, giving up now, this=" + this); @@ -821,6 +889,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi doCleanup(false); } + @Override public synchronized void cleanUp(boolean failingOver) throws ActiveMQException { if (closed) { return; @@ -833,11 +902,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi doCleanup(failingOver); } + @Override public ClientSessionImpl setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { sessionContext.setSendAcknowledgementHandler(handler); return this; } + @Override public void preHandleFailover(RemotingConnection connection) { // We lock the channel to prevent any packets to be added to the re-send // cache during the failover process @@ -847,6 +918,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // Needs to be synchronized to prevent issues with occurring concurrently with close() + @Override public void handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) { synchronized (this) { if (closed) { @@ -947,6 +1019,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } + @Override public void addMetaData(String key, String data) throws ActiveMQException { synchronized (metadata) { metadata.put(key, data); @@ -955,14 +1028,17 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi sessionContext.addSessionMetadata(key, data); } + @Override public void addUniqueMetaData(String key, String data) throws ActiveMQException { sessionContext.addUniqueMetaData(key, data); } + @Override public ClientSessionFactory getSessionFactory() { return sessionFactory; } + @Override public void setAddress(final Message message, final SimpleString address) { if (defaultAddress == null) { defaultAddress = address; @@ -979,48 +1055,58 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void setPacketSize(final int packetSize) { if (packetSize > this.initialMessagePacketSize) { this.initialMessagePacketSize = (int) (packetSize * 1.2); } } + @Override public void workDone() { workDone = true; } + @Override public void sendProducerCreditsMessage(final int credits, final SimpleString address) { sessionContext.sendProducerCreditsMessage(credits, address); } + @Override public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon) { ClientProducerCredits credits = producerCreditManager.getCredits(address, anon, sessionContext); return credits; } + @Override public void returnCredits(final SimpleString address) { producerCreditManager.returnCredits(address); } + @Override public void handleReceiveProducerCredits(final SimpleString address, final int credits) { producerCreditManager.receiveCredits(address, credits); } + @Override public void handleReceiveProducerFailCredits(final SimpleString address, int credits) { producerCreditManager.receiveFailCredits(address, credits); } + @Override public ClientProducerCreditManager getProducerCreditManager() { return producerCreditManager; } + @Override public void startCall() { if (concurrentCall.incrementAndGet() > 1) { ActiveMQClientLogger.LOGGER.invalidConcurrentSessionUsage(new Exception("trace")); } } + @Override public void endCall() { concurrentCall.decrementAndGet(); } @@ -1032,6 +1118,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // XAResource implementation // -------------------------------------------------------------------- + @Override public void commit(final Xid xid, final boolean onePhase) throws XAException { if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { ActiveMQClientLogger.LOGGER.trace("call commit(xid=" + convert(xid)); @@ -1073,6 +1160,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void end(final Xid xid, final int flags) throws XAException { if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { ActiveMQClientLogger.LOGGER.trace("Calling end:: " + convert(xid) + ", flags=" + convertTXFlag(flags)); @@ -1118,6 +1206,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void forget(final Xid xid) throws XAException { checkXA(); startCall(); @@ -1138,6 +1227,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public int getTransactionTimeout() throws XAException { checkXA(); @@ -1152,6 +1242,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public boolean setTransactionTimeout(final int seconds) throws XAException { checkXA(); @@ -1166,6 +1257,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public boolean isSameRM(final XAResource xares) throws XAException { checkXA(); @@ -1203,6 +1295,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return null; } + @Override public int prepare(final Xid xid) throws XAException { checkXA(); if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { @@ -1269,6 +1362,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public Xid[] recover(final int flags) throws XAException { checkXA(); @@ -1287,6 +1381,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return new Xid[0]; } + @Override public void rollback(final Xid xid) throws XAException { checkXA(); @@ -1341,6 +1436,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void start(final Xid xid, final int flags) throws XAException { if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { ActiveMQClientLogger.LOGGER.trace("Calling start:: " + convert(xid) + " clientXID=" + xid + " flags = " + convertTXFlag(flags)); @@ -1388,6 +1484,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // FailureListener implementation -------------------------------------------- + @Override public void connectionFailed(final ActiveMQException me, boolean failedOver) { try { cleanUp(false); @@ -1397,6 +1494,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } + @Override public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) { connectionFailed(me, failedOver); } @@ -1404,10 +1502,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // Public // ---------------------------------------------------------------------------- + @Override public void setForceNotSameRM(final boolean force) { forceNotSameRM = force; } + @Override public RemotingConnection getConnection() { return sessionContext.getRemotingConnection(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java index ae711bf..0cb38f6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java @@ -47,6 +47,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll /** * */ + @Override public void discardUnusedPackets() { bufferDelegate.discardUnusedPackets(); } @@ -54,22 +55,27 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll /** * Add a buff to the List, or save it to the OutputStream if set */ + @Override public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) { bufferDelegate.addPacket(chunk, flowControlSize, isContinues); } + @Override public synchronized void cancel() { bufferDelegate.cancel(); } + @Override public synchronized void close() { bufferDelegate.cancel(); } + @Override public void setOutputStream(final OutputStream output) throws ActiveMQException { bufferDelegate.setOutputStream(new InflaterWriter(output)); } + @Override public synchronized void saveBuffer(final OutputStream output) throws ActiveMQException { setOutputStream(output); waitCompletion(0); @@ -78,6 +84,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll /** * @param timeWait Milliseconds to Wait. 0 means forever */ + @Override public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException { return bufferDelegate.waitCompletion(timeWait); } @@ -108,6 +115,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll throw new IllegalStateException("Position not supported over compressed large messages"); } + @Override public byte readByte() { try { return getStream().readByte(); @@ -196,82 +204,102 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public int readerIndex() { return 0; } + @Override public void readerIndex(final int readerIndex) { // TODO } + @Override public int writerIndex() { // TODO return 0; } + @Override public long getSize() { return this.bufferDelegate.getSize(); } + @Override public void writerIndex(final int writerIndex) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void setIndex(final int readerIndex, final int writerIndex) { positioningNotSupported(); } + @Override public void clear() { } + @Override public boolean readable() { return true; } + @Override public boolean writable() { return false; } + @Override public int readableBytes() { return 1; } + @Override public int writableBytes() { return 0; } + @Override public void markReaderIndex() { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void resetReaderIndex() { // TODO: reset positioning if possible } + @Override public void markWriterIndex() { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void resetWriterIndex() { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void discardReadBytes() { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public short getUnsignedByte(final int index) { return (short) (getByte(index) & 0xFF); } + @Override public int getUnsignedShort(final int index) { return getShort(index) & 0xFFFF; } + @Override public long getUnsignedInt(final int index) { return getInt(index) & 0xFFFFFFFFL; } + @Override public void getBytes(int index, final byte[] dst) { // TODO: optimize this by using System.arraycopy for (int i = 0; i < dst.length; i++) { @@ -279,10 +307,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll } } + @Override public void getBytes(final int index, final ActiveMQBuffer dst) { getBytes(index, dst, dst.writableBytes()); } + @Override public void getBytes(final int index, final ActiveMQBuffer dst, final int length) { if (length > dst.writableBytes()) { throw new IndexOutOfBoundsException(); @@ -291,18 +321,22 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll dst.writerIndex(dst.writerIndex() + length); } + @Override public void setBytes(final int index, final byte[] src) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void setBytes(final int index, final ActiveMQBuffer src) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void setBytes(final int index, final ActiveMQBuffer src, final int length) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public int readUnsignedByte() { try { return getStream().readUnsignedByte(); @@ -312,6 +346,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll } } + @Override public short readShort() { try { return getStream().readShort(); @@ -321,6 +356,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll } } + @Override public int readUnsignedShort() { try { return getStream().readUnsignedShort(); @@ -330,6 +366,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll } } + @Override public int readInt() { try { return getStream().readInt(); @@ -339,10 +376,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll } } + @Override public long readUnsignedInt() { return readInt() & 0xFFFFFFFFL; } + @Override public long readLong() { try { return getStream().readLong(); @@ -352,6 +391,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll } } + @Override public void readBytes(final byte[] dst, final int dstIndex, final int length) { try { int nReadBytes = getStream().read(dst, dstIndex, length); @@ -364,14 +404,17 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll } } + @Override public void readBytes(final byte[] dst) { readBytes(dst, 0, dst.length); } + @Override public void readBytes(final ActiveMQBuffer dst) { readBytes(dst, dst.writableBytes()); } + @Override public void readBytes(final ActiveMQBuffer dst, final int length) { if (length > dst.writableBytes()) { throw new IndexOutOfBoundsException(); @@ -380,18 +423,21 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll dst.writerIndex(dst.writerIndex() + length); } + @Override public void readBytes(final ActiveMQBuffer dst, final int dstIndex, final int length) { byte[] destBytes = new byte[length]; readBytes(destBytes); dst.setBytes(dstIndex, destBytes); } + @Override public void readBytes(final ByteBuffer dst) { byte[] bytesToGet = new byte[dst.remaining()]; readBytes(bytesToGet); dst.put(bytesToGet); } + @Override public int skipBytes(final int length) { try { @@ -425,38 +471,47 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll } + @Override public void writeByte(final byte value) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void writeShort(final short value) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void writeInt(final int value) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void writeLong(final long value) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void writeBytes(final byte[] src, final int srcIndex, final int length) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void writeBytes(final byte[] src) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void writeBytes(final ActiveMQBuffer src, final int length) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void writeBytes(final ByteBuffer src) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public ByteBuffer toByteBuffer() { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } @@ -475,18 +530,22 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll return (char) readShort(); } + @Override public char getChar(final int index) { return (char) getShort(index); } + @Override public double getDouble(final int index) { return Double.longBitsToDouble(getLong(index)); } + @Override public float getFloat(final int index) { return Float.intBitsToFloat(getInt(index)); } + @Override public ActiveMQBuffer readBytes(final int length) { byte[] bytesToGet = new byte[length]; readBytes(bytesToGet); @@ -604,48 +663,59 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public ActiveMQBuffer copy() { throw new UnsupportedOperationException(); } + @Override public ActiveMQBuffer slice(final int index, final int length) { throw new UnsupportedOperationException(); } // Inner classes ------------------------------------------------- + @Override public ByteBuf byteBuf() { return null; } + @Override public ActiveMQBuffer copy(final int index, final int length) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public ActiveMQBuffer duplicate() { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public ActiveMQBuffer readSlice(final int length) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void setChar(final int index, final char value) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void setDouble(final int index, final double value) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void setFloat(final int index, final float value) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public ActiveMQBuffer slice() { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java index 4d72dc9..95bb26f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java @@ -90,10 +90,12 @@ public class DelegatingSession implements ClientSessionInternal { } } + @Override public boolean isClosing() { return session.isClosing(); } + @Override public void acknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException { session.acknowledge(consumer, message); } @@ -103,34 +105,42 @@ public class DelegatingSession implements ClientSessionInternal { session.addLifeCycleListener(lifeCycleListener); } + @Override public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException { session.individualAcknowledge(consumer, message); } + @Override public void addConsumer(final ClientConsumerInternal consumer) { session.addConsumer(consumer); } + @Override public void addFailureListener(final SessionFailureListener listener) { session.addFailureListener(listener); } + @Override public void addFailoverListener(FailoverEventListener listener) { session.addFailoverListener(listener); } + @Override public void addProducer(final ClientProducerInternal producer) { session.addProducer(producer); } + @Override public AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { return session.addressQuery(address); } + @Override public void cleanUp(boolean failingOver) throws ActiveMQException { session.cleanUp(failingOver); } + @Override public void close() throws ActiveMQException { closed = true; @@ -141,22 +151,27 @@ public class DelegatingSession implements ClientSessionInternal { session.close(); } + @Override public void markRollbackOnly() { session.markRollbackOnly(); } + @Override public void commit() throws ActiveMQException { session.commit(); } + @Override public void commit(final Xid xid, final boolean onePhase) throws XAException { session.commit(xid, onePhase); } + @Override public ClientMessage createMessage(final boolean durable) { return session.createMessage(durable); } + @Override public ClientMessage createMessage(final byte type, final boolean durable, final long expiration, @@ -165,16 +180,19 @@ public class DelegatingSession implements ClientSessionInternal { return session.createMessage(type, durable, expiration, timestamp, priority); } + @Override public ClientMessage createMessage(final byte type, final boolean durable) { return session.createMessage(type, durable); } + @Override public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString, final boolean browseOnly) throws ActiveMQException { return session.createConsumer(queueName, filterString, browseOnly); } + @Override public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString, final int windowSize, @@ -183,21 +201,25 @@ public class DelegatingSession implements ClientSessionInternal { return session.createConsumer(queueName, filterString, windowSize, maxRate, browseOnly); } + @Override public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString) throws ActiveMQException { return session.createConsumer(queueName, filterString); } + @Override public ClientConsumer createConsumer(final SimpleString queueName) throws ActiveMQException { return session.createConsumer(queueName); } + @Override public ClientConsumer createConsumer(final String queueName, final String filterString, final boolean browseOnly) throws ActiveMQException { return session.createConsumer(queueName, filterString, browseOnly); } + @Override public ClientConsumer createConsumer(final String queueName, final String filterString, final int windowSize, @@ -206,47 +228,58 @@ public class DelegatingSession implements ClientSessionInternal { return session.createConsumer(queueName, filterString, windowSize, maxRate, browseOnly); } + @Override public ClientConsumer createConsumer(final String queueName, final String filterString) throws ActiveMQException { return session.createConsumer(queueName, filterString); } + @Override public ClientConsumer createConsumer(final String queueName) throws ActiveMQException { return session.createConsumer(queueName); } + @Override public ClientConsumer createConsumer(final SimpleString queueName, final boolean browseOnly) throws ActiveMQException { return session.createConsumer(queueName, browseOnly); } + @Override public ClientConsumer createConsumer(final String queueName, final boolean browseOnly) throws ActiveMQException { return session.createConsumer(queueName, browseOnly); } + @Override public ClientProducer createProducer() throws ActiveMQException { return session.createProducer(); } + @Override public ClientProducer createProducer(final SimpleString address, final int rate) throws ActiveMQException { return session.createProducer(address, rate); } + @Override public ClientProducer createProducer(final SimpleString address) throws ActiveMQException { return session.createProducer(address); } + @Override public ClientProducer createProducer(final String address) throws ActiveMQException { return session.createProducer(address); } + @Override public void createQueue(final String address, final String queueName) throws ActiveMQException { session.createQueue(address, queueName); } + @Override public void createQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException { session.createQueue(address, queueName); } + @Override public void createQueue(final SimpleString address, final SimpleString queueName, final boolean durable) throws ActiveMQException { @@ -268,6 +301,7 @@ public class DelegatingSession implements ClientSessionInternal { session.createSharedQueue(address, queueName, filter, durable); } + @Override public void createQueue(final SimpleString address, final SimpleString queueName, final SimpleString filterString, @@ -275,12 +309,14 @@ public class DelegatingSession implements ClientSessionInternal { session.createQueue(address, queueName, filterString, durable); } + @Override public void createQueue(final String address, final String queueName, final boolean durable) throws ActiveMQException { session.createQueue(address, queueName, durable); } + @Override public void createQueue(final String address, final String queueName, final String filterString, @@ -288,74 +324,91 @@ public class DelegatingSession implements ClientSessionInternal { session.createQueue(address, queueName, filterString, durable); } + @Override public void createTemporaryQueue(final SimpleString address, final SimpleString queueName, final SimpleString filter) throws ActiveMQException { session.createTemporaryQueue(address, queueName, filter); } + @Override public void createTemporaryQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException { session.createTemporaryQueue(address, queueName); } + @Override public void createTemporaryQueue(final String address, final String queueName, final String filter) throws ActiveMQException { session.createTemporaryQueue(address, queueName, filter); } + @Override public void createTemporaryQueue(final String address, final String queueName) throws ActiveMQException { session.createTemporaryQueue(address, queueName); } + @Override public void deleteQueue(final SimpleString queueName) throws ActiveMQException { session.deleteQueue(queueName); } + @Override public void deleteQueue(final String queueName) throws ActiveMQException { session.deleteQueue(queueName); } + @Override public void end(final Xid xid, final int flags) throws XAException { session.end(xid, flags); } + @Override public void expire(final ClientConsumer consumer, final Message message) throws ActiveMQException { session.expire(consumer, message); } + @Override public void forget(final Xid xid) throws XAException { session.forget(xid); } + @Override public RemotingConnection getConnection() { return session.getConnection(); } + @Override public int getMinLargeMessageSize() { return session.getMinLargeMessageSize(); } + @Override public String getName() { return session.getName(); } + @Override public int getTransactionTimeout() throws XAException { return session.getTransactionTimeout(); } + @Override public int getVersion() { return session.getVersion(); } + @Override public XAResource getXAResource() { return session.getXAResource(); } + @Override public void preHandleFailover(RemotingConnection connection) { session.preHandleFailover(connection); } + @Override public void handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) { session.handleFailover(backupConnection, cause); } @@ -385,152 +438,189 @@ public class DelegatingSession implements ClientSessionInternal { session.handleConsumerDisconnect(consumerContext); } + @Override public boolean isAutoCommitAcks() { return session.isAutoCommitAcks(); } + @Override public boolean isAutoCommitSends() { return session.isAutoCommitSends(); } + @Override public boolean isBlockOnAcknowledge() { return session.isBlockOnAcknowledge(); } + @Override public boolean isCacheLargeMessageClient() { return session.isCacheLargeMessageClient(); } + @Override public boolean isClosed() { return session.isClosed(); } + @Override public boolean isSameRM(final XAResource xares) throws XAException { return session.isSameRM(xares); } + @Override public boolean isXA() { return session.isXA(); } + @Override public int prepare(final Xid xid) throws XAException { return session.prepare(xid); } + @Override public QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { return session.queueQuery(queueName); } + @Override public Xid[] recover(final int flag) throws XAException { return session.recover(flag); } + @Override public void removeConsumer(final ClientConsumerInternal consumer) throws ActiveMQException { session.removeConsumer(consumer); } + @Override public boolean removeFailureListener(final SessionFailureListener listener) { return session.removeFailureListener(listener); } + @Override public boolean removeFailoverListener(FailoverEventListener listener) { return session.removeFailoverListener(listener); } + @Override public void removeProducer(final ClientProducerInternal producer) { session.removeProducer(producer); } + @Override public void rollback() throws ActiveMQException { session.rollback(); } + @Override public boolean isRollbackOnly() { return session.isRollbackOnly(); } + @Override public void rollback(final boolean considerLastMessageAsDelivered) throws ActiveMQException { session.rollback(considerLastMessageAsDelivered); } + @Override public void rollback(final Xid xid) throws XAException { session.rollback(xid); } + @Override public DelegatingSession setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { session.setSendAcknowledgementHandler(handler); return this; } + @Override public boolean setTransactionTimeout(final int seconds) throws XAException { return session.setTransactionTimeout(seconds); } + @Override public void resetIfNeeded() throws ActiveMQException { session.resetIfNeeded(); } + @Override public DelegatingSession start() throws ActiveMQException { session.start(); return this; } + @Override public void start(final Xid xid, final int flags) throws XAException { session.start(xid, flags); } + @Override public void stop() throws ActiveMQException { session.stop(); } + @Override public ClientSessionFactory getSessionFactory() { return session.getSessionFactory(); } + @Override public void setForceNotSameRM(final boolean force) { session.setForceNotSameRM(force); } + @Override public void workDone() { session.workDone(); } + @Override public void sendProducerCreditsMessage(final int credits, final SimpleString address) { session.sendProducerCreditsMessage(credits, address); } + @Override public ClientProducerCredits getCredits(final SimpleString address, final boolean anon) { return session.getCredits(address, anon); } + @Override public void returnCredits(final SimpleString address) { session.returnCredits(address); } + @Override public void handleReceiveProducerCredits(final SimpleString address, final int credits) { session.handleReceiveProducerCredits(address, credits); } + @Override public void handleReceiveProducerFailCredits(final SimpleString address, final int credits) { session.handleReceiveProducerFailCredits(address, credits); } + @Override public ClientProducerCreditManager getProducerCreditManager() { return session.getProducerCreditManager(); } + @Override public void setAddress(Message message, SimpleString address) { session.setAddress(message, address); } + @Override public void setPacketSize(int packetSize) { session.setPacketSize(packetSize); } + @Override public void addMetaData(String key, String data) throws ActiveMQException { session.addMetaData(key, data); } + @Override public boolean isCompressLargeMessages() { return session.isCompressLargeMessages(); } @@ -546,10 +636,12 @@ public class DelegatingSession implements ClientSessionInternal { } + @Override public void startCall() { session.startCall(); } + @Override public void endCall() { session.endCall(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index ad79e82..fb5b687 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -134,6 +134,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { this.local = local; } + @Override public void discardUnusedPackets() { if (outStream == null) { if (local) @@ -150,6 +151,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { * TODO: move this to ConsumerContext as large message is a protocol specific thing * Add a buff to the List, or save it to the OutputStream if set */ + @Override public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) { int flowControlCredit = 0; @@ -206,6 +208,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { } } + @Override public void cancel() { this.handledException = ActiveMQClientMessageBundle.BUNDLE.largeMessageInterrupted(); @@ -232,12 +235,14 @@ public class LargeMessageControllerImpl implements LargeMessageController { } } + @Override public synchronized void close() { if (fileCache != null) { fileCache.close(); } } + @Override public void setOutputStream(final OutputStream output) throws ActiveMQException { int totalFlowControl = 0; @@ -268,6 +273,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { } } + @Override public synchronized void saveBuffer(final OutputStream output) throws ActiveMQException { if (streamClosed) { throw ActiveMQClientMessageBundle.BUNDLE.largeMessageLostSession(); @@ -280,6 +286,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { * @param timeWait Milliseconds to Wait. 0 means forever * @throws ActiveMQException */ + @Override public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException { if (outStream == null) { // There is no stream.. it will never achieve the end of streaming @@ -344,6 +351,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { return -1; } + @Override public byte readByte() { return getByte(readerIndex++); } @@ -425,6 +433,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { return out.write(ByteBuffer.wrap(bytesToGet)); } + @Override public int getInt(final int index) { return (getByte(index) & 0xff) << 24 | (getByte(index + 1) & 0xff) << 16 | (getByte(index + 2) & 0xff) << 8 | @@ -515,10 +524,12 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public int readerIndex() { return (int) readerIndex; } + @Override public void readerIndex(final int readerIndex) { try { checkForPacket(readerIndex); @@ -530,18 +541,22 @@ public class LargeMessageControllerImpl implements LargeMessageController { this.readerIndex = readerIndex; } + @Override public int writerIndex() { return (int) totalSize; } + @Override public long getSize() { return totalSize; } + @Override public void writerIndex(final int writerIndex) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void setIndex(final int readerIndex, final int writerIndex) { try { checkForPacket(readerIndex); @@ -553,17 +568,21 @@ public class LargeMessageControllerImpl implements LargeMessageController { this.readerIndex = readerIndex; } + @Override public void clear() { } + @Override public boolean readable() { return true; } + @Override public boolean writable() { return false; } + @Override public int readableBytes() { long readableBytes = totalSize - readerIndex; @@ -575,14 +594,17 @@ public class LargeMessageControllerImpl implements LargeMessageController { } } + @Override public int writableBytes() { return 0; } + @Override public void markReaderIndex() { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void resetReaderIndex() { try { checkForPacket(0); @@ -593,22 +615,27 @@ public class LargeMessageControllerImpl implements LargeMessageController { } } + @Override public void markWriterIndex() { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void resetWriterIndex() { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void discardReadBytes() { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public short getUnsignedByte(final int index) { return (short) (getByte(index) & 0xFF); } + @Override public int getUnsignedShort(final int index) { return getShort(index) & 0xFFFF; } @@ -621,10 +648,12 @@ public class LargeMessageControllerImpl implements LargeMessageController { return value; } + @Override public long getUnsignedInt(final int index) { return getInt(index) & 0xFFFFFFFFL; } + @Override public void getBytes(int index, final byte[] dst) { // TODO: optimize this by using System.arraycopy for (int i = 0; i < dst.length; i++) { @@ -639,10 +668,12 @@ public class LargeMessageControllerImpl implements LargeMessageController { } } + @Override public void getBytes(final int index, final ActiveMQBuffer dst) { getBytes(index, dst, dst.writableBytes()); } + @Override public void getBytes(final int index, final ActiveMQBuffer dst, final int length) { if (length > dst.writableBytes()) { throw new IndexOutOfBoundsException(); @@ -651,14 +682,17 @@ public class LargeMessageControllerImpl implements LargeMessageController { dst.writerIndex(dst.writerIndex() + length); } + @Override public void setBytes(final int index, final byte[] src) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void setBytes(final int index, final ActiveMQBuffer src) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void setBytes(final int index, final ActiveMQBuffer src, final int length) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } @@ -667,16 +701,19 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public int readUnsignedByte() { return (short) (readByte() & 0xFF); } + @Override public short readShort() { short v = getShort(readerIndex); readerIndex += 2; return v; } + @Override public int readUnsignedShort() { return readShort() & 0xFFFF; } @@ -695,6 +732,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { return v; } + @Override public int readInt() { int v = getInt(readerIndex); readerIndex += 4; @@ -706,29 +744,35 @@ public class LargeMessageControllerImpl implements LargeMessageController { return v; } + @Override public long readUnsignedInt() { return readInt() & 0xFFFFFFFFL; } + @Override public long readLong() { long v = getLong(readerIndex); readerIndex += 8; return v; } + @Override public void readBytes(final byte[] dst, final int dstIndex, final int length) { getBytes(readerIndex, dst, dstIndex, length); readerIndex += length; } + @Override public void readBytes(final byte[] dst) { readBytes(dst, 0, dst.length); } + @Override public void readBytes(final ActiveMQBuffer dst) { readBytes(dst, dst.writableBytes()); } + @Override public void readBytes(final ActiveMQBuffer dst, final int length) { if (length > dst.writableBytes()) { throw new IndexOutOfBoundsException(); @@ -737,11 +781,13 @@ public class LargeMessageControllerImpl implements LargeMessageController { dst.writerIndex(dst.writerIndex() + length); } + @Override public void readBytes(final ActiveMQBuffer dst, final int dstIndex, final int length) { getBytes(readerIndex, dst, dstIndex, length); readerIndex += length; } + @Override public void readBytes(final ByteBuffer dst) { int length = dst.remaining(); getBytes(readerIndex, dst); @@ -759,6 +805,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { readerIndex += length; } + @Override public int skipBytes(final int length) { long newReaderIndex = readerIndex + length; @@ -767,10 +814,12 @@ public class LargeMessageControllerImpl implements LargeMessageController { return length; } + @Override public void writeByte(final byte value) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void writeShort(final short value) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } @@ -779,18 +828,22 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void writeInt(final int value) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void writeLong(final long value) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void writeBytes(final byte[] src, final int srcIndex, final int length) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void writeBytes(final byte[] src) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } @@ -799,10 +852,12 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void writeBytes(final ActiveMQBuffer src, final int length) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void writeBytes(final ByteBuffer src) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } @@ -819,6 +874,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public ByteBuffer toByteBuffer() { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } @@ -849,18 +905,22 @@ public class LargeMessageControllerImpl implements LargeMessageController { return (char) readShort(); } + @Override public char getChar(final int index) { return (char) getShort(index); } + @Override public double getDouble(final int index) { return Double.longBitsToDouble(getLong(index)); } + @Override public float getFloat(final int index) { return Float.intBitsToFloat(getInt(index)); } + @Override public ActiveMQBuffer readBytes(final int length) { byte[] bytesToGet = new byte[length]; getBytes(readerIndex, bytesToGet); @@ -979,10 +1039,12 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public ActiveMQBuffer copy() { throw new UnsupportedOperationException(); } + @Override public ActiveMQBuffer slice(final int index, final int length) { throw new UnsupportedOperationException(); } @@ -1198,38 +1260,47 @@ public class LargeMessageControllerImpl implements LargeMessageController { return ByteUtil.readLine(this); } + @Override public ByteBuf byteBuf() { return null; } + @Override public ActiveMQBuffer copy(final int index, final int length) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public ActiveMQBuffer duplicate() { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public ActiveMQBuffer readSlice(final int length) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void setChar(final int index, final char value) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void setDouble(final int index, final double value) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void setFloat(final int index, final float value) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public ActiveMQBuffer slice() { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java index 27607bc..40ea86a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java @@ -70,38 +70,47 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { this.autoCreateJmsQueues = autoCreateJmsQueues; } + @Override public SimpleString getName() { return name; } + @Override public SimpleString getAddress() { return address; } + @Override public int getConsumerCount() { return consumerCount; } + @Override public SimpleString getFilterString() { return filterString; } + @Override public long getMessageCount() { return messageCount; } + @Override public boolean isDurable() { return durable; } + @Override public boolean isAutoCreateJmsQueues() { return autoCreateJmsQueues; } + @Override public boolean isTemporary() { return temporary; } + @Override public boolean isExists() { return exists; }