Repository: camel Updated Branches: refs/heads/master 84812de96 -> eb0e437ec
CAMEL-7734: Replace custom pool implementation by commons-pool Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/88809ef3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/88809ef3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/88809ef3 Branch: refs/heads/master Commit: 88809ef3d08a8752572ae2302e90499c995121e6 Parents: 8142b43 Author: Cristiano Nicolai <cristiano.nico...@gmail.com> Authored: Thu Aug 21 01:25:54 2014 +1000 Committer: Cristiano Nicolai <cristiano.nico...@gmail.com> Committed: Fri Aug 22 11:18:20 2014 +1000 ---------------------------------------------------------------------- components/camel-sjms/pom.xml | 4 + .../sjms/MessageConsumerResources.java | 56 +++++ .../sjms/MessageProducerResources.java | 55 +++++ .../camel/component/sjms/SjmsConsumer.java | 112 ++++------ .../component/sjms/SjmsMessageConsumer.java | 38 ---- .../camel/component/sjms/SjmsProducer.java | 101 +++------ .../sjms/jms/ConnectionFactoryResource.java | 59 +++-- .../component/sjms/jms/ConnectionResource.java | 10 - .../camel/component/sjms/jms/ObjectPool.java | 148 ------------- .../camel/component/sjms/jms/SessionPool.java | 170 --------------- .../component/sjms/producer/InOnlyProducer.java | 1 + .../component/sjms/producer/InOutProducer.java | 81 ++----- .../component/sjms/it/ConnectionResourceIT.java | 24 +-- .../sjms/jms/ConnectionFactoryResourceTest.java | 36 ++-- .../component/sjms/jms/ObjectPoolTest.java | 216 ------------------- .../component/sjms/jms/SessionPoolTest.java | 118 ---------- 16 files changed, 274 insertions(+), 955 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml index 1b46386..e6e45d0 100644 --- a/components/camel-sjms/pom.xml +++ b/components/camel-sjms/pom.xml @@ -59,6 +59,10 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> </dependency> + <dependency> + <groupId>commons-pool</groupId> + <artifactId>commons-pool</artifactId> + </dependency> <dependency> <groupId>org.apache.geronimo.specs</groupId> http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageConsumerResources.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageConsumerResources.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageConsumerResources.java new file mode 100644 index 0000000..945b133 --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageConsumerResources.java @@ -0,0 +1,56 @@ +package org.apache.camel.component.sjms; + +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +public class MessageConsumerResources { + + private final Session session; + private final MessageConsumer messageConsumer; + private final Destination replyToDestination; + + public MessageConsumerResources(MessageConsumer messageConsumer) { + this(null, messageConsumer, null); + } + + public MessageConsumerResources(Session session, MessageConsumer messageConsumer) { + this(session, messageConsumer, null); + } + + /** + * TODO Add Constructor Javadoc + * + * @param session + * @param messageConsumer + */ + public MessageConsumerResources(Session session, MessageConsumer messageConsumer, Destination replyToDestination) { + this.session = session; + this.messageConsumer = messageConsumer; + this.replyToDestination = replyToDestination; + } + + /** + * Gets the Session value of session for this instance of + * MessageProducerModel. + * + * @return the session + */ + public Session getSession() { + return session; + } + + /** + * Gets the QueueSender value of queueSender for this instance of + * MessageProducerModel. + * + * @return the queueSender + */ + public MessageConsumer getMessageConsumer() { + return messageConsumer; + } + + public Destination getReplyToDestination() { + return replyToDestination; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageProducerResources.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageProducerResources.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageProducerResources.java new file mode 100644 index 0000000..3bdad29 --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageProducerResources.java @@ -0,0 +1,55 @@ +package org.apache.camel.component.sjms; + +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * The {@link MessageProducer} resources for all {@link SjmsProducer} + * classes. + */ +public class MessageProducerResources { + + private final Session session; + private final MessageProducer messageProducer; + private final TransactionCommitStrategy commitStrategy; + + public MessageProducerResources(Session session, MessageProducer messageProducer) { + this(session, messageProducer, null); + } + + public MessageProducerResources(Session session, MessageProducer messageProducer, TransactionCommitStrategy commitStrategy) { + this.session = session; + this.messageProducer = messageProducer; + this.commitStrategy = commitStrategy; + } + + /** + * Gets the Session value of session for this instance of + * MessageProducerResources. + * + * @return the session + */ + public Session getSession() { + return session; + } + + /** + * Gets the QueueSender value of queueSender for this instance of + * MessageProducerResources. + * + * @return the queueSender + */ + public MessageProducer getMessageProducer() { + return messageProducer; + } + + /** + * Gets the TransactionCommitStrategy value of commitStrategy for this + * instance of SjmsProducer.MessageProducerResources. + * + * @return the commitStrategy + */ + public TransactionCommitStrategy getCommitStrategy() { + return commitStrategy; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java index c6070d6..2e585ee 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.sjms; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import javax.jms.Connection; import javax.jms.MessageConsumer; @@ -32,7 +33,6 @@ import org.apache.camel.component.sjms.consumer.InOnlyMessageHandler; import org.apache.camel.component.sjms.consumer.InOutMessageHandler; import org.apache.camel.component.sjms.jms.ConnectionResource; import org.apache.camel.component.sjms.jms.JmsObjectFactory; -import org.apache.camel.component.sjms.jms.ObjectPool; import org.apache.camel.component.sjms.taskmanager.TimedTaskManager; import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy; import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy; @@ -40,6 +40,8 @@ import org.apache.camel.component.sjms.tx.SessionBatchTransactionSynchronization import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization; import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.spi.Synchronization; +import org.apache.commons.pool.BasePoolableObjectFactory; +import org.apache.commons.pool.impl.GenericObjectPool; /** * The SjmsConsumer is the base class for the SJMS MessageListener pool. @@ -47,36 +49,32 @@ import org.apache.camel.spi.Synchronization; */ public class SjmsConsumer extends DefaultConsumer { - protected MessageConsumerPool consumers; + protected GenericObjectPool<MessageConsumerResources> consumers; private ExecutorService executor; + private Future<?> asyncStart; /** * A pool of MessageConsumerResources created at the initialization of the associated consumer. */ - protected class MessageConsumerPool extends ObjectPool<MessageConsumerResources> { - - public MessageConsumerPool() { - super(getConsumerCount()); - } + protected class MessageConsumerResourcesFactory extends BasePoolableObjectFactory<MessageConsumerResources> { /** * Creates a new MessageConsumerResources instance. * - * @see org.apache.camel.component.sjms.jms.ObjectPool#createObject() + * @see org.apache.commons.pool.PoolableObjectFactory#makeObject() */ @Override - protected MessageConsumerResources createObject() throws Exception { - MessageConsumerResources model = createConsumer(); - return model; + public MessageConsumerResources makeObject() throws Exception { + return createConsumer(); } /** * Cleans up the MessageConsumerResources. * - * @see org.apache.camel.component.sjms.jms.ObjectPool#destroyObject(java.lang.Object) + * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object) */ @Override - protected void destroyObject(MessageConsumerResources model) throws Exception { + public void destroyObject(MessageConsumerResources model) throws Exception { if (model != null) { // First clean up our message consumer if (model.getMessageConsumer() != null) { @@ -98,41 +96,6 @@ public class SjmsConsumer extends DefaultConsumer { } } - protected class MessageConsumerResources { - private final Session session; - private final MessageConsumer messageConsumer; - - public MessageConsumerResources(MessageConsumer messageConsumer) { - this.session = null; - this.messageConsumer = messageConsumer; - } - - public MessageConsumerResources(Session session, MessageConsumer messageConsumer) { - this.session = session; - this.messageConsumer = messageConsumer; - } - - /** - * Gets the Session value of session for this instance of - * MessageProducerModel. - * - * @return the session - */ - public Session getSession() { - return session; - } - - /** - * Gets the QueueSender value of queueSender for this instance of - * MessageProducerModel. - * - * @return the queueSender - */ - public MessageConsumer getMessageConsumer() { - return messageConsumer; - } - } - public SjmsConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); } @@ -146,38 +109,51 @@ public class SjmsConsumer extends DefaultConsumer { protected void doStart() throws Exception { super.doStart(); this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer"); - consumers = new MessageConsumerPool(); - if (getEndpoint().isAsyncStartListener()) { - getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() { - @Override - public void run() { - try { - consumers.fillPool(); - } catch (Throwable e) { - log.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e); + if(consumers == null){ + consumers = new GenericObjectPool<MessageConsumerResources>(new MessageConsumerResourcesFactory()); + consumers.setMaxActive(getConsumerCount()); + consumers.setMaxIdle(getConsumerCount()); + if(getEndpoint().isAsyncStartListener()){ + asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() { + @Override + public void run() { + try { + fillConsumersPool(); + } catch (Throwable e) { + log.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e); + } } - } - @Override - public String toString() { - return "AsyncStartListenerTask[" + getDestinationName() + "]"; - } - }); - } else { - consumers.fillPool(); + @Override + public String toString() { + return "AsyncStartListenerTask[" + getDestinationName() + "]"; + } + }); + } else { + fillConsumersPool(); + } + } + } + + private void fillConsumersPool() throws Exception { + while(consumers.getNumIdle() < consumers.getMaxIdle()){ + consumers.addObject(); } } @Override protected void doStop() throws Exception { super.doStop(); - if (consumers != null) { + if(asyncStart != null && asyncStart.isDone() == false){ + asyncStart.cancel(true); + } + if(consumers != null){ if (getEndpoint().isAsyncStopListener()) { getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() { @Override public void run() { try { - consumers.drainPool(); + consumers.close(); consumers = null; } catch (Throwable e) { log.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e); @@ -190,7 +166,7 @@ public class SjmsConsumer extends DefaultConsumer { } }); } else { - consumers.drainPool(); + consumers.close(); consumers = null; } } http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java deleted file mode 100644 index 8961b06..0000000 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.sjms; - -import java.util.concurrent.Exchanger; - -import javax.jms.Message; -import javax.jms.MessageListener; - -import org.apache.camel.component.sjms.jms.ConnectionResource; -import org.apache.camel.component.sjms.jms.SessionPool; - -/** - * TODO Add Class documentation for SjmsMessageConsumer - */ -public interface SjmsMessageConsumer extends MessageListener { - void handleMessage(Message message); - - SjmsMessageConsumer createMessageConsumer(ConnectionResource connectionResource, String destinationName) throws Exception; - - SjmsMessageConsumer createMessageConsumerListener(SessionPool sessionPool, String destinationName, Exchanger<Object> exchanger) throws Exception; - - void destroyMessageConsumer() throws Exception; -} http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java index f7474d0..3d2b706 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java @@ -17,17 +17,16 @@ package org.apache.camel.component.sjms; import java.util.concurrent.ExecutorService; - -import javax.jms.MessageProducer; -import javax.jms.Session; +import java.util.concurrent.Future; import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.component.sjms.jms.ConnectionResource; -import org.apache.camel.component.sjms.jms.ObjectPool; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.util.ObjectHelper; +import org.apache.commons.pool.BasePoolableObjectFactory; +import org.apache.commons.pool.impl.GenericObjectPool; /** * Base SjmsProducer class. @@ -38,19 +37,15 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { * The {@link MessageProducerResources} pool for all {@link SjmsProducer} * classes. */ - protected class MessageProducerPool extends ObjectPool<MessageProducerResources> { - - public MessageProducerPool() { - super(getProducerCount()); - } + protected class MessageProducerResourcesFactory extends BasePoolableObjectFactory<MessageProducerResources> { @Override - protected MessageProducerResources createObject() throws Exception { + public MessageProducerResources makeObject() throws Exception { return doCreateProducerModel(); } @Override - protected void destroyObject(MessageProducerResources model) throws Exception { + public void destroyObject(MessageProducerResources model) throws Exception { if (model.getMessageProducer() != null) { model.getMessageProducer().close(); } @@ -72,58 +67,9 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { } } - /** - * The {@link MessageProducer} resources for all {@link SjmsProducer} - * classes. - */ - protected class MessageProducerResources { - private final Session session; - private final MessageProducer messageProducer; - private final TransactionCommitStrategy commitStrategy; - - public MessageProducerResources(Session session, MessageProducer messageProducer) { - this(session, messageProducer, null); - } - - public MessageProducerResources(Session session, MessageProducer messageProducer, TransactionCommitStrategy commitStrategy) { - this.session = session; - this.messageProducer = messageProducer; - this.commitStrategy = commitStrategy; - } - - /** - * Gets the Session value of session for this instance of - * MessageProducerResources. - * - * @return the session - */ - public Session getSession() { - return session; - } - - /** - * Gets the QueueSender value of queueSender for this instance of - * MessageProducerResources. - * - * @return the queueSender - */ - public MessageProducer getMessageProducer() { - return messageProducer; - } - - /** - * Gets the TransactionCommitStrategy value of commitStrategy for this - * instance of SjmsProducer.MessageProducerResources. - * - * @return the commitStrategy - */ - public TransactionCommitStrategy getCommitStrategy() { - return commitStrategy; - } - } - - private MessageProducerPool producers; + private GenericObjectPool<MessageProducerResources> producers; private ExecutorService executor; + private Future<?> asyncStart; public SjmsProducer(Endpoint endpoint) { super(endpoint); @@ -133,14 +79,16 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { protected void doStart() throws Exception { super.doStart(); this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsProducer"); - if (getProducers() == null) { - setProducers(new MessageProducerPool()); - if (getEndpoint().isAsyncStartListener()) { - getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() { + if(getProducers() == null){ + setProducers(new GenericObjectPool<MessageProducerResources>(new MessageProducerResourcesFactory())); + getProducers().setMaxActive(getProducerCount()); + getProducers().setMaxIdle(getProducerCount()); + if(getEndpoint().isAsyncStartListener()){ + asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() { @Override public void run() { try { - getProducers().fillPool(); + fillProducersPool(); } catch (Throwable e) { log.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e); } @@ -152,21 +100,30 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { } }); } else { - getProducers().fillPool(); + fillProducersPool(); } } } + private void fillProducersPool() throws Exception { + while(producers.getNumIdle() < producers.getMaxIdle()){ + producers.addObject(); + } + } + @Override protected void doStop() throws Exception { super.doStop(); + if(asyncStart != null && asyncStart.isDone() == false){ + asyncStart.cancel(true); + } if (getProducers() != null) { if (getEndpoint().isAsyncStopListener()) { getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() { @Override public void run() { try { - getProducers().drainPool(); + getProducers().close(); setProducers(null); } catch (Throwable e) { log.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e); @@ -179,7 +136,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { } }); } else { - getProducers().drainPool(); + getProducers().close(); setProducers(null); } } @@ -292,7 +249,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { * * @param producers A MessageProducerPool */ - public void setProducers(MessageProducerPool producers) { + public void setProducers(GenericObjectPool<MessageProducerResources> producers) { this.producers = producers; } @@ -302,7 +259,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { * * @return the producers */ - public MessageProducerPool getProducers() { + public GenericObjectPool<MessageProducerResources> getProducers() { return producers; } http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java index baaa5ef..7246d2f 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java @@ -20,11 +20,17 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import org.apache.camel.util.ObjectHelper; +import org.apache.commons.pool.BasePoolableObjectFactory; +import org.apache.commons.pool.impl.GenericObjectPool; /** * The default {@link ConnectionResource} implementation for the SJMSComponent. */ -public class ConnectionFactoryResource extends ObjectPool<Connection> implements ConnectionResource { +public class ConnectionFactoryResource extends BasePoolableObjectFactory<Connection> implements ConnectionResource { + + private static final long DEFAULT_WAIT_TIMEOUT = 5 * 1000; + private static final int DEFAULT_POOL_SIZE = 1; + private GenericObjectPool<Connection> connections; private ConnectionFactory connectionFactory; private String username; private String password; @@ -34,6 +40,7 @@ public class ConnectionFactoryResource extends ObjectPool<Connection> implements * Default Constructor */ public ConnectionFactoryResource() { + this(DEFAULT_POOL_SIZE, null); } /** @@ -53,10 +60,7 @@ public class ConnectionFactoryResource extends ObjectPool<Connection> implements * @param password */ public ConnectionFactoryResource(int poolSize, ConnectionFactory connectionFactory, String username, String password) { - super(poolSize); - this.connectionFactory = connectionFactory; - this.username = username; - this.password = password; + this(poolSize, connectionFactory, username, password, null); } /** @@ -64,32 +68,43 @@ public class ConnectionFactoryResource extends ObjectPool<Connection> implements * @param connectionFactory * @param username * @param password + * @param connectionId */ public ConnectionFactoryResource(int poolSize, ConnectionFactory connectionFactory, String username, String password, String connectionId) { - super(poolSize); + this(poolSize, connectionFactory, username, password, null, DEFAULT_WAIT_TIMEOUT); + } + + /** + * @param poolSize + * @param connectionFactory + * @param username + * @param password + * @param connectionId + * @param maxWait + */ + public ConnectionFactoryResource(int poolSize, ConnectionFactory connectionFactory, String username, String password, String connectionId, long maxWait) { this.connectionFactory = connectionFactory; this.username = username; this.password = password; this.clientId = connectionId; + this.connections = new GenericObjectPool<Connection>(this); + this.connections.setMaxWait(maxWait); + this.connections.setMaxActive(poolSize); + this.connections.setMaxIdle(poolSize); } @Override public Connection borrowConnection() throws Exception { - return borrowObject(); - } - - @Override - public Connection borrowConnection(long timeout) throws Exception { - return borrowObject(timeout); + return connections.borrowObject(); } @Override public void returnConnection(Connection connection) throws Exception { - returnObject(connection); + connections.returnObject(connection); } @Override - protected Connection createObject() throws Exception { + public Connection makeObject() throws Exception { Connection connection = null; if (connectionFactory != null) { if (getUsername() != null && getPassword() != null) { @@ -108,7 +123,7 @@ public class ConnectionFactoryResource extends ObjectPool<Connection> implements } @Override - protected void destroyObject(Connection connection) throws Exception { + public void destroyObject(Connection connection) throws Exception { if (connection != null) { connection.stop(); connection.close(); @@ -147,4 +162,18 @@ public class ConnectionFactoryResource extends ObjectPool<Connection> implements public void setClientId(String clientId) { this.clientId = clientId; } + + public int size(){ + return connections.getNumActive() + connections.getNumIdle(); + } + + public void fillPool() throws Exception { + while(connections.getNumIdle() < connections.getMaxIdle()){ + connections.addObject(); + } + } + + public void drainPool() throws Exception { + connections.close(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionResource.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionResource.java index 26ad7cc..4a5dcc2 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionResource.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionResource.java @@ -44,16 +44,6 @@ public interface ConnectionResource { Connection borrowConnection() throws Exception; /** - * Borrows a {@link Connection} from the connection pool. - * - * @param timeout the amount of time to wait before throwing an - * {@link Exception} - * @return {@link Connection} - * @throws Exception when no resource is available - */ - Connection borrowConnection(long timeout) throws Exception; - - /** * Returns the {@link Connection} to the connection pool. * * @param connection the borrowed {@link Connection} http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ObjectPool.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ObjectPool.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ObjectPool.java deleted file mode 100644 index eb5e883..0000000 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ObjectPool.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.sjms.jms; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * TODO Add Class documentation for ObjectPool - * - */ -public abstract class ObjectPool<T> { - - private static final int DEFAULT_POOL_SIZE = 1; - protected final Logger logger = LoggerFactory.getLogger(getClass()); - private BlockingQueue<T> objects; - private int maxSize = DEFAULT_POOL_SIZE; - private AtomicInteger poolCount = new AtomicInteger(); - private ReadWriteLock lock = new ReentrantReadWriteLock(); - - public ObjectPool() { - this(DEFAULT_POOL_SIZE); - } - - public ObjectPool(int poolSize) { - this.maxSize = poolSize; - } - - public void fillPool() { - objects = new ArrayBlockingQueue<T>(getMaxSize(), false); - for (int i = 0; i < maxSize; i++) { - try { - T t = createObject(); - objects.add(t); - poolCount.incrementAndGet(); - } catch (Exception e) { - logger.error("Unable to create Object and add it to the pool. Reason: " - + e.getLocalizedMessage(), e); - } - } - } - - public void drainPool() throws Exception { - getLock().writeLock().lock(); - try { - while (!objects.isEmpty()) { - T t = objects.remove(); - destroyObject(t); - } - } finally { - getLock().writeLock().unlock(); - } - } - - /** - * Implement to create new objects of type T when the pool is initialized - * empty. - * - * @return - * @throws Exception - */ - protected abstract T createObject() throws Exception; - - /** - * Clean up pool objects - * - * @return - * @throws Exception - */ - protected abstract void destroyObject(T t) throws Exception; - - /** - * @return - * @throws Exception - */ - public T borrowObject() throws Exception { - return borrowObject(1000); - } - - /** - * @return - * @throws Exception - */ - public T borrowObject(long timeout) throws Exception { - T t = null; - getLock().writeLock().lock(); - try { - t = objects.poll(timeout, TimeUnit.MILLISECONDS); - } finally { - getLock().writeLock().unlock(); - } - return t; - } - - /** - * @param object - * @throws Exception - */ - public void returnObject(T object) throws Exception { - objects.add(object); - } - - /** - * @return - */ - int size() { - return objects.size(); - } - - /** - * Gets the ReadWriteLock value of lock for this instance of ObjectPool. - * - * @return the lock - */ - protected ReadWriteLock getLock() { - return lock; - } - - /** - * Gets the int value of maxSize for this instance of ObjectPool. - * - * @return the maxSize - */ - public int getMaxSize() { - return maxSize; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionPool.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionPool.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionPool.java deleted file mode 100644 index 8e9337a..0000000 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionPool.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.sjms.jms; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Session; -import javax.jms.XASession; -import javax.transaction.xa.XAResource; - - -/** - * TODO Add Class documentation for SessionPool - * - */ -public class SessionPool extends ObjectPool<Session> { - - private ConnectionResource connectionResource; - private boolean transacted; - private SessionAcknowledgementType acknowledgeMode = SessionAcknowledgementType.AUTO_ACKNOWLEDGE; - - /** - * TODO Add Constructor Javadoc - * - */ - public SessionPool(int poolSize, ConnectionResource connectionResource) { - super(poolSize); - this.connectionResource = connectionResource; - } - - /** - * TODO Add Constructor Javadoc - * - * @param poolSize - */ - public SessionPool(int poolSize) { - super(poolSize); - } - - @Override - protected Session createObject() throws Exception { - Session session = null; - final Connection connection = getConnectionResource().borrowConnection(5000); - if (connection != null) { - if (transacted) { - session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - } else { - switch (acknowledgeMode) { - case CLIENT_ACKNOWLEDGE: - session = connection.createSession(transacted, Session.CLIENT_ACKNOWLEDGE); - break; - case DUPS_OK_ACKNOWLEDGE: - session = connection.createSession(transacted, Session.DUPS_OK_ACKNOWLEDGE); - break; - case AUTO_ACKNOWLEDGE: - session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - break; - default: - // do nothing here. - } - } - } - getConnectionResource().returnConnection(connection); - return session; - } - - @Override - protected void destroyObject(Session session) throws Exception { - // lets reset the session - session.setMessageListener(null); - - if (transacted) { - try { - session.rollback(); - } catch (JMSException e) { - logger.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e); - } - } - if (session != null) { - session.close(); - session = null; - } - } - - /** - * Gets the SessionAcknowledgementType value of acknowledgeMode for this instance of SessionPool. - * - * @return the DEFAULT_ACKNOWLEDGE_MODE - */ - public final SessionAcknowledgementType getAcknowledgeMode() { - return acknowledgeMode; - } - - /** - * Sets the SessionAcknowledgementType value of acknowledgeMode for this instance of SessionPool. - * - * @param acknowledgeMode Sets SessionAcknowledgementType, default is AUTO_ACKNOWLEDGE - */ - public final void setAcknowledgeMode(SessionAcknowledgementType acknowledgeMode) { - this.acknowledgeMode = acknowledgeMode; - } - - /** - * Gets the boolean value of transacted for this instance of SessionPool. - * - * @return the transacted - */ - public final boolean isTransacted() { - return transacted; - } - - /** - * Sets the boolean value of transacted for this instance of SessionPool. - * - * @param transacted Sets boolean, default is TODO add default - */ - public final void setTransacted(boolean transacted) { - this.transacted = transacted; - } - - /** - * Gets the ConnectionFactoryResource value of connectionResource for this instance of SessionPool. - * - * @return the connectionResource - */ - public ConnectionResource getConnectionResource() { - return connectionResource; - } - - protected XAResource createXaResource(XASession session) throws JMSException { - return session.getXAResource(); - } - - -// protected class Synchronization implements javax.transaction.Synchronization { -// private final XASession session; -// -// private Synchronization(XASession session) { -// this.session = session; -// } -// -// public void beforeCompletion() { -// } -// -// public void afterCompletion(int status) { -// try { -// // This will return session to the pool. -// session.setIgnoreClose(false); -// session.close(); -// session.setIsXa(false); -// } catch (JMSException e) { -// throw new RuntimeException(e); -// } -// } -// } -} http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java index c83546f..d7eb449 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java @@ -29,6 +29,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.component.sjms.BatchMessage; +import org.apache.camel.component.sjms.MessageProducerResources; import org.apache.camel.component.sjms.SjmsProducer; import org.apache.camel.component.sjms.TransactionCommitStrategy; import org.apache.camel.component.sjms.jms.JmsMessageHelper; http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index 605b15c..88a289a 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -37,13 +37,16 @@ import javax.jms.Session; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelException; import org.apache.camel.Exchange; +import org.apache.camel.component.sjms.MessageConsumerResources; +import org.apache.camel.component.sjms.MessageProducerResources; import org.apache.camel.component.sjms.SjmsEndpoint; import org.apache.camel.component.sjms.SjmsExchangeMessageHelper; import org.apache.camel.component.sjms.SjmsProducer; import org.apache.camel.component.sjms.jms.JmsObjectFactory; -import org.apache.camel.component.sjms.jms.ObjectPool; import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization; import org.apache.camel.util.ObjectHelper; +import org.apache.commons.pool.BasePoolableObjectFactory; +import org.apache.commons.pool.impl.GenericObjectPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,20 +69,11 @@ public class InOutProducer extends SjmsProducer { * TODO Add Class documentation for MessageProducerPool * TODO Externalize */ - protected class MessageConsumerPool extends ObjectPool<MessageConsumerResource> { - - /** - * TODO Add Constructor Javadoc - * - * @param poolSize - */ - public MessageConsumerPool(int poolSize) { - super(poolSize); - } + protected class MessageConsumerResourcesFactory extends BasePoolableObjectFactory<MessageConsumerResources> { @Override - protected MessageConsumerResource createObject() throws Exception { - MessageConsumerResource answer = null; + public MessageConsumerResources makeObject() throws Exception { + MessageConsumerResources answer = null; Connection conn = null; Session session = null; try { @@ -101,9 +95,9 @@ public class InOutProducer extends SjmsProducer { @Override public void onMessage(Message message) { - if (logger.isDebugEnabled()) { - logger.debug("Message Received in the Consumer Pool"); - logger.debug(" Message : {}", message); + if (log.isDebugEnabled()) { + log.debug("Message Received in the Consumer Pool"); + log.debug(" Message : {}", message); } try { Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID()); @@ -114,7 +108,7 @@ public class InOutProducer extends SjmsProducer { } }); - answer = new MessageConsumerResource(session, messageConsumer, replyToDestination); + answer = new MessageConsumerResources(session, messageConsumer, replyToDestination); } catch (Exception e) { log.error("Unable to create the MessageConsumerResource: " + e.getLocalizedMessage()); throw new CamelException(e); @@ -125,7 +119,7 @@ public class InOutProducer extends SjmsProducer { } @Override - protected void destroyObject(MessageConsumerResource model) throws Exception { + public void destroyObject(MessageConsumerResources model) throws Exception { if (model.getMessageConsumer() != null) { model.getMessageConsumer().close(); } @@ -143,39 +137,6 @@ public class InOutProducer extends SjmsProducer { } } - /** - * TODO Add Class documentation for MessageConsumerResource - */ - protected class MessageConsumerResource { - private final Session session; - private final MessageConsumer messageConsumer; - private final Destination replyToDestination; - - /** - * TODO Add Constructor Javadoc - * - * @param session - * @param messageConsumer - */ - public MessageConsumerResource(Session session, MessageConsumer messageConsumer, Destination replyToDestination) { - this.session = session; - this.messageConsumer = messageConsumer; - this.replyToDestination = replyToDestination; - } - - public Session getSession() { - return session; - } - - public MessageConsumer getMessageConsumer() { - return messageConsumer; - } - - public Destination getReplyToDestination() { - return replyToDestination; - } - } - protected class InternalTempDestinationListener implements MessageListener { private final Logger tempLogger = LoggerFactory.getLogger(InternalTempDestinationListener.class); private Exchanger<Object> exchanger; @@ -204,7 +165,7 @@ public class InOutProducer extends SjmsProducer { } } - private MessageConsumerPool consumers; + private GenericObjectPool<MessageConsumerResources> consumers; public InOutProducer(SjmsEndpoint endpoint) { super(endpoint); @@ -219,8 +180,12 @@ public class InOutProducer extends SjmsProducer { log.debug("Using {} as the reply to destination.", getNamedReplyTo()); } if (getConsumers() == null) { - setConsumers(new MessageConsumerPool(getConsumerCount())); - getConsumers().fillPool(); + setConsumers(new GenericObjectPool<MessageConsumerResources>(new MessageConsumerResourcesFactory())); + getConsumers().setMaxActive(getConsumerCount()); + getConsumers().setMaxIdle(getConsumerCount()); + while(getConsumers().getNumIdle() < getConsumers().getMaxIdle()){ + getConsumers().addObject(); + } } super.doStart(); } @@ -229,7 +194,7 @@ public class InOutProducer extends SjmsProducer { protected void doStop() throws Exception { super.doStop(); if (getConsumers() != null) { - getConsumers().drainPool(); + getConsumers().close(); setConsumers(null); } } @@ -307,7 +272,7 @@ public class InOutProducer extends SjmsProducer { lock.writeLock().unlock(); } - MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut()); + MessageConsumerResources consumer = consumers.borrowObject(); SjmsExchangeMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination()); consumers.returnObject(consumer); producer.getMessageProducer().send(request); @@ -352,11 +317,11 @@ public class InOutProducer extends SjmsProducer { callback.done(isSynchronous()); } - public void setConsumers(MessageConsumerPool consumers) { + public void setConsumers(GenericObjectPool<MessageConsumerResources> consumers) { this.consumers = consumers; } - public MessageConsumerPool getConsumers() { + public GenericObjectPool<MessageConsumerResources> getConsumers() { return consumers; } } http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java index 7cea40f..734343a 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java @@ -41,7 +41,7 @@ public class ConnectionResourceIT extends JmsTestSupport { /** * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#returnObject(java.lang.Object)} + * {@link org.apache.commons.pool.ObjectPool#returnObject(java.lang.Object)} * . * * @throws Exception @@ -129,28 +129,6 @@ public class ConnectionResourceIT extends JmsTestSupport { } @Override - public Connection borrowConnection(long timeout) throws Exception { - Connection answer = null; - int counter = 0; - while (counter++ < timeout) { - answer = pcf.createConnection(); - if (answer == null) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - log.info("Intrupted but ignoring: ", e.getMessage()); - } - } else { - break; - } - } - if (answer != null) { - answer.start(); - } - return answer; - } - - @Override public void returnConnection(Connection connection) throws Exception { // Do nothing in this case since the PooledConnectionFactory takes // care of this for us http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResourceTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResourceTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResourceTest.java index fa295bb..de8049c 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResourceTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResourceTest.java @@ -16,16 +16,17 @@ */ package org.apache.camel.component.sjms.jms; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.NoSuchElementException; + import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - /** * TODO Add Class documentation for ConnectionFactoryResourceTest */ @@ -44,7 +45,7 @@ public class ConnectionFactoryResourceTest { /** * Test method for - * {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResourceTest#createObject()} + * {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource#makeObject()} * . * * @throws Exception @@ -54,7 +55,7 @@ public class ConnectionFactoryResourceTest { ConnectionFactoryResource pool = new ConnectionFactoryResource(1, connectionFactory); pool.fillPool(); assertNotNull(pool); - ActiveMQConnection connection = (ActiveMQConnection)pool.borrowObject(); + ActiveMQConnection connection = (ActiveMQConnection)pool.makeObject(); assertNotNull(connection); assertTrue(connection.isStarted()); pool.drainPool(); @@ -62,7 +63,7 @@ public class ConnectionFactoryResourceTest { /** * Test method for - * {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResourceTest#createObject()} + * {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource#destroyObject()} * . * * @throws Exception @@ -72,7 +73,7 @@ public class ConnectionFactoryResourceTest { ConnectionFactoryResource pool = new ConnectionFactoryResource(1, connectionFactory); pool.fillPool(); assertNotNull(pool); - ActiveMQConnection connection = (ActiveMQConnection)pool.borrowObject(); + ActiveMQConnection connection = (ActiveMQConnection)pool.makeObject(); assertNotNull(connection); assertTrue(connection.isStarted()); pool.drainPool(); @@ -81,27 +82,24 @@ public class ConnectionFactoryResourceTest { /** * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#borrowObject()}. + * {@link org.apache.camel.component.sjms.jms.ConnectionResource#borrowConnection()}. * * @throws Exception */ - @Test + @Test(expected=NoSuchElementException.class) public void testBorrowObject() throws Exception { ConnectionFactoryResource pool = new ConnectionFactoryResource(1, connectionFactory); pool.fillPool(); assertNotNull(pool); - ActiveMQConnection connection = (ActiveMQConnection)pool.borrowObject(); + ActiveMQConnection connection = (ActiveMQConnection)pool.borrowConnection(); assertNotNull(connection); assertTrue(connection.isStarted()); - - ActiveMQConnection connection2 = (ActiveMQConnection)pool.borrowObject(); - assertNull(connection2); - pool.drainPool(); + pool.borrowConnection(); } /** * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#returnObject(java.lang.Object)} + * {@link org.apache.camel.component.sjms.jms.ConnectionResource#returnConnection(java.lang.Object)} * . * * @throws Exception @@ -111,11 +109,11 @@ public class ConnectionFactoryResourceTest { ConnectionFactoryResource pool = new ConnectionFactoryResource(1, connectionFactory); pool.fillPool(); assertNotNull(pool); - ActiveMQConnection connection = (ActiveMQConnection)pool.borrowObject(); + ActiveMQConnection connection = (ActiveMQConnection)pool.borrowConnection(); assertNotNull(connection); assertTrue(connection.isStarted()); - pool.returnObject(connection); - ActiveMQConnection connection2 = (ActiveMQConnection)pool.borrowObject(); + pool.returnConnection(connection); + ActiveMQConnection connection2 = (ActiveMQConnection)pool.borrowConnection(); assertNotNull(connection2); pool.drainPool(); } http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java deleted file mode 100644 index 6e87c5e..0000000 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.sjms.jms; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.camel.test.junit4.TestSupport; - -import org.junit.Test; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * TODO Add Class documentation for ObjectPoolTest - */ -public class ObjectPoolTest extends TestSupport { - - private static final Logger LOGGER = LoggerFactory.getLogger(ObjectPoolTest.class); - - /** - * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#ObjectPool()}. - * - * @throws Exception - */ - @Test - public void testObjectPool() throws Exception { - ObjectPool<MyPooledObject> testPool = new TestPool(); - assertNotNull(testPool); - testPool.fillPool(); - MyPooledObject pooledObject = testPool.borrowObject(); - assertNotNull(pooledObject); - assertTrue("Expected a value of 1. Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() == 1); - - MyPooledObject nextPooledObject = testPool.borrowObject(); - assertNull(nextPooledObject); - - testPool.returnObject(pooledObject); - nextPooledObject = testPool.borrowObject(); - assertNotNull(nextPooledObject); - testPool.drainPool(); - } - - /** - * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#ObjectPool()}. - */ - @Test - public void testBadObjectPool() { - ObjectPool<Object> objectPool = new BadTestPool(); - - try { - objectPool.createObject(); - fail("Should have thrown exception"); - } catch (Exception e) { - assertIsInstanceOf(IllegalStateException.class, e); - } - } - - /** - * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#ObjectPool(int)}. - * - * @throws Exception - */ - @Test - public void testObjectPoolInt() throws Exception { - final int maxPoolObjects = 5; - - ObjectPool<MyPooledObject> testPool = new TestPool(maxPoolObjects); - testPool.fillPool(); - - List<MyPooledObject> poolObjects = new ArrayList<MyPooledObject>(); - for (int i = 0; i < maxPoolObjects; i++) { - poolObjects.add(testPool.borrowObject()); - } - for (int i = 0; i < maxPoolObjects; i++) { - MyPooledObject pooledObject = poolObjects.get(i); - assertNotNull("MyPooledObject was null for borrow attempt: " + i, pooledObject); - assertTrue("Expected a value in the range of 1-5. Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() > 0 && pooledObject.getObjectId() < 6); - LOGGER.info("MyPooledObject has an ID of: " + pooledObject.getObjectId()); - } - - assertNull("Pool should be empty", testPool.borrowObject()); - - for (MyPooledObject myPooledObject : poolObjects) { - testPool.returnObject(myPooledObject); - } - - MyPooledObject pooledObject = testPool.borrowObject(); - assertNotNull(pooledObject); - assertTrue("Expected a value in the range of 1-5. Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() > 0 && pooledObject.getObjectId() < 6); - - testPool.drainPool(); - } - - /** - * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#createObject()}. - * - * @throws Exception - */ - @Test - public void testCreateObject() throws Exception { - ObjectPool<MyPooledObject> testPool = new TestPool(); - assertNotNull(testPool.createObject()); - } - - /** - * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#borrowObject()}. - * - * @throws Exception - */ - @Test - public void testBorrowObject() throws Exception { - ObjectPool<MyPooledObject> testPool = new TestPool(); - testPool.fillPool(); - MyPooledObject pooledObject = testPool.borrowObject(); - assertNotNull(pooledObject); - assertTrue("Expected a value of 1. Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() == 1); - - MyPooledObject nextPooledObject = testPool.borrowObject(); - assertNull("Expected a null as the pool of 1 was already removed", nextPooledObject); - testPool.drainPool(); - } - - /** - * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#returnObject(java.lang.Object)} - * . - * - * @throws Exception - */ - @Test - public void testReturnObject() throws Exception { - ObjectPool<MyPooledObject> testPool = new TestPool(); - testPool.fillPool(); - assertNotNull(testPool); - MyPooledObject pooledObject = testPool.borrowObject(); - MyPooledObject nextPooledObject = testPool.borrowObject(); - testPool.returnObject(pooledObject); - nextPooledObject = testPool.borrowObject(); - assertNotNull(nextPooledObject); - testPool.drainPool(); - } - - private static class TestPool extends ObjectPool<MyPooledObject> { - - private final AtomicInteger atomicInteger = new AtomicInteger(); - - public TestPool() { - } - - public TestPool(int poolSize) { - super(poolSize); - } - - @Override - protected MyPooledObject createObject() throws Exception { - return new MyPooledObject(atomicInteger.incrementAndGet()); - } - - @Override - protected void destroyObject(MyPooledObject t) throws Exception { - t = null; - } - - } - - static class MyPooledObject { - private int objectId = -1; - - public MyPooledObject(int objectId) { - this.objectId = objectId; - } - - /** - * @return the OBJECT_ID - */ - public Integer getObjectId() { - return this.objectId; - } - } - - private static class BadTestPool extends ObjectPool<Object> { - - @Override - protected Object createObject() throws Exception { - throw new IllegalStateException("I'm a bad ObjectPool impl"); - } - - @Override - protected void destroyObject(Object t) throws Exception { - // noop - } - - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/88809ef3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/SessionPoolTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/SessionPoolTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/SessionPoolTest.java deleted file mode 100644 index dead399..0000000 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/SessionPoolTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.sjms.jms; - -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQSession; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -/** - * TODO Add Class documentation for ConnectionFactoryResourceTest - */ -public class SessionPoolTest { - private ActiveMQConnectionFactory connectionFactory; - - @Before - public void setup() { - connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=false"); - } - - @After - public void teardown() { - connectionFactory = null; - } - - /** - * Test method for - * {@link org.apache.camel.component.sjms.jms.SessionPoolTest#createObject()} - * - * @throws Exception - */ - @Test - public void testCreateObject() throws Exception { - ConnectionFactoryResource connections = new ConnectionFactoryResource(1, connectionFactory); - connections.fillPool(); - SessionPool sessions = new SessionPool(1, connections); - sessions.fillPool(); - assertNotNull(sessions); - Session session = sessions.createObject(); - assertNotNull(session); - sessions.drainPool(); - connections.drainPool(); - } - - /** - * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#borrowObject()}. - * - * @throws Exception - */ - @Test - public void testBorrowObject() throws Exception { - ConnectionFactoryResource connections = new ConnectionFactoryResource(1, connectionFactory); - connections.fillPool(); - SessionPool sessions = new SessionPool(1, connections); - sessions.fillPool(); - assertNotNull(sessions); - ActiveMQSession session = (ActiveMQSession)sessions.borrowObject(); - assertNotNull(session); - assertTrue(!session.isClosed()); - - ActiveMQSession session2 = (ActiveMQSession)sessions.borrowObject(); - assertNull(session2); - sessions.drainPool(); - connections.drainPool(); - } - - /** - * Test method for - * {@link org.apache.camel.component.sjms.jms.ObjectPool#returnObject(java.lang.Object)} - * . - * - * @throws Exception - */ - @Test - public void testReturnObject() throws Exception { - ConnectionFactoryResource connections = new ConnectionFactoryResource(1, connectionFactory); - connections.fillPool(); - SessionPool sessions = new SessionPool(1, connections); - sessions.fillPool(); - assertNotNull(sessions); - ActiveMQSession session = (ActiveMQSession)sessions.borrowObject(); - assertNotNull(session); - assertTrue(!session.isClosed()); - - ActiveMQSession session2 = (ActiveMQSession)sessions.borrowObject(); - assertNull(session2); - - sessions.returnObject(session); - session2 = (ActiveMQSession)sessions.borrowObject(); - assertNotNull(session2); - - sessions.drainPool(); - connections.drainPool(); - } - -}