Author: fhanik
Date: Wed Aug  9 07:44:50 2006
New Revision: 430064

URL: http://svn.apache.org/viewvc?rev=430064&view=rev
Log:
Fixed deadlock issue with thread pool
Fixed error handling for a known JDK bug on Windows (Sun bug #5076772)
Added the ability to have more than one poller, although performance 
actually gets worse
Next steps: hand off setting socket options, etc., to the worker thread for 
faster acceptance of new sockets

Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
    
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
    
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java 
Wed Aug  9 07:44:50 2006
@@ -95,7 +95,6 @@
             //readTimeout = -1;
         }
         inputBuffer = new InternalNioInputBuffer(request, 
headerBufferSize,readTimeout);
-        inputBuffer.setPoller(endpoint.getPoller());
         request.setInputBuffer(inputBuffer);
 
         response = new Response();
@@ -752,7 +751,7 @@
             if (request.getAttribute("org.apache.tomcat.comet") == null) {
                 comet = false;
             }
-            SelectionKey key = 
socket.getIOChannel().keyFor(endpoint.getPoller().getSelector());
+            SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
             if ( key != null ) {
                 NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) 
key.attachment();
                 if ( attach!=null ) {
@@ -778,10 +777,10 @@
             return SocketState.CLOSED;
         } else if (!comet) {
             recycle();
-            endpoint.getPoller().add(socket);
+            socket.getPoller().add(socket);
             return SocketState.OPEN;
         } else {
-            endpoint.getCometPoller().add(socket);
+            socket.getPoller().add(socket);
             return SocketState.LONG;
         }
     }
@@ -809,7 +808,6 @@
         this.socket = socket;
         inputBuffer.setSocket(socket);
         outputBuffer.setSocket(socket);
-        outputBuffer.setSelector(endpoint.getPoller().getSelector());
 
         // Error flag
         error = false;
@@ -841,7 +839,7 @@
                     // and the method should return true
                     openSocket = true;
                     // Add the socket to the poller
-                    endpoint.getPoller().add(socket);
+                    socket.getPoller().add(socket);
                     break;
                 }
                 request.setStartTime(System.currentTimeMillis());
@@ -897,7 +895,7 @@
                     if (request.getAttribute("org.apache.tomcat.comet") != 
null) {
                         comet = true;
                     }
