Author: elecharny
Date: Thu Jul 30 08:22:29 2009
New Revision: 799199

URL: http://svn.apache.org/viewvc?rev=799199&view=rev
Log:
o Added a Logger
o Few Javadoc, renaming and comments added
o The updateTrafficMask method has been fixed : if there were one single 
session in the OPENING state, then the loop was exited, leaving some potential 
other sessions pending 
o Added a check for a broken session in the main loop of the processor, to 
avoid creating a new selector for nothing in this case

Modified:
    
mina/branches/select-fix/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
    
mina/branches/select-fix/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
    
mina/branches/select-fix/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java

Modified: 
mina/branches/select-fix/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/branches/select-fix/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=799199&r1=799198&r2=799199&view=diff
==============================================================================
--- 
mina/branches/select-fix/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
 (original)
+++ 
mina/branches/select-fix/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
 Thu Jul 30 08:22:29 2009
@@ -48,6 +48,8 @@
 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
 import org.apache.mina.util.ExceptionMonitor;
 import org.apache.mina.util.NamePreservingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An abstract implementation of {...@link IoProcessor} which helps
@@ -59,6 +61,9 @@
  */
 public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession>
         implements IoProcessor<T> {
+    /** A logger for this class */
+    private final static Logger LOG = 
LoggerFactory.getLogger(IoProcessor.class);
+
     /**
      * The maximum loop count for a write operation until
      * {...@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero 
value.
@@ -271,7 +276,7 @@
      * @param session the session registered
      * @param interested true for registering, false for removing
      */
-    protected abstract void setInterestedInRead(T session, boolean interested)
+    protected abstract void setInterestedInRead(T session, boolean 
isInterested)
             throws Exception;
 
     /**
@@ -416,9 +421,22 @@
      * In the case we are using the java select() method, this method is
      * used to trash the buggy selector and create a new one, registring
      * all the sockets on it.
+     * @throws IOException If we got an exception
      */
     abstract protected void registerNewSelector() throws IOException;
 
+    
+    /**
+     * Check that the select() has not exited immediately just because of
+     * a broken connection. In this case, this is a standard case, and we
+     * just have to loop.
+     * 
+     * @return true if a connection has been brutally closed.
+     * @throws IOException If we got an exception
+     */
+    abstract protected boolean isBrokenConnection() throws IOException;
+
+    
     /**
      * Loops over the new sessions blocking queue and returns
      * the number of sessions which are effectively created
@@ -881,12 +899,19 @@
         filterChain.fireMessageSent(req);
     }
 
+    /**
+     * Update the trafficControl for all the session which has
+     * just been opened. 
+     */
     private void updateTrafficMask() {
-        for (;;) {
+        int queueSize = trafficControllingSessions.size();
+        
+        while (queueSize > 0) {
             T session = trafficControllingSessions.poll();
 
             if (session == null) {
-                break;
+                // We are done with this queue.
+                return;
             }
 
             SessionState state = getState(session);
@@ -903,26 +928,33 @@
                     // Retry later if session is not yet fully initialized.
                     // (In case that Session.suspend??() or session.resume??() 
is
                     // called before addSession() is processed)
-                    scheduleTrafficControl(session);
-                    return;
-                    
-                default:
-                    throw new IllegalStateException(String.valueOf(state));
+                    // We just put back the session at the end of the queue.
+                    trafficControllingSessions.add(session);
+                    break;
             }
+            
+            // As we have handled one session, decrement the number of 
+            // remaining sessions.
+            queueSize--;
         }
     }
 
+    /**
+     * Update the key's interest for READ and WRITE for this session.
+     */
     public void updateTrafficControl(T session) {
+        // 
         try {
             setInterestedInRead(session, !session.isReadSuspended());
         } catch (Exception e) {
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(e);
         }
+        
         try {
-            setInterestedInWrite(session, !session.getWriteRequestQueue()
-                    .isEmpty(session)
-                    && !session.isWriteSuspended());
+            setInterestedInWrite(session, 
+                !session.getWriteRequestQueue().isEmpty(session) && 
+                !session.isWriteSuspended());
         } catch (Exception e) {
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(e);
@@ -950,20 +982,27 @@
                         if (selected == 0) {
                             if ( ! wakeupCalled.get()) {
                                 if (delta < 100) {
-                                    System.out.println("Create a new selector. 
Selected is 0, delta = " + (t1 - t0));
-                                    // Ok, we are hit by the nasty epoll 
spinning.
-                                    // Basically, there is a race condition 
which cause 
-                                    // a closing file descriptor not to be 
considered as 
-                                    // available as a selected channel, but it 
stopped
-                                    // the select. The next time we will call 
select(),
-                                    // it will exit immediately for the same 
reason, 
-                                    // and do so forever, consuming 100% CPU.
-                                    // We have to destroy the selector, and 
register all 
-                                    // the socket on a new one.
-                                    registerNewSelector();
-                                    
+                                    // Last chance : the select() may have 
been interrupted
+                                    // because we have had an closed channel.
+                                    if ( isBrokenConnection() ) {
+                                        // we can reselect immediately
+                                        continue;
+                                    } else {
+                                        LOG.warn("Create a new selector. 
Selected is 0, delta = " + (t1 - t0));
+                                        // Ok, we are hit by the nasty epoll 
spinning.
+                                        // Basically, there is a race 
condition which cause 
+                                        // a closing file descriptor not to be 
considered as 
+                                        // available as a selected channel, 
but it stopped
+                                        // the select. The next time we will 
call select(),
+                                        // it will exit immediately for the 
same reason, 
+                                        // and do so forever, consuming 100% 
CPU.
+                                        // We have to destroy the selector, 
and register all 
+                                        // the socket on a new one.
+                                        registerNewSelector();
+                                    }
+                                        
                                     // and continue the loop
-                                    //continue;
+                                    continue;
                                 }
                             } else {
                                 //System.out.println("Waited one second");

Modified: 
mina/branches/select-fix/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/branches/select-fix/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=799199&r1=799198&r2=799199&view=diff
==============================================================================
--- 
mina/branches/select-fix/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
 (original)
+++ 
mina/branches/select-fix/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
 Thu Jul 30 08:22:29 2009
@@ -21,9 +21,11 @@
 
 import java.io.IOException;
 import java.nio.channels.ByteChannel;
+import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -143,6 +145,40 @@
     /**
      * {...@inheritdoc}
      */
+    protected boolean isBrokenConnection() throws IOException {
+        // A flag set to true if we find a broken session
+        boolean brokenSession = false;
+        
+        synchronized (selector) {
+            // Get the selector keys
+            Set<SelectionKey> keys = selector.keys();
+            
+            // Loop on all the keys to see if one of them
+            // has a closed channel
+            for ( SelectionKey key:keys ) {
+                SelectableChannel channel = key.channel();
+                
+                if ((((channel instanceof DatagramChannel) &&
+                    ((DatagramChannel)channel).isConnected())) ||
+                    ((channel instanceof SocketChannel) &&
+                    ((SocketChannel)channel).isConnected())) {
+                    // The channel is not connected anymore. Cancel 
+                    // the associated key then.
+                    key.cancel();
+                    
+                    // Set the flag to true to avoid a selector switch
+                    brokenSession = true;
+                }
+            }
+        }
+        
+        return brokenSession;
+    }
+    
+    
+    /**
+     * {...@inheritdoc}
+     */
     @Override
     protected SessionState getState(NioSession session) {
         SelectionKey key = session.getSelectionKey();
@@ -186,15 +222,15 @@
     }
 
     @Override
-    protected void setInterestedInRead(NioSession session, boolean value) 
throws Exception {
+    protected void setInterestedInRead(NioSession session, boolean 
isInterested) throws Exception {
         SelectionKey key = session.getSelectionKey();
         int oldInterestOps = key.interestOps();
-        int newInterestOps;
+        int newInterestOps = oldInterestOps;
         
-        if (value) {
-            newInterestOps = oldInterestOps | SelectionKey.OP_READ;
+        if (!isInterested) {
+            newInterestOps &= ~SelectionKey.OP_READ;
         } else {
-            newInterestOps = oldInterestOps & ~SelectionKey.OP_READ;
+            newInterestOps |= SelectionKey.OP_READ;
         }
         
         if (oldInterestOps != newInterestOps) {
@@ -203,15 +239,17 @@
     }
 
     @Override
-    protected void setInterestedInWrite(NioSession session, boolean value) 
throws Exception {
+    protected void setInterestedInWrite(NioSession session, boolean 
isInterested) throws Exception {
         SelectionKey key = session.getSelectionKey();
         int oldInterestOps = key.interestOps();
-        int newInterestOps;
-        if (value) {
-            newInterestOps = oldInterestOps | SelectionKey.OP_WRITE;
+        int newInterestOps = oldInterestOps;
+        
+        if (isInterested) {
+            newInterestOps |= SelectionKey.OP_WRITE;
         } else {
-            newInterestOps = oldInterestOps & ~SelectionKey.OP_WRITE;
+            newInterestOps &= ~SelectionKey.OP_WRITE;
         }
+        
         if (oldInterestOps != newInterestOps) {
             key.interestOps(newInterestOps);
         }

Modified: 
mina/branches/select-fix/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/branches/select-fix/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=799199&r1=799198&r2=799199&view=diff
==============================================================================
--- 
mina/branches/select-fix/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
 (original)
+++ 
mina/branches/select-fix/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
 Thu Jul 30 08:22:29 2009
@@ -337,22 +337,24 @@
      * {...@inheritdoc}
      */
     @Override
-    protected void setInterestedInRead(AprSession session, boolean value) 
throws Exception {
-        if (session.isInterestedInRead() == value) {
+    protected void setInterestedInRead(AprSession session, boolean 
isInterested) throws Exception {
+        if (session.isInterestedInRead() == isInterested) {
             return;
         }
 
         int rv = Poll.remove(pollset, session.getDescriptor());
+        
         if (rv != Status.APR_SUCCESS) {
             throwException(rv);
         }
 
-        int flags = (value ? Poll.APR_POLLIN : 0)
+        int flags = (isInterested ? Poll.APR_POLLIN : 0)
                 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
 
         rv = Poll.add(pollset, session.getDescriptor(), flags);
+        
         if (rv == Status.APR_SUCCESS) {
-            session.setInterestedInRead(value);
+            session.setInterestedInRead(isInterested);
         } else {
             throwException(rv);
         }
@@ -362,22 +364,24 @@
      * {...@inheritdoc}
      */
     @Override
-    protected void setInterestedInWrite(AprSession session, boolean value) 
throws Exception {
-        if (session.isInterestedInWrite() == value) {
+    protected void setInterestedInWrite(AprSession session, boolean 
isInterested) throws Exception {
+        if (session.isInterestedInWrite() == isInterested) {
             return;
         }
 
         int rv = Poll.remove(pollset, session.getDescriptor());
+        
         if (rv != Status.APR_SUCCESS) {
             throwException(rv);
         }
 
         int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
-                | (value ? Poll.APR_POLLOUT : 0);
+                | (isInterested ? Poll.APR_POLLOUT : 0);
 
         rv = Poll.add(pollset, session.getDescriptor(), flags);
+        
         if (rv == Status.APR_SUCCESS) {
-            session.setInterestedInWrite(value);
+            session.setInterestedInWrite(isInterested);
         } else {
             throwException(rv);
         }
@@ -459,11 +463,17 @@
     }
 
     /**
-     * In the case we are using the java select() method, this method is
-     * used to trash the buggy selector and create a new one, registring
-     * all the sockets on it.
+     * {...@inheritdoc}
      */
     protected void registerNewSelector() {
         // Do nothing
     }
+    
+    /**
+     * {...@inheritdoc}
+     */
+    protected boolean isBrokenConnection() throws IOException {
+        // Here, we assume that this is the case.
+        return true;
+    }
 }
\ No newline at end of file


Reply via email to