Author: fhanik
Date: Tue Oct 28 12:58:43 2008
New Revision: 708652

URL: http://svn.apache.org/viewvc?rev=708652&view=rev
Log:
Added a different fairness option, since ArrayBlockingQueue with fair=true is 
really bad for performance

Added:
    
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
Modified:
    
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java
    
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/CheckOutThreadTest.java
    
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestTimeout.java

Modified: 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java?rev=708652&r1=708651&r2=708652&view=diff
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java
 (original)
+++ 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java
 Tue Oct 28 12:58:43 2008
@@ -64,6 +64,7 @@
 
     /**
      * Contains all the connections that are in use
+     * TODO - this shouldn't be a blocking queue, simply a list to hold our 
objects
      */
     protected BlockingQueue<PooledConnection> busy;
 
@@ -264,8 +265,13 @@
         poolProperties = properties;
         //make space for 10 extra in case we flow over a bit
         busy = new 
ArrayBlockingQueue<PooledConnection>(properties.getMaxActive(),false);
+        //busy = new FairBlockingQueue<PooledConnection>();
         //make space for 10 extra in case we flow over a bit
-        idle = new 
ArrayBlockingQueue<PooledConnection>(properties.getMaxActive(),properties.isFairQueue());
+        if (properties.isFairQueue()) {
+            idle = new FairBlockingQueue<PooledConnection>();
+        } else {
+            idle = new 
ArrayBlockingQueue<PooledConnection>(properties.getMaxActive(),properties.isFairQueue());
+        }
 
         //if the evictor thread is supposed to run, start it now
         if (properties.isPoolSweeperEnabled()) {

Added: 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java?rev=708652&view=auto
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
 (added)
+++ 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
 Tue Oct 28 12:58:43 2008
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.jdbc.pool;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A simple, unbounded implementation of a blocking queue with fairness waiting.
+ * Invocations of {@link #poll(long, TimeUnit)} that have to wait are served in the
+ * order in which they were received: {@link #offer(Object)} hands a new element
+ * directly to the longest-waiting caller instead of placing it in the queue.
+ * <p>
+ * All access to the internal lists is guarded by a single {@link ReentrantLock}.
+ * Only the operations needed by the connection pool are implemented; several
+ * collection operations throw {@link UnsupportedOperationException}.
+ * <p>
+ * A {@code null} return value is used to signal "nothing available", so
+ * {@code null} elements cannot be told apart from an empty queue or a timeout.
+ *
+ * @param <E> the type of elements held in this queue
+ * @author Filip Hanik
+ */
+
+public class FairBlockingQueue<E> implements BlockingQueue<E> {
+    /** Guards every read and write of {@link #items} and {@link #waiters}. */
+    final ReentrantLock lock = new ReentrantLock();
+
+    /** Elements that are available and have not been handed to a consumer. */
+    final LinkedList<E> items;
+
+    /** Callers currently blocked in {@link #poll(long, TimeUnit)}, oldest first. */
+    final LinkedList<ExchangeCountDownLatch<E>> waiters;
+
+    /**
+     * Creates an empty queue with no waiting consumers.
+     */
+    public FairBlockingQueue() {
+        items = new LinkedList<E>();
+        waiters = new LinkedList<ExchangeCountDownLatch<E>>();
+    }
+
+    //------------------------------------------------------------------
+    // USED BY CONPOOL IMPLEMENTATION
+    //------------------------------------------------------------------
+    /**
+     * Makes an element available. If a caller is waiting in
+     * {@link #poll(long, TimeUnit)}, the element is handed to the oldest waiter
+     * and that waiter is released; otherwise the element is appended to the queue.
+     *
+     * @param e the element to add
+     * @return always {@code true}, the queue is unbounded
+     */
+    public boolean offer(E e) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            ExchangeCountDownLatch<E> waiter = waiters.poll();
+            if (waiter != null) {
+                // The item must be published before the latch is released.
+                waiter.setItem(e);
+                waiter.countDown();
+            } else {
+                items.add(e);
+            }
+        } finally {
+            lock.unlock();
+        }
+        return true;
+    }
+
+    /**
+     * Same as {@link #offer(Object)}; the queue is unbounded so the timeout is
+     * never needed.
+     *
+     * @param e the element to add
+     * @param timeout ignored
+     * @param unit ignored
+     * @return always {@code true}
+     * @throws InterruptedException never thrown by this implementation
+     */
+    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
+        return offer(e);
+    }
+
+    /**
+     * Retrieves the head of the queue, waiting up to the given time for an
+     * element to be offered when the queue is empty. Waiting callers are served
+     * in arrival order.
+     *
+     * @param timeout how long to wait before giving up, in units of {@code unit}
+     * @param unit the time unit of {@code timeout}
+     * @return the element, or {@code null} if the wait timed out
+     * @throws InterruptedException if the calling thread is interrupted while
+     *         waiting and no element was handed to it
+     */
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        final ReentrantLock lock = this.lock;
+        ExchangeCountDownLatch<E> waiter;
+        lock.lock();
+        try {
+            E available = items.poll();
+            if (available != null) {
+                return available;
+            }
+            // Nothing available: register as a waiter while still holding the lock
+            // so that no offer() can slip in between the check and the registration.
+            waiter = new ExchangeCountDownLatch<E>(1);
+            waiters.addLast(waiter);
+        } finally {
+            lock.unlock();
+        }
+
+        boolean released = false;
+        InterruptedException interrupted = null;
+        try {
+            released = waiter.await(timeout, unit);
+        } catch (InterruptedException ix) {
+            interrupted = ix;
+        }
+        if (!released) {
+            // Timed out or interrupted: deregister, otherwise a later offer()
+            // would hand its element to a waiter nobody is listening on and the
+            // element would be lost.
+            lock.lock();
+            try {
+                waiters.remove(waiter);
+            } finally {
+                lock.unlock();
+            }
+        }
+        // An offer() may have delivered an element just before the waiter was
+        // deregistered; reading after the lock round-trip makes that visible.
+        E result = waiter.getItem();
+        if (interrupted != null) {
+            if (result == null) {
+                throw interrupted;
+            }
+            // An element was delivered anyway: return it rather than drop it,
+            // and keep the interrupt status for the caller to observe.
+            Thread.currentThread().interrupt();
+        }
+        return result;
+    }
+
+    /**
+     * Removes a single instance of the given element from the available items.
+     *
+     * @param e the element to remove
+     * @return {@code true} if an element was removed
+     */
+    public boolean remove(Object e) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return items.remove(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the number of elements currently available in the queue.
+     *
+     * @return the number of available elements
+     */
+    public int size() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return items.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an iterator over a snapshot of the available elements taken at the
+     * time of this call.
+     *
+     * @return an iterator that does not fail on concurrent modification
+     */
+    public Iterator<E> iterator() {
+        return new FairIterator();
+    }
+
+    /**
+     * Retrieves and removes the head of the queue without waiting.
+     *
+     * @return the head element, or {@code null} if the queue is empty
+     */
+    public E poll() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return items.poll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Tells whether the given element is among the available items.
+     *
+     * @param e the element to look for
+     * @return {@code true} if the element is present
+     */
+    public boolean contains(Object e) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return items.contains(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    //------------------------------------------------------------------
+    // NOT USED BY CONPOOL IMPLEMENTATION
+    //------------------------------------------------------------------
+
+    /** Delegates to {@link #offer(Object)}. */
+    @Override
+    public boolean add(E e) {
+        return offer(e);
+    }
+
+    /** Not supported. */
+    @Override
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)");
+    }
+
+    /** Not supported; delegates to {@link #drainTo(Collection, int)}. */
+    @Override
+    public int drainTo(Collection<? super E> c) {
+        return drainTo(c,Integer.MAX_VALUE);
+    }
+
+    /** Delegates to {@link #offer(Object)}; never blocks. */
+    @Override
+    public void put(E e) throws InterruptedException {
+        offer(e);
+    }
+
+    /** Returns {@code Integer.MAX_VALUE} minus the current size. */
+    @Override
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE - size();
+    }
+
+    /** Waits for an element using {@link #poll(long, TimeUnit)} with the largest possible timeout. */
+    @Override
+    public E take() throws InterruptedException {
+        return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Offers every element of the given collection in iteration order.
+     *
+     * @param c the elements to add
+     * @return {@code true} if at least one element was offered
+     */
+    @Override
+    public boolean addAll(Collection<? extends E> c) {
+        boolean changed = false;
+        for (E e : c) {
+            changed |= offer(e);
+        }
+        return changed;
+    }
+
+    /** Not supported. */
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException("void clear()");
+
+    }
+
+    /** Not supported. */
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)");
+    }
+
+    /** Returns {@code true} when {@link #size()} is zero. */
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    /** Not supported. */
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)");
+    }
+
+    /** Not supported. */
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)");
+    }
+
+    /** Not supported. */
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException("Object[] toArray()");
+    }
+
+    /** Not supported. */
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
+    }
+
+    /** Not supported. */
+    @Override
+    public E element() {
+        throw new UnsupportedOperationException("E element()");
+    }
+
+    /** Not supported. */
+    @Override
+    public E peek() {
+        throw new UnsupportedOperationException("E peek()");
+    }
+
+    /** Not supported. */
+    @Override
+    public E remove() {
+        throw new UnsupportedOperationException("E remove()");
+    }
+
+
+
+
+
+    //------------------------------------------------------------------
+    // Count down latch that can be used to exchange information
+    //------------------------------------------------------------------
+    /**
+     * A latch that also carries one item from the thread releasing it to the
+     * thread waiting on it. The item is written before {@code countDown()} and
+     * read after {@code await(...)} returns or after re-acquiring the queue lock.
+     */
+    protected class ExchangeCountDownLatch<T> extends CountDownLatch {
+        protected T item;
+        public ExchangeCountDownLatch(int i) {
+            super(i);
+        }
+        public T getItem() {
+            return item;
+        }
+        public void setItem(T item) {
+            this.item = item;
+        }
+    }
+
+    //------------------------------------------------------------------
+    // Iterator safe from concurrent modification exceptions
+    //------------------------------------------------------------------
+    /**
+     * Iterates over a copy of the available items made under the queue lock
+     * when the iterator is created. Later changes to the queue are not
+     * reflected, and the iterator itself is not meant to be shared by threads.
+     */
+    protected class FairIterator implements Iterator<E> {
+        E[] elements = null;
+        int index;
+        E element = null;
+
+        // The array only ever holds elements copied out of a LinkedList<E>.
+        @SuppressWarnings("unchecked")
+        public FairIterator() {
+            final ReentrantLock lock = FairBlockingQueue.this.lock;
+            lock.lock();
+            try {
+                elements = (E[]) new Object[FairBlockingQueue.this.items.size()];
+                FairBlockingQueue.this.items.toArray(elements);
+                index = 0;
+            } finally {
+                lock.unlock();
+            }
+        }
+        @Override
+        public boolean hasNext() {
+            return index<elements.length;
+        }
+
+        @Override
+        public E next() {
+            if (index >= elements.length) {
+                throw new java.util.NoSuchElementException();
+            }
+            element = elements[index++];
+            return element;
+        }
+
+        /**
+         * Removes the element last returned by {@link #next()} from the queue,
+         * if it is still there. Does nothing when there is no such element.
+         */
+        @Override
+        public void remove() {
+            final ReentrantLock lock = FairBlockingQueue.this.lock;
+            lock.lock();
+            try {
+                if (element!=null) {
+                    FairBlockingQueue.this.items.remove(element);
+                    // Prevent a second call from removing another equal element.
+                    element = null;
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+
+    }
+}

Modified: 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/CheckOutThreadTest.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/CheckOutThreadTest.java?rev=708652&r1=708651&r2=708652&view=diff
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/CheckOutThreadTest.java
 (original)
+++ 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/CheckOutThreadTest.java
 Tue Oct 28 12:58:43 2008
@@ -117,6 +117,26 @@
         tearDown();
     }
 
+    /**
+     * Times the checkout workload against a pool with the fair queue enabled,
+     * using 20 threads and a maximum of 10 active connections, and prints the
+     * elapsed wall-clock time.
+     *
+     * @throws Exception if set-up, starting the threads or waiting on the latch fails
+     */
+    public void testPoolThreads20Connections10Fair() throws Exception {
+        init();
+        this.datasource.getPoolProperties().setMaxActive(10);
+        this.datasource.getPoolProperties().setFairQueue(true);
+        this.threadcount = 20;
+        this.transferProperties();
+        // Check out and return one connection before the clock starts
+        // (presumably to initialise the pool outside the measurement - confirm).
+        this.datasource.getConnection().close();
+        // Latch sized to the thread count; presumably counted down by each
+        // TestThread when it finishes - verify against TestThread.
+        latch = new CountDownLatch(threadcount);
+        long start = System.currentTimeMillis();
+        for (int i=0; i<threadcount; i++) {
+            TestThread t = new TestThread();
+            t.setName("tomcat-pool-"+i);
+            t.d = DataSourceFactory.getDataSource(this.datasource);
+            t.start();
+        }
+        latch.await();
+        long delta = System.currentTimeMillis() - start;
+        System.out.println("[testPoolThreads20Connections10Fair]Test complete:"+delta+" ms. Iterations:"+(threadcount*this.iterations));
+        tearDown();
+    }
 
     
     public void testDBCPThreads10Connections10Validate() throws Exception {

Modified: 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestTimeout.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestTimeout.java?rev=708652&r1=708651&r2=708652&view=diff
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestTimeout.java
 (original)
+++ 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestTimeout.java
 Tue Oct 28 12:58:43 2008
@@ -58,6 +58,37 @@
         }
     }
 