-                    SelectionKey key = 
socket.getIOChannel().keyFor(endpoint.getPoller().getSelector());
+                    SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                     if (key != null) {
                         NioEndpoint.KeyAttachment attach = 
(NioEndpoint.KeyAttachment) key.attachment();
                         if (attach != null)  {
@@ -1049,7 +1047,7 @@
 
             comet = false;
             cometClose = true;
-            SelectionKey key = 
socket.getIOChannel().keyFor(endpoint.getPoller().getSelector());
+            SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
             if ( key != null ) {
                 NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) 
key.attachment();
                 if ( attach!=null && attach.getComet()) {

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java 
Wed Aug  9 07:44:50 2006
@@ -223,6 +223,21 @@
 
     // -------------------- Pool setup --------------------
 
+    public void setPollerThreadCount(int count) {
+        ep.setPollerThreadCount(count);
+    }
+    
+    public int getPollerThreadCount() {
+        return ep.getPollerThreadCount();
+    }
+    
+    public void setSelectorTimeout(long timeout) {
+        ep.setSelectorTimeout(timeout);
+    }
+    
+    public long getSelectorTimeout() {
+        return ep.getSelectorTimeout();
+    }
     // *
     public Executor getExecutor() {
         return ep.getExecutor();
@@ -616,7 +631,7 @@
                     // processor.
                     connections.put(socket, processor);
                     localProcessor.set(null);
-                    proto.ep.getCometPoller().add(socket);
+                    socket.getPoller().add(socket);
                 }
                 return state;
 

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
--- 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java 
(original)
+++ 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java 
Wed Aug  9 07:44:50 2006
@@ -182,7 +182,6 @@
      * header.
      */
     protected long readTimeout;
-    private Poller poller;
 
     // ------------------------------------------------------------- Properties
 
@@ -202,10 +201,6 @@
         return socket;
     }
 
-    public Poller getPoller() {
-        return poller;
-    }
-
     /**
      * Add an input filter to the filter library.
      */
@@ -274,10 +269,6 @@
         this.swallowInput = swallowInput;
     }
 
-    public void setPoller(Poller poller) {
-        this.poller = poller;
-    }
-
     // --------------------------------------------------------- Public Methods
 
 
@@ -564,7 +555,7 @@
                 timedOut = (readTimeout != -1) && 
((System.currentTimeMillis()-start)>readTimeout);
                 if ( !timedOut && nRead == 0 )  {
                     try {
-                        final SelectionKey key = 
socket.getIOChannel().keyFor(poller.getSelector());
+                        final SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                         final KeyAttachment att = 
(KeyAttachment)key.attachment();
                         //to do, add in a check, we might have just timed out 
on the wait,
                         //so there is no need to register us again.
@@ -587,7 +578,7 @@
 
     private void addToReadQueue(final SelectionKey key, final KeyAttachment 
att) {
         att.setWakeUp(true);
-        poller.addEvent(
+        att.getPoller().addEvent(
             new Runnable() {
             public void run() {
                 try {

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
--- 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 
(original)
+++ 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 
Wed Aug  9 07:44:50 2006
@@ -49,8 +49,7 @@
     // ----------------------------------------------------------- Constructors
     int bbufLimit = 0;
     
-    Selector selector;
-
+    
     /**
      * Default constructor.
      */
@@ -182,10 +181,6 @@
         this.socket = socket;
     }
 
-    public void setSelector(Selector selector) {
-        this.selector = selector;
-    }
-
     /**
      * Get the underlying socket input stream.
      */
@@ -715,7 +710,7 @@
         throws IOException {
 
         //prevent timeout for async,
-        SelectionKey key = socket.getIOChannel().keyFor(selector);
+        SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         if (key != null) {
             NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) 
key.attachment();
             attach.access();

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java Wed 
Aug  9 07:44:50 2006
@@ -20,7 +20,9 @@
 import java.nio.channels.ByteChannel;
 import java.nio.channels.SocketChannel;
 
+import org.apache.tomcat.util.net.NioEndpoint.Poller;
 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
+
 /**
  * 
  * Base class for a SocketChannel wrapper used by the endpoint.
@@ -37,6 +39,8 @@
     protected SocketChannel sc = null;
 
     protected ApplicationBufferHandler bufHandler;
+    
+    protected Poller poller;
 
     public NioChannel(SocketChannel channel, ApplicationBufferHandler 
bufHandler) throws IOException {
         this.sc = channel;
@@ -112,6 +116,10 @@
         return bufHandler;
     }
 
+    public Poller getPoller() {
+        return poller;
+    }
+
     /**
      * getIOChannel
      *
@@ -146,5 +154,8 @@
         return 0;
     }
 
+    public void setPoller(Poller poller) {
+        this.poller = poller;
+    }
 
 }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed 
Aug  9 07:44:50 2006
@@ -42,6 +42,9 @@
 import org.apache.tomcat.util.res.StringManager;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * NIO tailored thread pool, providing the following services:
@@ -316,7 +319,7 @@
      */
     protected Poller[] pollers = null;
     protected int pollerRoundRobin = 0;
-    public Poller getPoller() {
+    public Poller getPoller0() {
         pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
         Poller poller = pollers[pollerRoundRobin];
         return poller;
@@ -326,8 +329,8 @@
     /**
      * The socket poller used for Comet support.
      */
-    public Poller getCometPoller() {
-        Poller poller = getPoller();
+    public Poller getCometPoller0() {
+        Poller poller = getPoller0();
         return poller;
     }
 
@@ -335,13 +338,13 @@
     /**
      * Dummy maxSpareThreads property.
      */
-    public int getMaxSpareThreads() { return 0; }
+    public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); }
 
 
     /**
      * Dummy minSpareThreads property.
      */
-    public int getMinSpareThreads() { return 0; }
+    public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); }
 
     // --------------------  SSL related properties --------------------
     protected String keystoreFile = 
System.getProperty("user.home")+"/.keystore";
@@ -470,8 +473,8 @@
             // FIXME: Doesn't seem to work that well with multiple accept 
threads
             acceptorThreadCount = 1;
         }
-        if (pollerThreadCount != 1) {
-            // limit to one poller, no need for others
+        if (pollerThreadCount <= 0) {
+            //minimum one poller thread
             pollerThreadCount = 1;
         }
 
@@ -513,10 +516,12 @@
         if (!running) {
             running = true;
             paused = false;
-
+            
+            
             // Create worker collection
             if (executor == null) {
                 workers = new WorkerStack(maxThreads);
+                //executor = new 
ThreadPoolExecutor(getMinSpareThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new
 LinkedBlockingQueue<Runnable>());
             }
 
             // Start acceptor threads
@@ -528,6 +533,7 @@
             }
 
             // Start poller threads
+            log.info("Creating poller threads:"+pollerThreadCount);
             pollers = new Poller[pollerThreadCount];
             for (int i = 0; i < pollerThreadCount; i++) {
                 pollers[i] = new Poller();
@@ -678,7 +684,8 @@
                 channel = new NioChannel(socket,bufhandler);
             }
             
-            getPoller().register(channel);
+            
+            getPoller0().register(channel);
 
         } catch (Throwable t) {
             if (log.isDebugEnabled()) {
@@ -746,12 +753,13 @@
         while (workerThread == null) {
             try {
                 synchronized (workers) {
-                    workers.wait();
+                    workerThread = createWorkerThread();
+                    if ( workerThread == null ) workers.wait();
                 }
             } catch (InterruptedException e) {
                 // Ignore
             }
-            workerThread = createWorkerThread();
+            if ( workerThread == null ) workerThread = createWorkerThread();
         }
         return workerThread;
     }
@@ -974,11 +982,13 @@
         
         public void register(final NioChannel socket)
         {
+            socket.setPoller(this);
+            final KeyAttachment ka = new KeyAttachment(this);
+            ka.setChannel(socket);
             Runnable r = new Runnable() {
                 public void run() {
                     try {
-                        KeyAttachment ka = new KeyAttachment();
-                        ka.setChannel(socket);
+                        
                         socket.getIOChannel().register(selector, 
SelectionKey.OP_READ, ka);
                     } catch (Exception x) {
                         log.error("", x);
@@ -1027,6 +1037,14 @@
                 try {
                     wakeupCounter.set(0);
                     keyCount = selector.select(selectorTimeout);
+                } catch ( NullPointerException x ) {
+                    //sun bug 5076772 on windows JDK 1.5
+                    if ( wakeupCounter == null || selector == null ) throw x;
+                    continue;
+                } catch ( CancelledKeyException x ) {
+                    //sun bug 5076772 on windows JDK 1.5
+                    if ( wakeupCounter == null || selector == null ) throw x;
+                    continue;
                 } catch (Throwable x) {
                     log.error("",x);
                     continue;
@@ -1045,11 +1063,9 @@
                     iterator.remove();
                     KeyAttachment attachment = (KeyAttachment)sk.attachment();
                     try {
-                        if ( sk.isValid() ) {
-                            if(attachment == null) attachment = new 
KeyAttachment();
+                        if ( sk.isValid() && attachment != null ) {
                             attachment.access();
                             sk.attach(attachment);
-                            int readyOps = sk.readyOps();
                             sk.interestOps(0);
                             attachment.interestOps(0);
                             NioChannel channel = attachment.getChannel();
@@ -1121,7 +1137,12 @@
     }
     
     public static class KeyAttachment {
-
+        
+        public KeyAttachment(Poller poller) {
+            this.poller = poller;
+        }
+        public Poller getPoller() { return poller;}
+        public void setPoller(Poller poller){this.poller = poller;}
         public long getLastAccess() { return lastAccess; }
         public void access() { access(System.currentTimeMillis()); }
         public void access(long access) { lastAccess = access; }
@@ -1138,6 +1159,7 @@
         public void setError(boolean error) { this.error = error; }
         public NioChannel getChannel() { return channel;}
         public void setChannel(NioChannel channel) { this.channel = channel;}
+        protected Poller poller = null;
         protected int interestOps = 0;
         public int interestOps() { return interestOps;}
         public int interestOps(int ops) { this.interestOps  = ops; return ops; 
}
@@ -1254,7 +1276,7 @@
                 NioChannel socket = await();
                 if (socket == null)
                     continue;
-                SelectionKey key = 
socket.getIOChannel().keyFor(getPoller().getSelector());
+                SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                 int handshake = -1;
                 try {
                     handshake = socket.handshake(key.isReadable(), 
key.isWritable());
@@ -1310,7 +1332,7 @@
 
                         }
                     };
-                    getPoller().addEvent(r);
+                    ka.getPoller().addEvent(r);
                 }
                 //dereference socket to let GC do its job
                 socket = null;



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to