http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQBytesMessage.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQBytesMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQBytesMessage.java deleted file mode 100644 index 5213ffa..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQBytesMessage.java +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; - -import org.apache.activemq.api.core.ActiveMQBuffer; -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.Message; -import org.apache.activemq.api.core.client.ClientMessage; -import org.apache.activemq.api.core.client.ClientSession; -import org.apache.activemq.core.message.impl.MessageImpl; - -import static org.apache.activemq.reader.BytesMessageUtil.bytesMessageReset; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadBoolean; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadByte; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadBytes; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadChar; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadDouble; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadFloat; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadInt; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadLong; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadShort; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadUTF; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadUnsignedByte; -import static org.apache.activemq.reader.BytesMessageUtil.bytesReadUnsignedShort; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteBoolean; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteByte; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteBytes; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteChar; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteDouble; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteFloat; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteInt; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteLong; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteObject; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteShort; -import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteUTF; - -/** - * HornetQ implementation of a JMS {@link BytesMessage}. - * - * @author Norbert Lataille (norbert.latai...@m4x.org) - * @author <a href="mailto:adr...@jboss.org">Adrian Brock</a> - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> - */ -public class HornetQBytesMessage extends HornetQMessage implements BytesMessage -{ - // Static ------------------------------------------------------- - public static final byte TYPE = Message.BYTES_TYPE; - - // Attributes ---------------------------------------------------- - - private int bodyLength; - - // Constructor --------------------------------------------------- - - /** - * This constructor is used to construct messages prior to sending - */ - protected HornetQBytesMessage(final ClientSession session) - { - super(HornetQBytesMessage.TYPE, session); - } - - /** - * Constructor on receipt at client side - */ - protected HornetQBytesMessage(final ClientMessage message, final ClientSession session) - { - super(message, session); - } - - /** - * Foreign message constructor - */ - public HornetQBytesMessage(final BytesMessage foreign, final ClientSession session) throws JMSException - { - super(foreign, HornetQBytesMessage.TYPE, session); - - foreign.reset(); - - byte[] buffer = new byte[1024]; - int n = foreign.readBytes(buffer); - while (n != -1) - { - writeBytes(buffer, 0, n); - n = foreign.readBytes(buffer); - } - } - - // BytesMessage implementation ----------------------------------- - - public boolean readBoolean() throws JMSException - { - checkRead(); - try - { - return bytesReadBoolean(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public byte readByte() throws JMSException - { - checkRead(); - try - { - return bytesReadByte(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public int readUnsignedByte() throws JMSException - { - checkRead(); - try - { - return bytesReadUnsignedByte(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public short readShort() throws JMSException - { - checkRead(); - try - { - return bytesReadShort(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public int readUnsignedShort() throws JMSException - { - checkRead(); - try - { - return bytesReadUnsignedShort(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public char readChar() throws JMSException - { - checkRead(); - try - { - return bytesReadChar(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public int readInt() throws JMSException - { - checkRead(); - try - { - return bytesReadInt(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public long readLong() throws JMSException - { - checkRead(); - try - { - return bytesReadLong(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public float readFloat() throws JMSException - { - checkRead(); - try - { - return bytesReadFloat(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public double readDouble() throws JMSException - { - checkRead(); - try - { - return bytesReadDouble(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public String readUTF() throws JMSException - { - checkRead(); - try - { - return bytesReadUTF(message); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - catch (Exception e) - { - JMSException je = new JMSException("Failed to get UTF"); - je.setLinkedException(e); - je.initCause(e); - throw je; - } - } - - public int readBytes(final byte[] value) throws JMSException - { - checkRead(); - return bytesReadBytes(message, value); - } - - public int readBytes(final byte[] value, final int length) throws JMSException - { - checkRead(); - return bytesReadBytes(message, value, length); - - } - - public void writeBoolean(final boolean value) throws JMSException - { - checkWrite(); - bytesWriteBoolean(message, value); - } - - public void writeByte(final byte value) throws JMSException - { - checkWrite(); - bytesWriteByte(message, value); - } - - public void writeShort(final short value) throws JMSException - { - checkWrite(); - bytesWriteShort(message, value); - } - - public void writeChar(final char value) throws JMSException - { - checkWrite(); - bytesWriteChar(message, value); - } - - public void writeInt(final int value) throws JMSException - { - checkWrite(); - bytesWriteInt(message, value); - } - - public void writeLong(final long value) throws JMSException - { - checkWrite(); - bytesWriteLong(message, value); - } - - public void writeFloat(final float value) throws JMSException - { - checkWrite(); - bytesWriteFloat(message, value); - } - - public void writeDouble(final double value) throws JMSException - { - checkWrite(); - bytesWriteDouble(message, value); - } - - public void writeUTF(final String value) throws JMSException - { - checkWrite(); - try - { - bytesWriteUTF(message, value); - } - catch (Exception e) - { - JMSException je = new JMSException("Failed to write UTF"); - je.setLinkedException(e); - je.initCause(e); - throw je; - } - - } - - public void writeBytes(final byte[] value) throws JMSException - { - checkWrite(); - bytesWriteBytes(message, value); - } - - public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException - { - checkWrite(); - bytesWriteBytes(message, value, offset, length); - } - - public void writeObject(final Object value) throws JMSException - { - checkWrite(); - if (!bytesWriteObject(message, value)) - { - throw new MessageFormatException("Invalid object for properties"); - } - } - - public void reset() throws JMSException - { - if (!readOnly) - { - readOnly = true; - - bodyLength = message.getBodySize(); - } - - bytesMessageReset(message); - } - - @Override - public void doBeforeReceive() throws ActiveMQException - { - bodyLength = message.getBodySize(); - } - - // HornetQRAMessage overrides ---------------------------------------- - - @Override - public void clearBody() throws JMSException - { - super.clearBody(); - - try - { - getBuffer().clear(); - } - catch (RuntimeException e) - { - JMSException e2 = new JMSException(e.getMessage()); - e2.initCause(e); - throw e2; - } - } - - public long getBodyLength() throws JMSException - { - checkRead(); - - return bodyLength; - } - - @Override - public void doBeforeSend() throws Exception - { - reset(); - } - - // Public -------------------------------------------------------- - - @Override - public byte getType() - { - return HornetQBytesMessage.TYPE; - } - - private ActiveMQBuffer getBuffer() - { - return message.getBodyBuffer(); - } - - @Override - public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") - Class c) - { - return c.isAssignableFrom(byte[].class); - } - - @Override - protected <T> T getBodyInternal(Class<T> c) - { - if (bodyLength == 0) - return null; - byte[] dst = new byte[bodyLength]; - message.getBodyBuffer().getBytes(MessageImpl.BODY_OFFSET, dst); - return (T)dst; - } -}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnection.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnection.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnection.java deleted file mode 100644 index 6980131..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnection.java +++ /dev/null @@ -1,862 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.InvalidClientIDException; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import java.lang.ref.WeakReference; -import java.util.HashSet; -import java.util.Set; - -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.ActiveMQExceptionType; -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.api.core.client.ClientSession; -import org.apache.activemq.api.core.client.ClientSessionFactory; -import org.apache.activemq.api.core.client.FailoverEventListener; -import org.apache.activemq.api.core.client.FailoverEventType; -import org.apache.activemq.api.core.client.SessionFailureListener; -import org.apache.activemq.api.jms.HornetQJMSConstants; -import org.apache.activemq.core.client.impl.ClientSessionInternal; -import org.apache.activemq.core.version.Version; -import org.apache.activemq.reader.MessageUtil; -import org.apache.activemq.utils.ConcurrentHashSet; -import org.apache.activemq.utils.UUIDGenerator; -import org.apache.activemq.utils.VersionLoader; - -/** - * HornetQ implementation of a JMS Connection. - * <p> - * The flat implementation of {@link TopicConnection} and {@link QueueConnection} is per design, - * following the common usage of these as one flat API in JMS 1.1. - * - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> - */ -public class HornetQConnection extends HornetQConnectionForContextImpl implements TopicConnection, QueueConnection -{ - // Constants ------------------------------------------------------------------------------------ - public static final int TYPE_GENERIC_CONNECTION = 0; - - public static final int TYPE_QUEUE_CONNECTION = 1; - - public static final int TYPE_TOPIC_CONNECTION = 2; - - public static final String EXCEPTION_FAILOVER = "FAILOVER"; - - public static final String EXCEPTION_DISCONNECT = "DISCONNECT"; - - public static final SimpleString CONNECTION_ID_PROPERTY_NAME = MessageUtil.CONNECTION_ID_PROPERTY_NAME; - - // Static --------------------------------------------------------------------------------------- - - // Attributes ----------------------------------------------------------------------------------- - - private final int connectionType; - - private final Set<HornetQSession> sessions = new org.apache.activemq.utils.ConcurrentHashSet<HornetQSession>(); - - private final Set<SimpleString> tempQueues = new org.apache.activemq.utils.ConcurrentHashSet<SimpleString>(); - - private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<SimpleString>(); - - private volatile boolean hasNoLocal; - - private volatile ExceptionListener exceptionListener; - - private volatile FailoverEventListener failoverEventListener; - - private volatile boolean justCreated = true; - - private volatile ConnectionMetaData metaData; - - private volatile boolean closed; - - private volatile boolean started; - - private String clientID; - - private final ClientSessionFactory sessionFactory; - - private final SimpleString uid; - - private final String username; - - private final String password; - - private final SessionFailureListener listener = new JMSFailureListener(this); - - private final FailoverEventListener failoverListener = new FailoverEventListenerImpl(this); - - private final Version thisVersion; - - private final int dupsOKBatchSize; - - private final int transactionBatchSize; - - private ClientSession initialSession; - - private final Exception creationStack; - - private HornetQConnectionFactory factoryReference; - - // Constructors --------------------------------------------------------------------------------- - - public HornetQConnection(final String username, final String password, final int connectionType, - final String clientID, final int dupsOKBatchSize, final int transactionBatchSize, - final ClientSessionFactory sessionFactory) - { - this.username = username; - - this.password = password; - - this.connectionType = connectionType; - - this.clientID = clientID; - - this.sessionFactory = sessionFactory; - - uid = UUIDGenerator.getInstance().generateSimpleStringUUID(); - - thisVersion = VersionLoader.getVersion(); - - this.dupsOKBatchSize = dupsOKBatchSize; - - this.transactionBatchSize = transactionBatchSize; - - creationStack = new Exception(); - } - - /** - * This internal method serves basically the Resource Adapter. - * The resource adapter plays with an XASession and a non XASession. - * When there is no enlisted transaction, the EE specification mandates that the commit should - * be done as if it was a nonXA Session (i.e. SessionTransacted). - * For that reason we have this method to force that nonXASession, since the JMS Javadoc - * mandates createSession to return a XASession. - */ - public Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException - { - checkClosed(); - - return createSessionInternal(false, transacted, acknowledgeMode, HornetQConnection.TYPE_GENERIC_CONNECTION); - } - - /** - * This internal method serves basically the Resource Adapter. - * The resource adapter plays with an XASession and a non XASession. - * When there is no enlisted transaction, the EE specification mandates that the commit should - * be done as if it was a nonXA Session (i.e. SessionTransacted). - * For that reason we have this method to force that nonXASession, since the JMS Javadoc - * mandates createSession to return a XASession. - */ - public Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException - { - checkClosed(); - - return createSessionInternal(false, transacted, acknowledgeMode, HornetQConnection.TYPE_TOPIC_CONNECTION); - } - - /** - * This internal method serves basically the Resource Adapter. - * The resource adapter plays with an XASession and a non XASession. - * When there is no enlisted transaction, the EE specification mandates that the commit should - * be done as if it was a nonXA Session (i.e. SessionTransacted). - * For that reason we have this method to force that nonXASession, since the JMS Javadoc - * mandates createSession to return a XASession. - */ - public Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException - { - checkClosed(); - - return createSessionInternal(false, transacted, acknowledgeMode, HornetQConnection.TYPE_QUEUE_CONNECTION); - } - - - // Connection implementation -------------------------------------------------------------------- - - public synchronized Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException - { - checkClosed(); - - return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), HornetQConnection.TYPE_GENERIC_CONNECTION); - } - - public String getClientID() throws JMSException - { - checkClosed(); - - return clientID; - } - - public void setClientID(final String clientID) throws JMSException - { - checkClosed(); - - if (this.clientID != null) - { - throw new IllegalStateException("Client id has already been set"); - } - - if (!justCreated) - { - throw new IllegalStateException("setClientID can only be called directly after the connection is created"); - } - - try - { - initialSession.addUniqueMetaData("jms-client-id", clientID); - } - catch (ActiveMQException e) - { - if (e.getType() == ActiveMQExceptionType.DUPLICATE_METADATA) - { - throw new InvalidClientIDException("clientID=" + clientID + " was already set into another connection"); - } - } - - this.clientID = clientID; - try - { - this.addSessionMetaData(initialSession); - } - catch (ActiveMQException e) - { - JMSException ex = new JMSException("Internal error setting metadata jms-client-id"); - ex.setLinkedException(e); - ex.initCause(e); - throw ex; - } - - justCreated = false; - } - - public ConnectionMetaData getMetaData() throws JMSException - { - checkClosed(); - - justCreated = false; - - if (metaData == null) - { - metaData = new HornetQConnectionMetaData(thisVersion); - } - - return metaData; - } - - public ExceptionListener getExceptionListener() throws JMSException - { - checkClosed(); - - justCreated = false; - - return exceptionListener; - } - - public void setExceptionListener(final ExceptionListener listener) throws JMSException - { - checkClosed(); - - exceptionListener = listener; - justCreated = false; - } - - public synchronized void start() throws JMSException - { - checkClosed(); - - for (HornetQSession session : sessions) - { - session.start(); - } - - justCreated = false; - started = true; - } - - public synchronized void signalStopToAllSessions() - { - for (HornetQSession session : sessions) - { - ClientSession coreSession = session.getCoreSession(); - if (coreSession instanceof ClientSessionInternal) - { - ClientSessionInternal internalSession = (ClientSessionInternal) coreSession; - internalSession.setStopSignal(); - } - } - - } - - public synchronized void stop() throws JMSException - { - threadAwareContext.assertNotMessageListenerThread(); - - checkClosed(); - - for (HornetQSession session : sessions) - { - session.stop(); - } - - justCreated = false; - started = false; - } - - public final synchronized void close() throws JMSException - { - threadAwareContext.assertNotCompletionListenerThread(); - threadAwareContext.assertNotMessageListenerThread(); - - if (closed) - { - return; - } - - sessionFactory.close(); - - try - { - for (HornetQSession session : new HashSet<HornetQSession>(sessions)) - { - session.close(); - } - - try - { - if (!tempQueues.isEmpty()) - { - // Remove any temporary queues - - for (SimpleString queueName : tempQueues) - { - if (!initialSession.isClosed()) - { - try - { - initialSession.deleteQueue(queueName); - } - catch (ActiveMQException ignore) - { - // Exception on deleting queue shouldn't prevent close from completing - } - } - } - } - } - finally - { - if (initialSession != null) - { - initialSession.close(); - } - } - - closed = true; - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public ConnectionConsumer - createConnectionConsumer(final Destination destination, final String messageSelector, - final ServerSessionPool sessionPool, final int maxMessages) throws JMSException - { - checkClosed(); - - checkTempQueues(destination); - - // We offer a RA, so no need to implement this for MDBs - return null; - } - - private void checkTempQueues(Destination destination) throws JMSException - { - HornetQDestination jbdest = (HornetQDestination) destination; - - if (jbdest.isTemporary() && !containsTemporaryQueue(jbdest.getSimpleAddress())) - { - throw new JMSException("Can not create consumer for temporary destination " + destination + - " from another JMS connection"); - } - } - - public ConnectionConsumer - createDurableConnectionConsumer(final Topic topic, final String subscriptionName, - final String messageSelector, final ServerSessionPool sessionPool, - final int maxMessages) throws JMSException - { - checkClosed(); - // As spec. section 4.11 - if (connectionType == HornetQConnection.TYPE_QUEUE_CONNECTION) - { - String msg = "Cannot create a durable connection consumer on a QueueConnection"; - throw new javax.jms.IllegalStateException(msg); - } - checkTempQueues(topic); - // We offer RA, so no need for this - return null; - } - - @Override - public Session createSession(int sessionMode) throws JMSException - { - checkClosed(); - return createSessionInternal(false, sessionMode == Session.SESSION_TRANSACTED, sessionMode, HornetQSession.TYPE_GENERIC_SESSION); - - } - - @Override - public Session createSession() throws JMSException - { - checkClosed(); - return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE, HornetQSession.TYPE_GENERIC_SESSION); - } - - // QueueConnection implementation --------------------------------------------------------------- - - public QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException - { - checkClosed(); - return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), HornetQSession.TYPE_QUEUE_SESSION); - } - - /** - * I'm keeping this as static as the same check will be done within RA. - * This is to conform with TCK Tests where we must return ackMode exactly as they want if transacted=false - */ - public static int checkAck(boolean transacted, int acknowledgeMode) - { - if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) - { - return Session.AUTO_ACKNOWLEDGE; - } - - return acknowledgeMode; - } - - public ConnectionConsumer - createConnectionConsumer(final Queue queue, final String messageSelector, - final ServerSessionPool sessionPool, final int maxMessages) throws JMSException - { - checkClosed(); - checkTempQueues(queue); - return null; - } - - // TopicConnection implementation --------------------------------------------------------------- - - public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException - { - checkClosed(); - return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), HornetQSession.TYPE_TOPIC_SESSION); - } - - public ConnectionConsumer - createConnectionConsumer(final Topic topic, final String messageSelector, - final ServerSessionPool sessionPool, final int maxMessages) throws JMSException - { - checkClosed(); - checkTempQueues(topic); - return null; - } - - @Override - public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException - { - return null; // we offer RA - } - - @Override - public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException - { - return null; // we offer RA - } - - // Public --------------------------------------------------------------------------------------- - - /** - * Sets a FailureListener for the session which is notified if a failure occurs on the session. - * - * @param listener the listener to add - * @throws JMSException - */ - public void setFailoverListener(final FailoverEventListener listener) throws JMSException - { - checkClosed(); - - justCreated = false; - - this.failoverEventListener = listener; - - } - - /** - * @return {@link FailoverEventListener} the current failover event listener for this connection - * @throws JMSException - */ - public FailoverEventListener getFailoverListener() throws JMSException - { - checkClosed(); - - justCreated = false; - - return failoverEventListener; - } - - public void addTemporaryQueue(final SimpleString queueAddress) - { - tempQueues.add(queueAddress); - knownDestinations.add(queueAddress); - } - - public void removeTemporaryQueue(final SimpleString queueAddress) - { - tempQueues.remove(queueAddress); - } - - public void addKnownDestination(final SimpleString address) - { - knownDestinations.add(address); - } - - public boolean containsKnownDestination(final SimpleString address) - { - return knownDestinations.contains(address); - } - - public boolean containsTemporaryQueue(final SimpleString queueAddress) - { - return tempQueues.contains(queueAddress); - } - - public boolean hasNoLocal() - { - return hasNoLocal; - } - - public void setHasNoLocal() - { - hasNoLocal = true; - } - - public SimpleString getUID() - { - return uid; - } - - public void removeSession(final HornetQSession session) - { - sessions.remove(session); - } - - public ClientSession getInitialSession() - { - return initialSession; - } - - // Package protected ---------------------------------------------------------------------------- - - // Protected ------------------------------------------------------------------------------------ - - // In case the user forgets to close the connection manually - - @Override - protected final void finalize() throws Throwable - { - if (!closed) - { - HornetQJMSClientLogger.LOGGER.connectionLeftOpen(creationStack); - - close(); - } - } - - protected boolean isXA() - { - return false; - } - - protected final HornetQSession - createSessionInternal(final boolean isXA, final boolean transacted, int acknowledgeMode, final int type) throws JMSException - { - if (transacted) - { - acknowledgeMode = Session.SESSION_TRANSACTED; - } - - try - { - ClientSession session; - - if (acknowledgeMode == Session.SESSION_TRANSACTED) - { - session = - sessionFactory.createSession(username, password, isXA, false, false, - sessionFactory.getServerLocator().isPreAcknowledge(), - transactionBatchSize); - } - else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE) - { - session = - sessionFactory.createSession(username, password, isXA, true, true, - sessionFactory.getServerLocator().isPreAcknowledge(), 0); - } - else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) - { - session = - sessionFactory.createSession(username, password, isXA, true, true, - sessionFactory.getServerLocator().isPreAcknowledge(), dupsOKBatchSize); - } - else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) - { - session = - sessionFactory.createSession(username, password, isXA, true, false, - sessionFactory.getServerLocator().isPreAcknowledge(), - transactionBatchSize); - } - else if (acknowledgeMode == HornetQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) - { - session = - sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize); - } - else if (acknowledgeMode == HornetQJMSConstants.PRE_ACKNOWLEDGE) - { - session = sessionFactory.createSession(username, password, isXA, true, false, true, transactionBatchSize); - } - else - { - throw new JMSRuntimeException("Invalid ackmode: " + acknowledgeMode); - } - - justCreated = false; - - // Setting multiple times on different sessions doesn't matter since RemotingConnection - // maintains - // a set (no duplicates) - session.addFailureListener(listener); - session.addFailoverListener(failoverListener); - - HornetQSession jbs = createHQSession(isXA, transacted, acknowledgeMode, session, type); - - sessions.add(jbs); - - if (started) - { - session.start(); - } - - this.addSessionMetaData(session); - - return jbs; - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - // Private -------------------------------------------------------------------------------------- - - /** - * @param transacted - * @param acknowledgeMode - * @param session - * @param type - * @return - */ - protected HornetQSession createHQSession(boolean isXA, boolean transacted, int acknowledgeMode, ClientSession session, int type) - { - if (isXA) - { - return new HornetQXASession(this, transacted, true, acknowledgeMode, session, type); - } - else - { - return new HornetQSession(this, transacted, false, acknowledgeMode, session, type); - } - } - - protected final void checkClosed() throws JMSException - { - if (closed) - { - throw new IllegalStateException("Connection is closed"); - } - } - - public void authorize() throws JMSException - { - try - { - initialSession = sessionFactory.createSession(username, password, false, false, false, false, 0); - - addSessionMetaData(initialSession); - - initialSession.addFailureListener(listener); - initialSession.addFailoverListener(failoverListener); - } - catch (ActiveMQException me) - { - throw JMSExceptionHelper.convertFromHornetQException(me); - } - } - - private void addSessionMetaData(ClientSession session) throws ActiveMQException - { - session.addMetaData("jms-session", ""); - if (clientID != null) - { - session.addMetaData("jms-client-id", clientID); - } - } - - public void setReference(HornetQConnectionFactory factory) - { - this.factoryReference = factory; - } - - public boolean isStarted() - { - return started; - } - - - // Inner classes -------------------------------------------------------------------------------- - - private static class JMSFailureListener implements SessionFailureListener - { - private final WeakReference<HornetQConnection> connectionRef; - - JMSFailureListener(final HornetQConnection connection) - { - connectionRef = new WeakReference<HornetQConnection>(connection); - } - - @Override - public synchronized void connectionFailed(final ActiveMQException me, boolean failedOver) - { - if (me == null) - { - return; - } - - HornetQConnection conn = connectionRef.get(); - - if (conn != null) - { - try - { - final ExceptionListener exceptionListener = conn.getExceptionListener(); - - if (exceptionListener != null) - { - final JMSException je = - new JMSException(me.toString(), failedOver ? EXCEPTION_FAILOVER : EXCEPTION_DISCONNECT); - - je.initCause(me); - - new Thread(new Runnable() - { - public void run() - { - exceptionListener.onException(je); - } - }).start(); - } - } - catch (JMSException e) - { - if (!conn.closed) - { - HornetQJMSClientLogger.LOGGER.errorCallingExcListener(e); - } - } - } - } - - @Override - public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) - { - connectionFailed(me, failedOver); - } - - public void beforeReconnect(final ActiveMQException me) - { - - } - - } - - private static class FailoverEventListenerImpl implements FailoverEventListener - { - private final WeakReference<HornetQConnection> connectionRef; - - FailoverEventListenerImpl(final HornetQConnection connection) - { - connectionRef = new WeakReference<HornetQConnection>(connection); - } - - @Override - public void failoverEvent(final FailoverEventType eventType) - { - HornetQConnection conn = connectionRef.get(); - - if (conn != null) - { - try - { - final FailoverEventListener failoverListener = conn.getFailoverListener(); - - if (failoverListener != null) - { - - new Thread(new Runnable() - { - public void run() - { - failoverListener.failoverEvent(eventType); - } - }).start(); - } - } - catch (JMSException e) - { - if (!conn.closed) - { - HornetQJMSClientLogger.LOGGER.errorCallingFailoverListener(e); - } - } - } - - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionFactory.java deleted file mode 100644 index 503a13a..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionFactory.java +++ /dev/null @@ -1,821 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSContext; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.JMSSecurityException; -import javax.jms.JMSSecurityRuntimeException; -import javax.jms.QueueConnection; -import javax.jms.TopicConnection; -import javax.jms.XAConnection; -import javax.jms.XAConnectionFactory; -import javax.jms.XAJMSContext; -import javax.jms.XAQueueConnection; -import javax.jms.XATopicConnection; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import java.io.Serializable; - -import org.apache.activemq.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ClientSessionFactory; -import org.apache.activemq.api.core.client.HornetQClient; -import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.api.jms.JMSFactoryType; -import org.apache.activemq.jms.referenceable.ConnectionFactoryObjectFactory; -import org.apache.activemq.jms.referenceable.SerializableObjectRefAddr; - -/** - * HornetQ implementation of a JMS ConnectionFactory. - * - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - */ -public class HornetQConnectionFactory implements Serializable, Referenceable, ConnectionFactory, XAConnectionFactory -{ - private static final long serialVersionUID = -2810634789345348326L; - - private final ServerLocator serverLocator; - - private String clientID; - - private int dupsOKBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE; - - private int transactionBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE; - - private boolean readOnly; - - public HornetQConnectionFactory() - { - serverLocator = null; - } - - public HornetQConnectionFactory(final ServerLocator serverLocator) - { - this.serverLocator = serverLocator; - - serverLocator.disableFinalizeCheck(); - } - - public HornetQConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) - { - if (ha) - { - serverLocator = HornetQClient.createServerLocatorWithHA(groupConfiguration); - } - else - { - serverLocator = HornetQClient.createServerLocatorWithoutHA(groupConfiguration); - } - - serverLocator.disableFinalizeCheck(); - } - - public HornetQConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) - { - if (ha) - { - serverLocator = HornetQClient.createServerLocatorWithHA(initialConnectors); - } - else - { - serverLocator = HornetQClient.createServerLocatorWithoutHA(initialConnectors); - } - - serverLocator.disableFinalizeCheck(); - } - - // ConnectionFactory implementation ------------------------------------------------------------- - - public Connection createConnection() throws JMSException - { - return createConnection(null, null); - } - - public Connection createConnection(final String username, final String password) throws JMSException - { - return createConnectionInternal(username, password, false, HornetQConnection.TYPE_GENERIC_CONNECTION); - } - - @Override - public JMSContext createContext() - { - return createContext(null, null); - } - - @Override - public JMSContext createContext(final int sessionMode) - { - return createContext(null, null, sessionMode); - } - - @Override - public JMSContext createContext(final String userName, final String password) - { - return createContext(userName, password, JMSContext.AUTO_ACKNOWLEDGE); - } - - @Override - public JMSContext createContext(String userName, String password, int sessionMode) - { - validateSessionMode(sessionMode); - try - { - HornetQConnection connection = - createConnectionInternal(userName, password, false, HornetQConnection.TYPE_GENERIC_CONNECTION); - return connection.createContext(sessionMode); - } - catch (JMSSecurityException e) - { - throw new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - /** - * @param mode - */ - private static void validateSessionMode(int mode) - { - switch (mode) - { - case JMSContext.AUTO_ACKNOWLEDGE: - case JMSContext.CLIENT_ACKNOWLEDGE: - case JMSContext.DUPS_OK_ACKNOWLEDGE: - case JMSContext.SESSION_TRANSACTED: - { - return; - } - default: - throw new JMSRuntimeException("Invalid Session Mode: " + mode); - } - } - - // QueueConnectionFactory implementation -------------------------------------------------------- - - public QueueConnection createQueueConnection() throws JMSException - { - return createQueueConnection(null, null); - } - - public QueueConnection createQueueConnection(final String username, final String password) throws JMSException - { - return createConnectionInternal(username, password, false, HornetQConnection.TYPE_QUEUE_CONNECTION); - } - - // TopicConnectionFactory implementation -------------------------------------------------------- - - public TopicConnection createTopicConnection() throws JMSException - { - return createTopicConnection(null, null); - } - - public TopicConnection createTopicConnection(final String username, final String password) throws JMSException - { - return createConnectionInternal(username, password, false, HornetQConnection.TYPE_TOPIC_CONNECTION); - } - - // XAConnectionFactory implementation ----------------------------------------------------------- - - public XAConnection createXAConnection() throws JMSException - { - return createXAConnection(null, null); - } - - public XAConnection createXAConnection(final String username, final String password) throws JMSException - { - return (XAConnection) createConnectionInternal(username, password, true, HornetQConnection.TYPE_GENERIC_CONNECTION); - } - - @Override - public XAJMSContext createXAContext() - { - return createXAContext(null, null); - } - - @Override - public XAJMSContext createXAContext(String userName, String password) - { - try - { - HornetQConnection connection = - createConnectionInternal(userName, password, true, HornetQConnection.TYPE_GENERIC_CONNECTION); - return connection.createXAContext(); - } - catch (JMSSecurityException e) - { - throw new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - // XAQueueConnectionFactory implementation ------------------------------------------------------ - - public XAQueueConnection createXAQueueConnection() throws JMSException - { - return createXAQueueConnection(null, null); - } - - public XAQueueConnection createXAQueueConnection(final String username, final String password) throws JMSException - { - return (XAQueueConnection) createConnectionInternal(username, password, true, HornetQConnection.TYPE_QUEUE_CONNECTION); - } - - // XATopicConnectionFactory implementation ------------------------------------------------------ - - public XATopicConnection createXATopicConnection() throws JMSException - { - return createXATopicConnection(null, null); - } - - public XATopicConnection createXATopicConnection(final String username, final String password) throws JMSException - { - return (XATopicConnection) createConnectionInternal(username, password, true, HornetQConnection.TYPE_TOPIC_CONNECTION); - } - - @Override - public Reference getReference() throws NamingException - { - return new Reference(this.getClass().getCanonicalName(), - new SerializableObjectRefAddr("HornetQ-CF", this), - ConnectionFactoryObjectFactory.class.getCanonicalName(), - null); - } - - // Public --------------------------------------------------------------------------------------- - - public boolean isHA() - { - return serverLocator.isHA(); - } - - public synchronized String getConnectionLoadBalancingPolicyClassName() - { - return serverLocator.getConnectionLoadBalancingPolicyClassName(); - } - - public synchronized void setConnectionLoadBalancingPolicyClassName(final String connectionLoadBalancingPolicyClassName) - { - checkWrite(); - serverLocator.setConnectionLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName); - } - - public synchronized TransportConfiguration[] getStaticConnectors() - { - return serverLocator.getStaticTransportConfigurations(); - } - - public synchronized DiscoveryGroupConfiguration getDiscoveryGroupConfiguration() - { - return serverLocator.getDiscoveryGroupConfiguration(); - } - - public synchronized String getClientID() - { - return clientID; - } - - public synchronized void setClientID(final String clientID) - { - checkWrite(); - this.clientID = clientID; - } - - public synchronized int getDupsOKBatchSize() - { - return dupsOKBatchSize; - } - - public synchronized void setDupsOKBatchSize(final int dupsOKBatchSize) - { - checkWrite(); - this.dupsOKBatchSize = dupsOKBatchSize; - } - - public synchronized int getTransactionBatchSize() - { - return transactionBatchSize; - } - - public synchronized void setTransactionBatchSize(final int transactionBatchSize) - { - checkWrite(); - this.transactionBatchSize = transactionBatchSize; - } - - public synchronized long getClientFailureCheckPeriod() - { - return serverLocator.getClientFailureCheckPeriod(); - } - - public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod) - { - checkWrite(); - serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod); - } - - public synchronized long getConnectionTTL() - { - return serverLocator.getConnectionTTL(); - } - - public synchronized void setConnectionTTL(final long connectionTTL) - { - checkWrite(); - serverLocator.setConnectionTTL(connectionTTL); - } - - public synchronized long getCallTimeout() - { - return serverLocator.getCallTimeout(); - } - - public synchronized void setCallTimeout(final long callTimeout) - { - checkWrite(); - serverLocator.setCallTimeout(callTimeout); - } - - public synchronized long getCallFailoverTimeout() - { - return serverLocator.getCallFailoverTimeout(); - } - - public synchronized void setCallFailoverTimeout(final long callTimeout) - { - checkWrite(); - serverLocator.setCallFailoverTimeout(callTimeout); - } - - public synchronized int getConsumerWindowSize() - { - return serverLocator.getConsumerWindowSize(); - } - - public synchronized void setConsumerWindowSize(final int consumerWindowSize) - { - checkWrite(); - serverLocator.setConsumerWindowSize(consumerWindowSize); - } - - public synchronized int getConsumerMaxRate() - { - return serverLocator.getConsumerMaxRate(); - } - - public synchronized void setConsumerMaxRate(final int consumerMaxRate) - { - checkWrite(); - serverLocator.setConsumerMaxRate(consumerMaxRate); - } - - public synchronized int getConfirmationWindowSize() - { - return serverLocator.getConfirmationWindowSize(); - } - - public synchronized void setConfirmationWindowSize(final int confirmationWindowSize) - { - checkWrite(); - serverLocator.setConfirmationWindowSize(confirmationWindowSize); - } - - public synchronized int getProducerMaxRate() - { - return serverLocator.getProducerMaxRate(); - } - - public synchronized void setProducerMaxRate(final int producerMaxRate) - { - checkWrite(); - serverLocator.setProducerMaxRate(producerMaxRate); - } - - public synchronized int getProducerWindowSize() - { - return serverLocator.getProducerWindowSize(); - } - - public synchronized void setProducerWindowSize(final int producerWindowSize) - { - checkWrite(); - serverLocator.setProducerWindowSize(producerWindowSize); - } - - /** - * @param cacheLargeMessagesClient - */ - public synchronized void setCacheLargeMessagesClient(final boolean cacheLargeMessagesClient) - { - checkWrite(); - serverLocator.setCacheLargeMessagesClient(cacheLargeMessagesClient); - } - - public synchronized boolean isCacheLargeMessagesClient() - { - return serverLocator.isCacheLargeMessagesClient(); - } - - public synchronized int getMinLargeMessageSize() - { - return serverLocator.getMinLargeMessageSize(); - } - - public synchronized void setMinLargeMessageSize(final int minLargeMessageSize) - { - checkWrite(); - serverLocator.setMinLargeMessageSize(minLargeMessageSize); - } - - public synchronized boolean isBlockOnAcknowledge() - { - return serverLocator.isBlockOnAcknowledge(); - } - - public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge) - { - checkWrite(); - serverLocator.setBlockOnAcknowledge(blockOnAcknowledge); - } - - public synchronized boolean isBlockOnNonDurableSend() - { - return serverLocator.isBlockOnNonDurableSend(); - } - - public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) - { - checkWrite(); - serverLocator.setBlockOnNonDurableSend(blockOnNonDurableSend); - } - - public synchronized boolean isBlockOnDurableSend() - { - return serverLocator.isBlockOnDurableSend(); - } - - public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend) - { - checkWrite(); - serverLocator.setBlockOnDurableSend(blockOnDurableSend); - } - - public synchronized boolean isAutoGroup() - { - return serverLocator.isAutoGroup(); - } - - public synchronized void setAutoGroup(final boolean autoGroup) - { - checkWrite(); - serverLocator.setAutoGroup(autoGroup); - } - - public synchronized boolean isPreAcknowledge() - { - return serverLocator.isPreAcknowledge(); - } - - public synchronized void setPreAcknowledge(final boolean preAcknowledge) - { - checkWrite(); - serverLocator.setPreAcknowledge(preAcknowledge); - } - - public synchronized long getRetryInterval() - { - return serverLocator.getRetryInterval(); - } - - public synchronized void setRetryInterval(final long retryInterval) - { - checkWrite(); - serverLocator.setRetryInterval(retryInterval); - } - - public synchronized long getMaxRetryInterval() - { - return serverLocator.getMaxRetryInterval(); - } - - public synchronized void setMaxRetryInterval(final long retryInterval) - { - checkWrite(); - serverLocator.setMaxRetryInterval(retryInterval); - } - - public synchronized double getRetryIntervalMultiplier() - { - return serverLocator.getRetryIntervalMultiplier(); - } - - public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier) - { - checkWrite(); - serverLocator.setRetryIntervalMultiplier(retryIntervalMultiplier); - } - - public synchronized int getReconnectAttempts() - { - return serverLocator.getReconnectAttempts(); - } - - public synchronized void setReconnectAttempts(final int reconnectAttempts) - { - checkWrite(); - serverLocator.setReconnectAttempts(reconnectAttempts); - } - - public synchronized void setInitialConnectAttempts(final int reconnectAttempts) - { - checkWrite(); - serverLocator.setInitialConnectAttempts(reconnectAttempts); - } - - public synchronized int getInitialConnectAttempts() - { - checkWrite(); - return serverLocator.getInitialConnectAttempts(); - } - - public synchronized boolean isFailoverOnInitialConnection() - { - return serverLocator.isFailoverOnInitialConnection(); - } - - public synchronized void setFailoverOnInitialConnection(final boolean failover) - { - checkWrite(); - serverLocator.setFailoverOnInitialConnection(failover); - } - - public synchronized boolean isUseGlobalPools() - { - return serverLocator.isUseGlobalPools(); - } - - public synchronized void setUseGlobalPools(final boolean useGlobalPools) - { - checkWrite(); - serverLocator.setUseGlobalPools(useGlobalPools); - } - - public synchronized int getScheduledThreadPoolMaxSize() - { - return serverLocator.getScheduledThreadPoolMaxSize(); - } - - public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) - { - checkWrite(); - serverLocator.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize); - } - - public synchronized int getThreadPoolMaxSize() - { - return serverLocator.getThreadPoolMaxSize(); - } - - public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize) - { - checkWrite(); - serverLocator.setThreadPoolMaxSize(threadPoolMaxSize); - } - - public synchronized int getInitialMessagePacketSize() - { - return serverLocator.getInitialMessagePacketSize(); - } - - public synchronized void setInitialMessagePacketSize(final int size) - { - checkWrite(); - serverLocator.setInitialMessagePacketSize(size); - } - - public void setGroupID(final String groupID) - { - serverLocator.setGroupID(groupID); - } - - public String getGroupID() - { - return serverLocator.getGroupID(); - } - - public boolean isCompressLargeMessage() - { - return serverLocator.isCompressLargeMessage(); - } - - public void setCompressLargeMessage(boolean avoidLargeMessages) - { - serverLocator.setCompressLargeMessage(avoidLargeMessages); - } - - public void close() - { - ServerLocator locator0 = serverLocator; - if (locator0 != null) - locator0.close(); - } - - public ServerLocator getServerLocator() - { - return serverLocator; - } - - public int getFactoryType() - { - return JMSFactoryType.CF.intValue(); - } - - // Package protected ---------------------------------------------------------------------------- - - // Protected ------------------------------------------------------------------------------------ - - protected synchronized HornetQConnection createConnectionInternal(final String username, - final String password, - final boolean isXA, - final int type) throws JMSException - { - readOnly = true; - - ClientSessionFactory factory; - - try - { - factory = serverLocator.createSessionFactory(); - } - catch (Exception e) - { - JMSException jmse = new JMSException("Failed to create session factory"); - - jmse.initCause(e); - jmse.setLinkedException(e); - - throw jmse; - } - - HornetQConnection connection = null; - - if (isXA) - { - if (type == HornetQConnection.TYPE_GENERIC_CONNECTION) - { - connection = new HornetQXAConnection(username, - password, - type, - clientID, - dupsOKBatchSize, - transactionBatchSize, - factory); - } - else if (type == HornetQConnection.TYPE_QUEUE_CONNECTION) - { - connection = - new HornetQXAConnection(username, - password, - type, - clientID, - dupsOKBatchSize, - transactionBatchSize, - factory); - } - else if (type == HornetQConnection.TYPE_TOPIC_CONNECTION) - { - connection = - new HornetQXAConnection(username, - password, - type, - clientID, - dupsOKBatchSize, - transactionBatchSize, - factory); - } - } - else - { - if (type == HornetQConnection.TYPE_GENERIC_CONNECTION) - { - connection = new HornetQConnection(username, - password, - type, - clientID, - dupsOKBatchSize, - transactionBatchSize, - factory); - } - else if (type == HornetQConnection.TYPE_QUEUE_CONNECTION) - { - connection = - new HornetQConnection(username, - password, - type, - clientID, - dupsOKBatchSize, - transactionBatchSize, - factory); - } - else if (type == HornetQConnection.TYPE_TOPIC_CONNECTION) - { - connection = - new HornetQConnection(username, - password, - type, - clientID, - dupsOKBatchSize, - transactionBatchSize, - factory); - } - } - - if (connection == null) - { - throw new JMSException("Failed to create connection: invalid type " + type); - } - connection.setReference(this); - - try - { - connection.authorize(); - } - catch (JMSException e) - { - try - { - connection.close(); - } - catch (JMSException me) - { - } - throw e; - } - - return connection; - } - - @Override - public String toString() - { - return "HornetQConnectionFactory [serverLocator=" + serverLocator + - ", clientID=" + - clientID + - ", consumerWindowSize = " + - getConsumerWindowSize() + - ", dupsOKBatchSize=" + - dupsOKBatchSize + - ", transactionBatchSize=" + - transactionBatchSize + - ", readOnly=" + - readOnly + - "]"; - } - - - // Private -------------------------------------------------------------------------------------- - - private void checkWrite() - { - if (readOnly) - { - throw new IllegalStateException("Cannot set attribute on HornetQConnectionFactory after it has been used"); - } - } - - @Override - protected void finalize() throws Throwable - { - try - { - serverLocator.close(); - } - catch (Exception e) - { - e.printStackTrace(); - //not much we can do here - } - super.finalize(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionForContext.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionForContext.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionForContext.java deleted file mode 100644 index 7e493a1..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionForContext.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -/** - * - */ -package org.apache.activemq.jms.client; - -import javax.jms.JMSContext; -import javax.jms.XAJMSContext; - -/** - * Interface created to support reference counting all contexts using it. - * <p> - * Necessary to support {@code JMSContext.close()} conditions. - * @see JMSContext - */ -public interface HornetQConnectionForContext extends javax.jms.Connection -{ - JMSContext createContext(int sessionMode); - - XAJMSContext createXAContext(); - - void closeFromContext(); -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionForContextImpl.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionForContextImpl.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionForContextImpl.java deleted file mode 100644 index 25e3b3a..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionForContextImpl.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -/** - * - */ -package org.apache.activemq.jms.client; - -import javax.jms.JMSContext; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.Session; -import javax.jms.XAJMSContext; - -import org.apache.activemq.api.jms.HornetQJMSConstants; -import org.apache.activemq.utils.ReferenceCounter; -import org.apache.activemq.utils.ReferenceCounterUtil; - -public abstract class HornetQConnectionForContextImpl implements HornetQConnectionForContext -{ - - final Runnable closeRunnable = new Runnable() - { - public void run() - { - try - { - close(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - }; - - final ReferenceCounter refCounter = new ReferenceCounterUtil(closeRunnable); - - protected final ThreadAwareContext threadAwareContext = new ThreadAwareContext(); - - public JMSContext createContext(int sessionMode) - { - switch (sessionMode) - { - case Session.AUTO_ACKNOWLEDGE: - case Session.CLIENT_ACKNOWLEDGE: - case Session.DUPS_OK_ACKNOWLEDGE: - case Session.SESSION_TRANSACTED: - case HornetQJMSConstants.INDIVIDUAL_ACKNOWLEDGE: - case HornetQJMSConstants.PRE_ACKNOWLEDGE: - break; - default: - throw new JMSRuntimeException("Invalid ackmode: " + sessionMode); - } - refCounter.increment(); - - return new HornetQJMSContext(this, sessionMode, threadAwareContext); - } - - public XAJMSContext createXAContext() - { - refCounter.increment(); - - return new HornetQXAJMSContext(this, threadAwareContext); - } - - @Override - public void closeFromContext() - { - refCounter.decrement(); - } - - protected void incrementRefCounter() - { - refCounter.increment(); - } - - public ThreadAwareContext getThreadAwareContext() - { - return threadAwareContext; - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionMetaData.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionMetaData.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionMetaData.java deleted file mode 100644 index 8109b6d..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQConnectionMetaData.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import java.util.Enumeration; -import java.util.Vector; - -import javax.jms.ConnectionMetaData; -import javax.jms.JMSException; - -import org.apache.activemq.core.version.Version; - -/** - * HornetQ implementation of a JMS ConnectionMetaData. - * - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * - */ -public class HornetQConnectionMetaData implements ConnectionMetaData -{ - // Constants ----------------------------------------------------- - - private static final String HORNETQ = "HornetQ"; - - // Static -------------------------------------------------------- - - // Attributes ---------------------------------------------------- - - private final Version serverVersion; - - // Constructors -------------------------------------------------- - - /** - * Create a new HornetQConnectionMetaData object. - */ - public HornetQConnectionMetaData(final Version serverVersion) - { - this.serverVersion = serverVersion; - } - - // ConnectionMetaData implementation ----------------------------- - - public String getJMSVersion() throws JMSException - { - return "2.0"; - } - - public int getJMSMajorVersion() throws JMSException - { - return 2; - } - - public int getJMSMinorVersion() throws JMSException - { - return 0; - } - - public String getJMSProviderName() throws JMSException - { - return HornetQConnectionMetaData.HORNETQ; - } - - public String getProviderVersion() throws JMSException - { - return serverVersion.getFullVersion(); - } - - public int getProviderMajorVersion() throws JMSException - { - return serverVersion.getMajorVersion(); - } - - public int getProviderMinorVersion() throws JMSException - { - return serverVersion.getMinorVersion(); - } - - public Enumeration getJMSXPropertyNames() throws JMSException - { - Vector<Object> v = new Vector<Object>(); - v.add("JMSXGroupID"); - v.add("JMSXGroupSeq"); - v.add("JMSXDeliveryCount"); - return v.elements(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQDestination.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQDestination.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQDestination.java deleted file mode 100644 index c9766d0..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQDestination.java +++ /dev/null @@ -1,376 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import java.io.Serializable; -import java.util.UUID; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; - -import org.apache.activemq.api.core.Pair; -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.jms.referenceable.DestinationObjectFactory; -import org.apache.activemq.jms.referenceable.SerializableObjectRefAddr; - -/** - * HornetQ implementation of a JMS Destination. - * - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * - */ -public class HornetQDestination implements Destination, Serializable, Referenceable -{ - // Constants ----------------------------------------------------- - - // Static -------------------------------------------------------- - - /** - * - */ - private static final long serialVersionUID = 5027962425462382883L; - - public static final String JMS_QUEUE_ADDRESS_PREFIX = "jms.queue."; - - public static final String JMS_TEMP_QUEUE_ADDRESS_PREFIX = "jms.tempqueue."; - - public static final String JMS_TOPIC_ADDRESS_PREFIX = "jms.topic."; - - public static final String JMS_TEMP_TOPIC_ADDRESS_PREFIX = "jms.temptopic."; - - private static final char SEPARATOR = '.'; - - private static String escape(final String input) - { - if (input == null) - { - return ""; - } - return input.replace("\\", "\\\\").replace(".", "\\."); - } - - public static Destination fromAddress(final String address) - { - if (address.startsWith(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX)) - { - String name = address.substring(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()); - - return createQueue(name); - } - else if (address.startsWith(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX)) - { - String name = address.substring(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()); - - return createTopic(name); - } - else if (address.startsWith(HornetQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX)) - { - String name = address.substring(HornetQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length()); - - return new HornetQTemporaryQueue(address, name, null); - } - else if (address.startsWith(HornetQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX)) - { - String name = address.substring(HornetQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length()); - - return new HornetQTemporaryTopic(address, name, null); - } - else - { - throw new JMSRuntimeException("Invalid address " + address); - } - } - - public static String createQueueNameForDurableSubscription(final boolean isDurable, final String clientID, final String subscriptionName) - { - if (clientID != null) - { - if (isDurable) - { - return HornetQDestination.escape(clientID) + SEPARATOR + - HornetQDestination.escape(subscriptionName); - } - else - { - return "nonDurable" + SEPARATOR + - HornetQDestination.escape(clientID) + SEPARATOR + - HornetQDestination.escape(subscriptionName); - } - } - else - { - if (isDurable) - { - return HornetQDestination.escape(subscriptionName); - } - else - { - return "nonDurable" + SEPARATOR + - HornetQDestination.escape(subscriptionName); - } - } - } - - public static String createQueueNameForSharedSubscription(final boolean isDurable, final String clientID, final String subscriptionName) - { - if (clientID != null) - { - return (isDurable ? "Durable" : "nonDurable") + SEPARATOR + - HornetQDestination.escape(clientID) + SEPARATOR + - HornetQDestination.escape(subscriptionName); - } - else - { - return (isDurable ? "Durable" : "nonDurable") + SEPARATOR + - HornetQDestination.escape(subscriptionName); - } - } - - public static Pair<String, String> decomposeQueueNameForDurableSubscription(final String queueName) - { - StringBuffer[] parts = new StringBuffer[2]; - int currentPart = 0; - - parts[0] = new StringBuffer(); - parts[1] = new StringBuffer(); - - int pos = 0; - while (pos < queueName.length()) - { - char ch = queueName.charAt(pos); - pos++; - - if (ch == SEPARATOR) - { - currentPart++; - if (currentPart >= parts.length) - { - throw new JMSRuntimeException("Invalid message queue name: " + queueName); - } - - continue; - } - - if (ch == '\\') - { - if (pos >= queueName.length()) - { - throw new JMSRuntimeException("Invalid message queue name: " + queueName); - } - ch = queueName.charAt(pos); - pos++; - } - - parts[currentPart].append(ch); - } - - if (currentPart != 1) - { - throw new JMSRuntimeException("Invalid message queue name: " + queueName); - } - - Pair<String, String> pair = new Pair<String, String>(parts[0].toString(), parts[1].toString()); - - return pair; - } - - public static SimpleString createQueueAddressFromName(final String name) - { - return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + name); - } - - public static SimpleString createTopicAddressFromName(final String name) - { - return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + name); - } - - public static HornetQQueue createQueue(final String name) - { - return new HornetQQueue(name); - } - - public static HornetQTopic createTopic(final String name) - { - return new HornetQTopic(name); - } - - public static HornetQTemporaryQueue createTemporaryQueue(final String name, final HornetQSession session) - { - return new HornetQTemporaryQueue(JMS_TEMP_QUEUE_ADDRESS_PREFIX.concat(name), name, session); - } - - public static HornetQTemporaryQueue createTemporaryQueue(final String name) - { - return createTemporaryQueue(name, null); - } - - public static HornetQTemporaryQueue createTemporaryQueue(final HornetQSession session) - { - String name = UUID.randomUUID().toString(); - - return createTemporaryQueue(name, session); - } - - public static HornetQTemporaryTopic createTemporaryTopic(final HornetQSession session) - { - String name = UUID.randomUUID().toString(); - - return createTemporaryTopic(name, session); - } - - public static HornetQTemporaryTopic createTemporaryTopic(String name, final HornetQSession session) - { - return new HornetQTemporaryTopic(JMS_TEMP_TOPIC_ADDRESS_PREFIX.concat(name), name, session); - } - - public static HornetQTemporaryTopic createTemporaryTopic(String name) - { - return createTemporaryTopic(name, null); - } - - // Attributes ---------------------------------------------------- - - /** - * The JMS name - */ - protected final String name; - - /** - * The core address - */ - private final String address; - - /** - * SimpleString version of address - */ - private final SimpleString simpleAddress; - - private final boolean temporary; - - private final boolean queue; - - private final transient HornetQSession session; - - // Constructors -------------------------------------------------- - - protected HornetQDestination(final String address, final String name, - final boolean temporary, - final boolean queue, - final HornetQSession session) - { - this.address = address; - - this.name = name; - - simpleAddress = new SimpleString(address); - - this.temporary = temporary; - - this.queue = queue; - - this.session = session; - } - - // Referenceable implementation --------------------------------------- - - public Reference getReference() throws NamingException - { - return new Reference(this.getClass().getCanonicalName(), - new SerializableObjectRefAddr("HornetQ-DEST", this), - DestinationObjectFactory.class.getCanonicalName(), - null); - } - - public void delete() throws JMSException - { - if (session != null) - { - if (session.getCoreSession().isClosed()) - { - // Temporary queues will be deleted when the connection is closed.. nothing to be done then! - return; - } - if (queue) - { - session.deleteTemporaryQueue(this); - } - else - { - session.deleteTemporaryTopic(this); - } - } - } - - public boolean isQueue() - { - return queue; - } - - // Public -------------------------------------------------------- - - public String getAddress() - { - return address; - } - - public SimpleString getSimpleAddress() - { - return simpleAddress; - } - - public String getName() - { - return name; - } - - public boolean isTemporary() - { - return temporary; - } - - @Override - public boolean equals(final Object o) - { - if (this == o) - { - return true; - } - - if (!(o instanceof HornetQDestination)) - { - return false; - } - - HornetQDestination that = (HornetQDestination)o; - - return address.equals(that.address); - } - - @Override - public int hashCode() - { - return address.hashCode(); - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- -}