+    /**
+     * Requests 21 connections, without closing any, from a fair-queue pool
+     * limited to 20 active connections with a max wait of 3000, and prints how
+     * long each request took. Any exception is printed rather than rethrown, so
+     * this method itself asserts nothing; the output has to be inspected.
+     *
+     * @throws Exception if the final sleep is interrupted or tear-down fails
+     */
+    public void testCheckoutTimeoutFair() throws Exception {
+        try {
+            init();
+            this.datasource.getPoolProperties().setFairQueue(true);
+            this.datasource.getPoolProperties().setTestWhileIdle(true);
+            this.datasource.getPoolProperties().setTestOnBorrow(false);
+            this.datasource.getPoolProperties().setTestOnReturn(false);
+            this.datasource.getPoolProperties().setValidationInterval(30000);
+            this.datasource.getPoolProperties().setTimeBetweenEvictionRunsMillis(1000);
+            this.datasource.getPoolProperties().setMaxActive(20);
+            this.datasource.getPoolProperties().setMaxWait(3000);
+            this.datasource.getPoolProperties().setRemoveAbandonedTimeout(5000);
+            this.datasource.getPoolProperties().setMinEvictableIdleTimeMillis(5000);
+            this.datasource.getPoolProperties().setMinIdle(5);
+            this.datasource.getPoolProperties().setLogAbandoned(true);
+            System.out.println("About to test connection pool:"+datasource);
+            // One request more than maxActive; presumably the last one is
+            // expected to wait for maxWait and then fail - confirm.
+            for (int i = 0; i < 21; i++) {
+                long now = System.currentTimeMillis();
+                this.datasource.getConnection();
+                long delta = System.currentTimeMillis()-now;
+                System.out.println("Got connection #"+i+" in "+delta+" ms.");
+            }
+        } catch ( Exception x ) {
+            x.printStackTrace();
+        }finally {
+            // Keep the pool alive for 20 seconds before tearing it down.
+            Thread.sleep(20000);
+            tearDown();
+        }
+    }
+    
+
     public void testRemoveAbandoned() throws Exception {
         try {
             init();
@@ -87,6 +118,37 @@
             tearDown();
         }
     }
