https://issues.apache.org/jira/browse/AMQ-5226
Ensure that connections are returned when created and then handed out as FIFO after that. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/385ca818 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/385ca818 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/385ca818 Branch: refs/heads/activemq-5.10.x Commit: 385ca8184e5fd4fbefe7a2acfab35fee16e6bfd1 Parents: ac57ce9 Author: Timothy Bish <[email protected]> Authored: Fri Jun 13 10:30:39 2014 -0400 Committer: Hadrian Zbarcea <[email protected]> Committed: Mon Dec 15 16:55:05 2014 -0500 ---------------------------------------------------------------------- .../jms/pool/PooledConnectionFactory.java | 61 ++++++++++---------- .../jms/pool/PooledConnectionFactoryTest.java | 2 - 2 files changed, 32 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/385ca818/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java index 86f5972..e60c52b 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java @@ -17,6 +17,7 @@ package org.apache.activemq.jms.pool; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -73,6 +74,9 @@ public class PooledConnectionFactory implements ConnectionFactory { private boolean createConnectionOnStartup = true; private boolean useAnonymousProducers = true; + // Temporary value used to always fetch the result of makeObject. + private final AtomicReference<ConnectionPool> mostRecentlyCreated = new AtomicReference<ConnectionPool>(null); + public void initConnectionsPool() { if (this.connectionsPool == null) { this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>( @@ -112,6 +116,8 @@ public class PooledConnectionFactory implements ConnectionFactory { LOG.trace("Created new connection: {}", connection); } + PooledConnectionFactory.this.mostRecentlyCreated.set(connection); + return connection; } @@ -135,6 +141,7 @@ public class PooledConnectionFactory implements ConnectionFactory { // Set max idle (not max active) since our connections always idle in the pool. this.connectionsPool.setMaxIdle(1); + this.connectionsPool.setLifo(false); // We always want our validate method to control when idle objects are evicted. this.connectionsPool.setTestOnBorrow(true); @@ -195,45 +202,41 @@ public class PooledConnectionFactory implements ConnectionFactory { // will create a new one to meet the demand. if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) { try { - // we want borrowObject to return the one we added. - connectionsPool.setLifo(true); connectionsPool.addObject(key); + connection = mostRecentlyCreated.getAndSet(null); + connection.incrementReferenceCount(); } catch (Exception e) { throw createJmsException("Error while attempting to add new Connection to the pool", e); } } else { - // now we want the oldest one in the pool. - connectionsPool.setLifo(false); - } + try { + // We can race against other threads returning the connection when there is an + // expiration or idle timeout. We keep pulling out ConnectionPool instances until + // we win and get a non-closed instance and then increment the reference count + // under lock to prevent another thread from triggering an expiration check and + // pulling the rug out from under us. + while (connection == null) { + connection = connectionsPool.borrowObject(key); + synchronized (connection) { + if (connection.getConnection() != null) { + connection.incrementReferenceCount(); + break; + } - try { - - // We can race against other threads returning the connection when there is an - // expiration or idle timeout. We keep pulling out ConnectionPool instances until - // we win and get a non-closed instance and then increment the reference count - // under lock to prevent another thread from triggering an expiration check and - // pulling the rug out from under us. - while (connection == null) { - connection = connectionsPool.borrowObject(key); - synchronized (connection) { - if (connection.getConnection() != null) { - connection.incrementReferenceCount(); - break; + // Return the bad one to the pool and let if get destroyed as normal. + connectionsPool.returnObject(key, connection); + connection = null; } - - // Return the bad one to the pool and let if get destroyed as normal. - connectionsPool.returnObject(key, connection); - connection = null; } + } catch (Exception e) { + throw createJmsException("Error while attempting to retrieve a connection from the pool", e); } - } catch (Exception e) { - throw createJmsException("Error while attempting to retrieve a connection from the pool", e); - } - try { - connectionsPool.returnObject(key, connection); - } catch (Exception e) { - throw createJmsException("Error when returning connection to the pool", e); + try { + connectionsPool.returnObject(key, connection); + } catch (Exception e) { + throw createJmsException("Error when returning connection to the pool", e); + } } return newPooledConnection(connection); http://git-wip-us.apache.org/repos/asf/activemq/blob/385ca818/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java index 99b0ca8..e20a605 100644 --- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java @@ -40,7 +40,6 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.util.Wait; import org.apache.log4j.Logger; -import org.junit.Ignore; import org.junit.Test; /** @@ -197,7 +196,6 @@ public class PooledConnectionFactoryTest { doTestConcurrentCreateGetsUniqueConnection(false); } - @Ignore("something up - don't know why the start call to createConnection does not cause close - but that does not fix it either!") @Test public void testConcurrentCreateGetsUniqueConnectionCreateOnStart() throws Exception { doTestConcurrentCreateGetsUniqueConnection(true);
