Author: fhanik Date: Tue Oct 28 12:58:43 2008 New Revision: 708652 URL: http://svn.apache.org/viewvc?rev=708652&view=rev Log: Added a different fairness option, since ArrayBlockingQueue with fair=true is really bad for performance
Added: tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java Modified: tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/CheckOutThreadTest.java tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestTimeout.java Modified: tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java?rev=708652&r1=708651&r2=708652&view=diff ============================================================================== --- tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java (original) +++ tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java Tue Oct 28 12:58:43 2008 @@ -64,6 +64,7 @@ /** * Contains all the connections that are in use + * TODO - this shouldn't be a blocking queue, simply a list to hold our objects */ protected BlockingQueue<PooledConnection> busy; @@ -264,8 +265,13 @@ poolProperties = properties; //make space for 10 extra in case we flow over a bit busy = new ArrayBlockingQueue<PooledConnection>(properties.getMaxActive(),false); + //busy = new FairBlockingQueue<PooledConnection>(); //make space for 10 extra in case we flow over a bit - idle = new ArrayBlockingQueue<PooledConnection>(properties.getMaxActive(),properties.isFairQueue()); + if (properties.isFairQueue()) { + idle = new FairBlockingQueue<PooledConnection>(); + } else { + idle = new ArrayBlockingQueue<PooledConnection>(properties.getMaxActive(),properties.isFairQueue()); + } //if the evictor thread is supposed to run, start it now if (properties.isPoolSweeperEnabled()) { Added: tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java?rev=708652&view=auto ============================================================================== --- tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java (added) +++ tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java Tue Oct 28 12:58:43 2008 @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.jdbc.pool; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * + * A simple implementation of a blocking queue with fairness waiting. + * invocations to method poll(...) will get handed out in the order they were received. + * @author Filip Hanik + * + */ + +public class FairBlockingQueue<E> implements BlockingQueue<E> { + ReentrantLock lock = new ReentrantLock(); + + LinkedList<E> items = null; + + LinkedList<ExchangeCountDownLatch<E>> waiters = null; + + public FairBlockingQueue() { + items = new LinkedList<E>(); + waiters = new LinkedList<ExchangeCountDownLatch<E>>(); + } + + //------------------------------------------------------------------ + // USED BY CONPOOL IMPLEMENTATION + //------------------------------------------------------------------ + public boolean offer(E e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (waiters.size() > 0) { + ExchangeCountDownLatch<E> c = waiters.poll(); + c.setItem(e); + c.countDown(); + } else { + items.add(e); + } + } finally { + lock.unlock(); + } + return true; + } + + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + return offer(e); + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + E result = null; + final ReentrantLock lock = this.lock; + boolean error = true; + lock.lock(); + try { + result = items.poll(); + if (result==null) { + ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<E>(1); + waiters.addLast(c); + lock.unlock(); + if (!c.await(timeout, unit)) { + lock.lock(); + waiters.remove(c); + lock.unlock(); + } + result = c.getItem(); + } else { + lock.unlock(); + } + error = false; + } finally { + if (error && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + return result; + } + + public boolean remove(Object e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return items.remove(e); + } finally { + lock.unlock(); + } + } + + public int size() { + return items.size(); + } + + public Iterator<E> iterator() { + return new FairIterator(); + } + + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return items.poll(); + } finally { + lock.unlock(); + } + } + + public boolean contains(Object e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return items.contains(e); + } finally { + lock.unlock(); + } + } + + + //------------------------------------------------------------------ + // NOT USED BY CONPOOL IMPLEMENTATION + //------------------------------------------------------------------ + + @Override + public boolean add(E e) { + return offer(e); + } + + @Override + public int drainTo(Collection<? super E> c, int maxElements) { + throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)"); + } + + @Override + public int drainTo(Collection<? super E> c) { + return drainTo(c,Integer.MAX_VALUE); + } + + @Override + public void put(E e) throws InterruptedException { + offer(e); + } + + @Override + public int remainingCapacity() { + return Integer.MAX_VALUE - size(); + } + + @Override + public E take() throws InterruptedException { + return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } + + @Override + public boolean addAll(Collection<? extends E> c) { + Iterator i = c.iterator(); + while (i.hasNext()) { + E e = (E)i.next(); + offer(e); + } + return true; + } + + @Override + public void clear() { + throw new UnsupportedOperationException("void clear()"); + + } + + @Override + public boolean containsAll(Collection<?> c) { + throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)"); + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public boolean removeAll(Collection<?> c) { + throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)"); + } + + @Override + public boolean retainAll(Collection<?> c) { + throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)"); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException("Object[] toArray()"); + } + + @Override + public <T> T[] toArray(T[] a) { + throw new UnsupportedOperationException("<T> T[] toArray(T[] a)"); + } + + @Override + public E element() { + throw new UnsupportedOperationException("E element()"); + } + + @Override + public E peek() { + throw new UnsupportedOperationException("E peek()"); + } + + @Override + public E remove() { + throw new UnsupportedOperationException("E remove()"); + } + + + + + + //------------------------------------------------------------------ + // Count down latch that can be used to exchange information + //------------------------------------------------------------------ + protected class ExchangeCountDownLatch<T> extends CountDownLatch { + protected T item; + public ExchangeCountDownLatch(int i) { + super(i); + } + public T getItem() { + return item; + } + public void setItem(T item) { + this.item = item; + } + } + + //------------------------------------------------------------------ + // Iterator safe from concurrent modification exceptions + //------------------------------------------------------------------ + protected class FairIterator implements Iterator<E> { + E[] elements = null; + int index; + E element = null; + + public FairIterator() { + final ReentrantLock lock = FairBlockingQueue.this.lock; + lock.lock(); + try { + elements = (E[]) new Object[FairBlockingQueue.this.items.size()]; + FairBlockingQueue.this.items.toArray(elements); + index = 0; + } finally { + lock.unlock(); + } + } + @Override + public boolean hasNext() { + return index<elements.length; + } + + @Override + public E next() { + element = elements[index++]; + return element; + } + + @Override + public void remove() { + final ReentrantLock lock = FairBlockingQueue.this.lock; + lock.lock(); + try { + if (element!=null) { + FairBlockingQueue.this.items.remove(element); + } + } finally { + lock.unlock(); + } + } + + } +} Modified: tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/CheckOutThreadTest.java URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/CheckOutThreadTest.java?rev=708652&r1=708651&r2=708652&view=diff ============================================================================== --- tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/CheckOutThreadTest.java (original) +++ tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/CheckOutThreadTest.java Tue Oct 28 12:58:43 2008 @@ -117,6 +117,26 @@ tearDown(); } + public void testPoolThreads20Connections10Fair() throws Exception { + init(); + this.datasource.getPoolProperties().setMaxActive(10); + this.datasource.getPoolProperties().setFairQueue(true); + this.threadcount = 20; + this.transferProperties(); + this.datasource.getConnection().close(); + latch = new CountDownLatch(threadcount); + long start = System.currentTimeMillis(); + for (int i=0; i<threadcount; i++) { + TestThread t = new TestThread(); + t.setName("tomcat-pool-"+i); + t.d = DataSourceFactory.getDataSource(this.datasource); + t.start(); + } + latch.await(); + long delta = System.currentTimeMillis() - start; + System.out.println("[testPoolThreads20Connections10Fair]Test complete:"+delta+" ms. Iterations:"+(threadcount*this.iterations)); + tearDown(); + } public void testDBCPThreads10Connections10Validate() throws Exception { Modified: tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestTimeout.java URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestTimeout.java?rev=708652&r1=708651&r2=708652&view=diff ============================================================================== --- tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestTimeout.java (original) +++ tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestTimeout.java Tue Oct 28 12:58:43 2008 @@ -58,6 +58,37 @@ } } + public void testCheckoutTimeoutFair() throws Exception { + try { + init(); + this.datasource.getPoolProperties().setFairQueue(true); + this.datasource.getPoolProperties().setTestWhileIdle(true); + this.datasource.getPoolProperties().setTestOnBorrow(false); + this.datasource.getPoolProperties().setTestOnReturn(false); + this.datasource.getPoolProperties().setValidationInterval(30000); + this.datasource.getPoolProperties().setTimeBetweenEvictionRunsMillis(1000); + this.datasource.getPoolProperties().setMaxActive(20); + this.datasource.getPoolProperties().setMaxWait(3000); + this.datasource.getPoolProperties().setRemoveAbandonedTimeout(5000); + this.datasource.getPoolProperties().setMinEvictableIdleTimeMillis(5000); + this.datasource.getPoolProperties().setMinIdle(5); + this.datasource.getPoolProperties().setLogAbandoned(true); + System.out.println("About to test connection pool:"+datasource); + for (int i = 0; i < 21; i++) { + long now = System.currentTimeMillis(); + this.datasource.getConnection(); + long delta = System.currentTimeMillis()-now; + System.out.println("Got connection #"+i+" in "+delta+" ms."); + } + } catch ( Exception x ) { + x.printStackTrace(); + }finally { + Thread.sleep(20000); + tearDown(); + } + } + + public void testRemoveAbandoned() throws Exception { try { init(); @@ -87,6 +118,37 @@ tearDown(); } } + + public void testRemoveAbandonedFair() throws Exception { + try { + init(); + this.datasource.getPoolProperties().setFairQueue(true); + this.datasource.getPoolProperties().setTestWhileIdle(true); + this.datasource.getPoolProperties().setTestOnBorrow(false); + this.datasource.getPoolProperties().setTestOnReturn(false); + this.datasource.getPoolProperties().setValidationInterval(30000); + this.datasource.getPoolProperties().setTimeBetweenEvictionRunsMillis(1000); + this.datasource.getPoolProperties().setMaxActive(20); + this.datasource.getPoolProperties().setMaxWait(3000); + this.datasource.getPoolProperties().setRemoveAbandonedTimeout(5000); + this.datasource.getPoolProperties().setMinEvictableIdleTimeMillis(5000); + this.datasource.getPoolProperties().setMinIdle(5); + this.datasource.getPoolProperties().setRemoveAbandoned(true); + this.datasource.getPoolProperties().setLogAbandoned(true); + System.out.println("About to test connection pool:"+datasource); + for (int i = 0; i < threadcount; i++) { + long now = System.currentTimeMillis(); + this.datasource.getConnection(); + long delta = System.currentTimeMillis()-now; + System.out.println("Got connection #"+i+" in "+delta+" ms."); + } + } catch ( Exception x ) { + x.printStackTrace(); + }finally { + Thread.sleep(20000); + tearDown(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]