+    
+    /**
+     * Requests {@code threadcount} connections, without closing any, from a
+     * fair-queue pool that has abandoned-connection removal and logging switched
+     * on with a timeout setting of 5000, and prints how long each request took.
+     * Any exception is printed rather than rethrown, so this method itself
+     * asserts nothing; the output has to be inspected.
+     *
+     * @throws Exception if the final sleep is interrupted or tear-down fails
+     */
+    public void testRemoveAbandonedFair() throws Exception {
+        try {
+            init();
+            this.datasource.getPoolProperties().setFairQueue(true);
+            this.datasource.getPoolProperties().setTestWhileIdle(true);
+            this.datasource.getPoolProperties().setTestOnBorrow(false);
+            this.datasource.getPoolProperties().setTestOnReturn(false);
+            this.datasource.getPoolProperties().setValidationInterval(30000);
+            this.datasource.getPoolProperties().setTimeBetweenEvictionRunsMillis(1000);
+            this.datasource.getPoolProperties().setMaxActive(20);
+            this.datasource.getPoolProperties().setMaxWait(3000);
+            this.datasource.getPoolProperties().setRemoveAbandonedTimeout(5000);
+            this.datasource.getPoolProperties().setMinEvictableIdleTimeMillis(5000);
+            this.datasource.getPoolProperties().setMinIdle(5);
+            this.datasource.getPoolProperties().setRemoveAbandoned(true);
+            this.datasource.getPoolProperties().setLogAbandoned(true);
+            System.out.println("About to test connection pool:"+datasource);
+            // Connections are deliberately never closed here; presumably the
+            // pool is expected to reclaim them as abandoned - confirm.
+            for (int i = 0; i < threadcount; i++) {
+                long now = System.currentTimeMillis();
+                this.datasource.getConnection();
+                long delta = System.currentTimeMillis()-now;
+                System.out.println("Got connection #"+i+" in "+delta+" ms.");
+            }
+        } catch ( Exception x ) {
+            x.printStackTrace();
+        }finally {
+            // Keep the pool alive for 20 seconds before tearing it down.
+            Thread.sleep(20000);
+            tearDown();
+        }
+    }
 
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to