Author: fhanik Date: Wed Aug 9 07:44:50 2006 New Revision: 430064 URL: http://svn.apache.org/viewvc?rev=430064&view=rev Log: Fixed deadlock issue with thread pool Fixed error catches for a known JDK bug on windows #5076772 Added in the ability to have more than one poller, although performance actually gets worse Next steps: hand off setting socket options etc to the worker thread for faster acceptance of new socket
Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=430064&r1=430063&r2=430064&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Wed Aug 9 07:44:50 2006 @@ -95,7 +95,6 @@ //readTimeout = -1; } inputBuffer = new InternalNioInputBuffer(request, headerBufferSize,readTimeout); - inputBuffer.setPoller(endpoint.getPoller()); request.setInputBuffer(inputBuffer); response = new Response(); @@ -752,7 +751,7 @@ if (request.getAttribute("org.apache.tomcat.comet") == null) { comet = false; } - SelectionKey key = socket.getIOChannel().keyFor(endpoint.getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key != null ) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); if ( attach!=null ) { @@ -778,10 +777,10 @@ return SocketState.CLOSED; } else if (!comet) { recycle(); - endpoint.getPoller().add(socket); + socket.getPoller().add(socket); return SocketState.OPEN; } else { - endpoint.getCometPoller().add(socket); + socket.getPoller().add(socket); return SocketState.LONG; } } @@ -809,7 +808,6 @@ this.socket = socket; inputBuffer.setSocket(socket); outputBuffer.setSocket(socket); - outputBuffer.setSelector(endpoint.getPoller().getSelector()); // Error flag error = false; @@ -841,7 +839,7 @@ // and the method should return true openSocket = true; // Add the socket to the poller - endpoint.getPoller().add(socket); + socket.getPoller().add(socket); break; } request.setStartTime(System.currentTimeMillis()); @@ -897,7 +895,7 @@ if (request.getAttribute("org.apache.tomcat.comet") != null) { comet = true; } - SelectionKey key = socket.getIOChannel().keyFor(endpoint.getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if (key != null) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); if (attach != null) { @@ -1049,7 +1047,7 @@ comet = false; cometClose = true; - SelectionKey key = socket.getIOChannel().keyFor(endpoint.getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key != null ) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); if ( attach!=null && attach.getComet()) { Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?rev=430064&r1=430063&r2=430064&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java Wed Aug 9 07:44:50 2006 @@ -223,6 +223,21 @@ // -------------------- Pool setup -------------------- + public void setPollerThreadCount(int count) { + ep.setPollerThreadCount(count); + } + + public int getPollerThreadCount() { + return ep.getPollerThreadCount(); + } + + public void setSelectorTimeout(long timeout) { + ep.setSelectorTimeout(timeout); + } + + public long getSelectorTimeout() { + return ep.getSelectorTimeout(); + } // * public Executor getExecutor() { return ep.getExecutor(); @@ -616,7 +631,7 @@ // processor. connections.put(socket, processor); localProcessor.set(null); - proto.ep.getCometPoller().add(socket); + socket.getPoller().add(socket); } return state; Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=430064&r1=430063&r2=430064&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Wed Aug 9 07:44:50 2006 @@ -182,7 +182,6 @@ * header. */ protected long readTimeout; - private Poller poller; // ------------------------------------------------------------- Properties @@ -202,10 +201,6 @@ return socket; } - public Poller getPoller() { - return poller; - } - /** * Add an input filter to the filter library. */ @@ -274,10 +269,6 @@ this.swallowInput = swallowInput; } - public void setPoller(Poller poller) { - this.poller = poller; - } - // --------------------------------------------------------- Public Methods @@ -564,7 +555,7 @@ timedOut = (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout); if ( !timedOut && nRead == 0 ) { try { - final SelectionKey key = socket.getIOChannel().keyFor(poller.getSelector()); + final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); final KeyAttachment att = (KeyAttachment)key.attachment(); //to do, add in a check, we might have just timed out on the wait, //so there is no need to register us again. @@ -587,7 +578,7 @@ private void addToReadQueue(final SelectionKey key, final KeyAttachment att) { att.setWakeUp(true); - poller.addEvent( + att.getPoller().addEvent( new Runnable() { public void run() { try { Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=430064&r1=430063&r2=430064&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Wed Aug 9 07:44:50 2006 @@ -49,8 +49,7 @@ // ----------------------------------------------------------- Constructors int bbufLimit = 0; - Selector selector; - + /** * Default constructor. */ @@ -182,10 +181,6 @@ this.socket = socket; } - public void setSelector(Selector selector) { - this.selector = selector; - } - /** * Get the underlying socket input stream. */ @@ -715,7 +710,7 @@ throws IOException { //prevent timeout for async, - SelectionKey key = socket.getIOChannel().keyFor(selector); + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if (key != null) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); attach.access(); Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java?rev=430064&r1=430063&r2=430064&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java Wed Aug 9 07:44:50 2006 @@ -20,7 +20,9 @@ import java.nio.channels.ByteChannel; import java.nio.channels.SocketChannel; +import org.apache.tomcat.util.net.NioEndpoint.Poller; import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; + /** * * Base class for a SocketChannel wrapper used by the endpoint. @@ -37,6 +39,8 @@ protected SocketChannel sc = null; protected ApplicationBufferHandler bufHandler; + + protected Poller poller; public NioChannel(SocketChannel channel, ApplicationBufferHandler bufHandler) throws IOException { this.sc = channel; @@ -112,6 +116,10 @@ return bufHandler; } + public Poller getPoller() { + return poller; + } + /** * getIOChannel * @@ -146,5 +154,8 @@ return 0; } + public void setPoller(Poller poller) { + this.poller = poller; + } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=430064&r1=430063&r2=430064&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed Aug 9 07:44:50 2006 @@ -42,6 +42,9 @@ import org.apache.tomcat.util.res.StringManager; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; /** * NIO tailored thread pool, providing the following services: @@ -316,7 +319,7 @@ */ protected Poller[] pollers = null; protected int pollerRoundRobin = 0; - public Poller getPoller() { + public Poller getPoller0() { pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; Poller poller = pollers[pollerRoundRobin]; return poller; @@ -326,8 +329,8 @@ /** * The socket poller used for Comet support. */ - public Poller getCometPoller() { - Poller poller = getPoller(); + public Poller getCometPoller0() { + Poller poller = getPoller0(); return poller; } @@ -335,13 +338,13 @@ /** * Dummy maxSpareThreads property. */ - public int getMaxSpareThreads() { return 0; } + public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); } /** * Dummy minSpareThreads property. */ - public int getMinSpareThreads() { return 0; } + public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); } // -------------------- SSL related properties -------------------- protected String keystoreFile = System.getProperty("user.home")+"/.keystore"; @@ -470,8 +473,8 @@ // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } - if (pollerThreadCount != 1) { - // limit to one poller, no need for others + if (pollerThreadCount <= 0) { + //minimum one poller thread pollerThreadCount = 1; } @@ -513,10 +516,12 @@ if (!running) { running = true; paused = false; - + + // Create worker collection if (executor == null) { workers = new WorkerStack(maxThreads); + //executor = new ThreadPoolExecutor(getMinSpareThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } // Start acceptor threads @@ -528,6 +533,7 @@ } // Start poller threads + log.info("Creating poller threads:"+pollerThreadCount); pollers = new Poller[pollerThreadCount]; for (int i = 0; i < pollerThreadCount; i++) { pollers[i] = new Poller(); @@ -678,7 +684,8 @@ channel = new NioChannel(socket,bufhandler); } - getPoller().register(channel); + + getPoller0().register(channel); } catch (Throwable t) { if (log.isDebugEnabled()) { @@ -746,12 +753,13 @@ while (workerThread == null) { try { synchronized (workers) { - workers.wait(); + workerThread = createWorkerThread(); + if ( workerThread == null ) workers.wait(); } } catch (InterruptedException e) { // Ignore } - workerThread = createWorkerThread(); + if ( workerThread == null ) workerThread = createWorkerThread(); } return workerThread; } @@ -974,11 +982,13 @@ public void register(final NioChannel socket) { + socket.setPoller(this); + final KeyAttachment ka = new KeyAttachment(this); + ka.setChannel(socket); Runnable r = new Runnable() { public void run() { try { - KeyAttachment ka = new KeyAttachment(); - ka.setChannel(socket); + socket.getIOChannel().register(selector, SelectionKey.OP_READ, ka); } catch (Exception x) { log.error("", x); @@ -1027,6 +1037,14 @@ try { wakeupCounter.set(0); keyCount = selector.select(selectorTimeout); + } catch ( NullPointerException x ) { + //sun bug 5076772 on windows JDK 1.5 + if ( wakeupCounter == null || selector == null ) throw x; + continue; + } catch ( CancelledKeyException x ) { + //sun bug 5076772 on windows JDK 1.5 + if ( wakeupCounter == null || selector == null ) throw x; + continue; } catch (Throwable x) { log.error("",x); continue; @@ -1045,11 +1063,9 @@ iterator.remove(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); try { - if ( sk.isValid() ) { - if(attachment == null) attachment = new KeyAttachment(); + if ( sk.isValid() && attachment != null ) { attachment.access(); sk.attach(attachment); - int readyOps = sk.readyOps(); sk.interestOps(0); attachment.interestOps(0); NioChannel channel = attachment.getChannel(); @@ -1121,7 +1137,12 @@ } public static class KeyAttachment { - + + public KeyAttachment(Poller poller) { + this.poller = poller; + } + public Poller getPoller() { return poller;} + public void setPoller(Poller poller){this.poller = poller;} public long getLastAccess() { return lastAccess; } public void access() { access(System.currentTimeMillis()); } public void access(long access) { lastAccess = access; } @@ -1138,6 +1159,7 @@ public void setError(boolean error) { this.error = error; } public NioChannel getChannel() { return channel;} public void setChannel(NioChannel channel) { this.channel = channel;} + protected Poller poller = null; protected int interestOps = 0; public int interestOps() { return interestOps;} public int interestOps(int ops) { this.interestOps = ops; return ops; } @@ -1254,7 +1276,7 @@ NioChannel socket = await(); if (socket == null) continue; - SelectionKey key = socket.getIOChannel().keyFor(getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); int handshake = -1; try { handshake = socket.handshake(key.isReadable(), key.isWritable()); @@ -1310,7 +1332,7 @@ } }; - getPoller().addEvent(r); + ka.getPoller().addEvent(r); } //dereference socket to let GC do its job socket = null; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]