Author: fhanik Date: Thu Jul 24 13:42:38 2014 New Revision: 1613123 URL: http://svn.apache.org/r1613123 Log: Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=53367 Properly handle interrupts in the FairBlockingQueue
https://issues.apache.org/bugzilla/show_bug.cgi?id=53367 Added: tomcat/trunk/modules/jdbc-pool/src/test/java/org/apache/tomcat/jdbc/bugs/Bug53367.java (with props) Modified: tomcat/trunk/modules/jdbc-pool/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java Modified: tomcat/trunk/modules/jdbc-pool/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java?rev=1613123&r1=1613122&r2=1613123&view=diff ============================================================================== --- tomcat/trunk/modules/jdbc-pool/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java (original) +++ tomcat/trunk/modules/jdbc-pool/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java Thu Jul 24 13:42:38 2014 @@ -133,9 +133,9 @@ public class FairBlockingQueue<E> implem public E poll(long timeout, TimeUnit unit) throws InterruptedException { E result = null; final ReentrantLock lock = this.lock; + //acquire the global lock until we know what to do + lock.lock(); try { - //acquire the global lock until we know what to do - lock.lock(); //check to see if we have objects result = items.poll(); if (result==null && timeout>0) { @@ -145,15 +145,36 @@ public class FairBlockingQueue<E> implem waiters.addLast(c); //unlock the global lock lock.unlock(); - //wait for the specified timeout - if (!c.await(timeout, unit)) { - //if we timed out, remove ourselves from the waitlist + boolean didtimeout = false; + InterruptedException interruptedException = null; + try { + //wait for the specified timeout + didtimeout = !c.await(timeout, unit); + } catch (InterruptedException ix) { + interruptedException = ix; + } + if (didtimeout) { + //if we timed out, or got interrupted + // remove ourselves from the waitlist lock.lock(); - waiters.remove(c); - lock.unlock(); + try { + waiters.remove(c); + } finally { + lock.unlock(); + } } //return the item we received, can be null if we timed out result = c.getItem(); + if (null!=interruptedException) { + //we got interrupted + if ( null!=result) { + //we got a result - clear the interrupt status + //don't propagate cause we have removed a connection from pool + Thread.interrupted(); + } else { + throw interruptedException; + } + } } else { //we have an object, release lock.unlock(); @@ -173,20 +194,20 @@ public class FairBlockingQueue<E> implem public Future<E> pollAsync() { Future<E> result = null; final ReentrantLock lock = this.lock; + //grab the global lock + lock.lock(); try { - //grab the global lock - lock.lock(); //check to see if we have objects in the queue E item = items.poll(); if (item==null) { //queue is empty, add ourselves as waiters - ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1); + ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<E>(1); waiters.addLast(c); //return a future that will wait for the object - result = new ItemFuture<>(c); + result = new ItemFuture<E>(c); } else { //return a future with the item - result = new ItemFuture<>(item); + result = new ItemFuture<E>(item); } } finally { lock.unlock(); Added: tomcat/trunk/modules/jdbc-pool/src/test/java/org/apache/tomcat/jdbc/bugs/Bug53367.java URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/src/test/java/org/apache/tomcat/jdbc/bugs/Bug53367.java?rev=1613123&view=auto ============================================================================== --- tomcat/trunk/modules/jdbc-pool/src/test/java/org/apache/tomcat/jdbc/bugs/Bug53367.java (added) +++ tomcat/trunk/modules/jdbc-pool/src/test/java/org/apache/tomcat/jdbc/bugs/Bug53367.java Thu Jul 24 13:42:38 2014 @@ -0,0 +1,163 @@ +package org.apache.tomcat.jdbc.bugs; + + + +import org.apache.tomcat.jdbc.test.DefaultProperties; +import org.junit.Assert; +import org.apache.tomcat.jdbc.pool.ConnectionPool; +import org.apache.tomcat.jdbc.pool.DataSource; +import org.apache.tomcat.jdbc.pool.PoolExhaustedException; +import org.apache.tomcat.jdbc.pool.PoolProperties; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ArrayBlockingQueue; + +@RunWith(Parameterized.class) +public class Bug53367 { + + private boolean fairQueue; + + public Bug53367(boolean fair) { + this.fairQueue = fair; + } + + @Parameterized.Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][]{ + new Object[] {Boolean.TRUE}, + new Object[] {Boolean.FALSE}, + }); + } + + @Test + public void testPool() throws SQLException, InterruptedException { + DriverManager.setLoginTimeout(1); + PoolProperties poolProperties = new DefaultProperties(); + int threadsCount = 3; + poolProperties.setMaxActive(threadsCount); + poolProperties.setMaxIdle(threadsCount); + poolProperties.setMinIdle(0); + poolProperties.setMaxWait(5000); + poolProperties.setInitialSize(0); + poolProperties.setRemoveAbandoned(true); + poolProperties.setRemoveAbandonedTimeout(300); + poolProperties.setRollbackOnReturn(true); + poolProperties.setFairQueue(fairQueue); + final DataSource ds = new DataSource(poolProperties); + + final CountDownLatch openedLatch = new CountDownLatch(threadsCount); + final CountDownLatch closedLatch = new CountDownLatch(threadsCount); + final CountDownLatch toCloseLatch = new CountDownLatch(1); + + for (int i = 0; i < threadsCount; i++) { + new Thread(new Runnable() { + @Override + public void run() { + try { + Connection connection = ds.getConnection(); + openedLatch.countDown(); + + toCloseLatch.await(); + connection.close(); + + closedLatch.countDown(); + + } catch (Exception e) { + System.err.println("Step 1:"+e.getMessage()); + } + } + }).start(); + } + + openedLatch.await(); + ConnectionPool pool = ds.getPool(); + //Now we have 3 initialized busy connections + Assert.assertEquals(0, pool.getIdle()); + Assert.assertEquals(threadsCount, pool.getActive()); + Assert.assertEquals(threadsCount, pool.getSize()); + + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < threadsCount; i++) { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + ds.getConnection(); + } catch (Exception e) { + System.err.println("Step 2:"+e.getMessage()); + } + } + }); + thread.start(); + threads.add(thread); + } + for (Thread thread : threads) { + thread.interrupt(); + } + for (Thread thread : threads) { + thread.join(); + } + //Still 3 active connections + Assert.assertEquals(0, pool.getIdle()); + Assert.assertEquals(threadsCount, pool.getActive()); + Assert.assertEquals(threadsCount, pool.getSize()); + + toCloseLatch.countDown(); + closedLatch.await(); + + //Here comes the bug! No more active connections and unable to establish new connections. + + Assert.assertEquals(threadsCount, pool.getIdle()); // <-- Should be threadsCount (3) here + Assert.assertEquals(0, pool.getActive()); + Assert.assertEquals(threadsCount, pool.getSize()); + + final AtomicInteger failedCount = new AtomicInteger(); + final ArrayBlockingQueue<Connection> cons = new ArrayBlockingQueue<Connection>(threadsCount); + threads.clear(); + for (int i = 0; i < threadsCount; i++) { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + cons.add(ds.getConnection()); + } catch (PoolExhaustedException e) { + failedCount.incrementAndGet(); + System.err.println("Step 3:"+e.getMessage()); + } catch (Exception e) { + System.err.println("Step 4:"+e.getMessage()); + throw new RuntimeException(e); + } + } + }); + thread.start(); + threads.add(thread); + } + + for (Thread thread : threads) { + thread.join(); + } + Assert.assertEquals(0, failedCount.get()); + + Assert.assertEquals(0, pool.getIdle()); + Assert.assertEquals(threadsCount, pool.getActive()); + Assert.assertEquals(threadsCount, pool.getSize()); + for (Connection con : cons) { + con.close(); + } + Assert.assertEquals(threadsCount, pool.getIdle()); + Assert.assertEquals(0, pool.getActive()); + Assert.assertEquals(threadsCount, pool.getSize()); + } +} \ No newline at end of file Propchange: tomcat/trunk/modules/jdbc-pool/src/test/java/org/apache/tomcat/jdbc/bugs/Bug53367.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org