Author: elecharny Date: Thu Jul 30 08:22:29 2009 New Revision: 799199 URL: http://svn.apache.org/viewvc?rev=799199&view=rev Log: o Added a Logger o Added some Javadoc, renamed a few parameters and added comments o Fixed the updateTrafficMask method: if a single session was in the OPENING state, the loop exited immediately, potentially leaving other sessions pending in the queue o Added a check for a broken session in the main loop of the processor, to avoid needlessly creating a new selector in that case
Modified: mina/branches/select-fix/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java mina/branches/select-fix/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java mina/branches/select-fix/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java Modified: mina/branches/select-fix/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java URL: http://svn.apache.org/viewvc/mina/branches/select-fix/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=799199&r1=799198&r2=799199&view=diff ============================================================================== --- mina/branches/select-fix/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original) +++ mina/branches/select-fix/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Thu Jul 30 08:22:29 2009 @@ -48,6 +48,8 @@ import org.apache.mina.transport.socket.AbstractDatagramSessionConfig; import org.apache.mina.util.ExceptionMonitor; import org.apache.mina.util.NamePreservingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An abstract implementation of {@link IoProcessor} which helps @@ -59,6 +61,9 @@ */ public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> { + /** A logger for this class */ + private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class); + /** * The maximum loop count for a write operation until * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value. 
@@ -271,7 +276,7 @@ * @param session the session registered * @param interested true for registering, false for removing */ - protected abstract void setInterestedInRead(T session, boolean interested) + protected abstract void setInterestedInRead(T session, boolean isInterested) throws Exception; /** @@ -416,9 +421,22 @@ * In the case we are using the java select() method, this method is * used to trash the buggy selector and create a new one, registring * all the sockets on it. + * @throws IOException If we got an exception */ abstract protected void registerNewSelector() throws IOException; + + /** + * Check that the select() has not exited immediately just because of + * a broken connection. In this case, this is a standard case, and we + * just have to loop. + * + * @return true if a connection has been brutally closed. + * @throws IOException If we got an exception + */ + abstract protected boolean isBrokenConnection() throws IOException; + + /** * Loops over the new sessions blocking queue and returns * the number of sessions which are effectively created @@ -881,12 +899,19 @@ filterChain.fireMessageSent(req); } + /** + * Update the trafficControl for all the session which has + * just been opened. + */ private void updateTrafficMask() { - for (;;) { + int queueSize = trafficControllingSessions.size(); + + while (queueSize > 0) { T session = trafficControllingSessions.poll(); if (session == null) { - break; + // We are done with this queue. + return; } SessionState state = getState(session); @@ -903,26 +928,33 @@ // Retry later if session is not yet fully initialized. // (In case that Session.suspend??() or session.resume??() is // called before addSession() is processed) - scheduleTrafficControl(session); - return; - - default: - throw new IllegalStateException(String.valueOf(state)); + // We just put back the session at the end of the queue. 
+ trafficControllingSessions.add(session); + break; } + + // As we have handled one session, decrement the number of + // remaining sessions. + queueSize--; } } + /** + * Update the key's interest for READ and WRITE for this session. + */ public void updateTrafficControl(T session) { + // try { setInterestedInRead(session, !session.isReadSuspended()); } catch (Exception e) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } + try { - setInterestedInWrite(session, !session.getWriteRequestQueue() - .isEmpty(session) - && !session.isWriteSuspended()); + setInterestedInWrite(session, + !session.getWriteRequestQueue().isEmpty(session) && + !session.isWriteSuspended()); } catch (Exception e) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); @@ -950,20 +982,27 @@ if (selected == 0) { if ( ! wakeupCalled.get()) { if (delta < 100) { - System.out.println("Create a new selector. Selected is 0, delta = " + (t1 - t0)); - // Ok, we are hit by the nasty epoll spinning. - // Basically, there is a race condition which cause - // a closing file descriptor not to be considered as - // available as a selected channel, but it stopped - // the select. The next time we will call select(), - // it will exit immediately for the same reason, - // and do so forever, consuming 100% CPU. - // We have to destroy the selector, and register all - // the socket on a new one. - registerNewSelector(); - + // Last chance : the select() may have been interrupted + // because we have had an closed channel. + if ( isBrokenConnection() ) { + // we can reselect immediately + continue; + } else { + LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0)); + // Ok, we are hit by the nasty epoll spinning. + // Basically, there is a race condition which cause + // a closing file descriptor not to be considered as + // available as a selected channel, but it stopped + // the select. 
The next time we will call select(), + // it will exit immediately for the same reason, + // and do so forever, consuming 100% CPU. + // We have to destroy the selector, and register all + // the socket on a new one. + registerNewSelector(); + } + // and continue the loop - //continue; + continue; } } else { //System.out.println("Waited one second"); Modified: mina/branches/select-fix/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java URL: http://svn.apache.org/viewvc/mina/branches/select-fix/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=799199&r1=799198&r2=799199&view=diff ============================================================================== --- mina/branches/select-fix/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java (original) +++ mina/branches/select-fix/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java Thu Jul 30 08:22:29 2009 @@ -21,9 +21,11 @@ import java.io.IOException; import java.nio.channels.ByteChannel; +import java.nio.channels.DatagramChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; @@ -143,6 +145,40 @@ /** * {@inheritDoc} */ + protected boolean isBrokenConnection() throws IOException { + // A flag set to true if we find a broken session + boolean brokenSession = false; + + synchronized (selector) { + // Get the selector keys + Set<SelectionKey> keys = selector.keys(); + + // Loop on all the keys to see if one of them + // has a closed channel + for ( SelectionKey key:keys ) { + SelectableChannel channel = key.channel(); + + if ((((channel instanceof DatagramChannel) && + ((DatagramChannel)channel).isConnected())) || + ((channel instanceof SocketChannel) && + ((SocketChannel)channel).isConnected())) { + // The channel is not connected 
anymore. Cancel + // the associated key then. + key.cancel(); + + // Set the flag to true to avoid a selector switch + brokenSession = true; + } + } + } + + return brokenSession; + } + + + /** + * {@inheritDoc} + */ @Override protected SessionState getState(NioSession session) { SelectionKey key = session.getSelectionKey(); @@ -186,15 +222,15 @@ } @Override - protected void setInterestedInRead(NioSession session, boolean value) throws Exception { + protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception { SelectionKey key = session.getSelectionKey(); int oldInterestOps = key.interestOps(); - int newInterestOps; + int newInterestOps = oldInterestOps; - if (value) { - newInterestOps = oldInterestOps | SelectionKey.OP_READ; + if (!isInterested) { + newInterestOps &= ~SelectionKey.OP_READ; } else { - newInterestOps = oldInterestOps & ~SelectionKey.OP_READ; + newInterestOps |= SelectionKey.OP_READ; } if (oldInterestOps != newInterestOps) { @@ -203,15 +239,17 @@ } @Override - protected void setInterestedInWrite(NioSession session, boolean value) throws Exception { + protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception { SelectionKey key = session.getSelectionKey(); int oldInterestOps = key.interestOps(); - int newInterestOps; - if (value) { - newInterestOps = oldInterestOps | SelectionKey.OP_WRITE; + int newInterestOps = oldInterestOps; + + if (isInterested) { + newInterestOps |= SelectionKey.OP_WRITE; } else { - newInterestOps = oldInterestOps & ~SelectionKey.OP_WRITE; + newInterestOps &= ~SelectionKey.OP_WRITE; } + if (oldInterestOps != newInterestOps) { key.interestOps(newInterestOps); } Modified: mina/branches/select-fix/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java URL: http://svn.apache.org/viewvc/mina/branches/select-fix/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=799199&r1=799198&r2=799199&view=diff 
============================================================================== --- mina/branches/select-fix/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java (original) +++ mina/branches/select-fix/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java Thu Jul 30 08:22:29 2009 @@ -337,22 +337,24 @@ * {@inheritDoc} */ @Override - protected void setInterestedInRead(AprSession session, boolean value) throws Exception { - if (session.isInterestedInRead() == value) { + protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception { + if (session.isInterestedInRead() == isInterested) { return; } int rv = Poll.remove(pollset, session.getDescriptor()); + if (rv != Status.APR_SUCCESS) { throwException(rv); } - int flags = (value ? Poll.APR_POLLIN : 0) + int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0); rv = Poll.add(pollset, session.getDescriptor(), flags); + if (rv == Status.APR_SUCCESS) { - session.setInterestedInRead(value); + session.setInterestedInRead(isInterested); } else { throwException(rv); } @@ -362,22 +364,24 @@ * {@inheritDoc} */ @Override - protected void setInterestedInWrite(AprSession session, boolean value) throws Exception { - if (session.isInterestedInWrite() == value) { + protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception { + if (session.isInterestedInWrite() == isInterested) { return; } int rv = Poll.remove(pollset, session.getDescriptor()); + if (rv != Status.APR_SUCCESS) { throwException(rv); } int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) - | (value ? Poll.APR_POLLOUT : 0); + | (isInterested ? 
Poll.APR_POLLOUT : 0); rv = Poll.add(pollset, session.getDescriptor(), flags); + if (rv == Status.APR_SUCCESS) { - session.setInterestedInWrite(value); + session.setInterestedInWrite(isInterested); } else { throwException(rv); } @@ -459,11 +463,17 @@ } /** - * In the case we are using the java select() method, this method is - * used to trash the buggy selector and create a new one, registring - * all the sockets on it. + * {@inheritDoc} */ protected void registerNewSelector() { // Do nothing } + + /** + * {@inheritDoc} + */ + protected boolean isBrokenConnection() throws IOException { + // Here, we assume that this is the case. + return true; + } } \ No newline at end of file