Author: tabish
Date: Mon Apr 8 18:58:11 2013
New Revision: 1465723
URL: http://svn.apache.org/r1465723
Log:
Fix and test for: https://issues.apache.org/jira/browse/AMQ-4441
Added:
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java
(with props)
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java?rev=1465723&r1=1465722&r2=1465723&view=diff
==============================================================================
---
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
(original)
+++
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
Mon Apr 8 18:58:11 2013
@@ -32,6 +32,8 @@ import org.apache.activemq.util.JMSExcep
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Holds a real JMS connection along with the session pools associated with it.
@@ -43,6 +45,8 @@ import org.apache.commons.pool.impl.Gene
*/
public class ConnectionPool {
+ private static final transient Logger LOG =
LoggerFactory.getLogger(ConnectionPool.class);
+
private ActiveMQConnection connection;
private int referenceCount;
private long lastUsed = System.currentTimeMillis();
@@ -207,6 +211,9 @@ public class ConnectionPool {
* @return true if this connection has expired.
*/
public synchronized boolean expiredCheck() {
+
+ boolean expired = false;
+
if (connection == null) {
return true;
}
@@ -214,25 +221,27 @@ public class ConnectionPool {
if (hasExpired || hasFailed) {
if (referenceCount == 0) {
close();
+ expired = true;
}
- return true;
}
if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed +
expiryTimeout) {
hasExpired = true;
if (referenceCount == 0) {
close();
+ expired = true;
}
- return true;
}
+ // Only set hasExpired here is no references, as a Connection with
references is by
+ // definition not idle at this time.
if (referenceCount == 0 && idleTimeout > 0 &&
System.currentTimeMillis() > lastUsed + idleTimeout) {
hasExpired = true;
close();
- return true;
+ expired = true;
}
- return false;
+ return expired;
}
public int getIdleTimeout() {
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java?rev=1465723&r1=1465722&r2=1465723&view=diff
==============================================================================
---
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
(original)
+++
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
Mon Apr 8 18:58:11 2013
@@ -74,7 +74,6 @@ public class PooledConnection implements
*/
public PooledConnection(ConnectionPool pool) {
this.pool = pool;
- this.pool.incrementReferenceCount();
}
/**
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?rev=1465723&r1=1465722&r2=1465723&view=diff
==============================================================================
---
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
(original)
+++
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
Mon Apr 8 18:58:11 2013
@@ -220,7 +220,25 @@ public class PooledConnectionFactory imp
}
try {
- connection = connectionsPool.borrowObject(key);
+
+ // We can race against other threads returning the connection when
there is an
+ // expiration or idle timeout. We keep pulling out ConnectionPool
instances until
+ // we win and get a non-closed instance and then increment the
reference count
+ // under lock to prevent another thread from triggering an
expiration check and
+ // pulling the rug out from under us.
+ while (connection == null) {
+ connection = connectionsPool.borrowObject(key);
+ synchronized (connection) {
+ if (connection.getConnection() != null) {
+ connection.incrementReferenceCount();
+ break;
+ }
+
+ // Return the bad one to the pool and let if get destroyed
as normal.
+ connectionsPool.returnObject(key, connection);
+ connection = null;
+ }
+ }
} catch (Exception e) {
throw JMSExceptionSupport.create("Error while attempting to
retrieve a connection from the pool", e);
}
Added:
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java?rev=1465723&view=auto
==============================================================================
---
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java
(added)
+++
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java
Mon Apr 8 18:58:11 2013
@@ -0,0 +1,84 @@
+package org.apache.activemq.pool.bugs;
+
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.pool.PooledConnection;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4441Test {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AMQ4441Test.class);
+ private BrokerService broker;
+
+ @Before
+ public void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ @Test(timeout=120000)
+ public void demo() throws JMSException, InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final PooledConnectionFactory pooled = new
PooledConnectionFactory("vm://localhost?create=false");
+
+ pooled.setMaxConnections(2);
+ pooled.setExpiryTimeout(10L);
+ pooled.start();
+ Thread[] threads = new Thread[10];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!done.get() && latch.getCount() > 0) {
+ try {
+ final PooledConnection pooledConnection =
(PooledConnection) pooled.createConnection();
+ if (pooledConnection.getConnection() == null) {
+ LOG.info("Found broken connection.");
+ latch.countDown();
+ }
+ pooledConnection.close();
+ } catch (JMSException e) {
+ LOG.warn("Caught Exception", e);
+ }
+ }
+ }
+ });
+ }
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ if (latch.await(1, TimeUnit.MINUTES)) {
+ fail("A thread obtained broken connection");
+ }
+
+ done.set(true);
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+
+}
Propchange:
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java
------------------------------------------------------------------------------
svn:eol-style = native