Author: elecharny Date: Mon Mar 9 12:26:41 2009 New Revision: 751661 URL: http://svn.apache.org/viewvc?rev=751661&view=rev Log: o Added some Javadoc and comments o Renamed the add() method to handleNewSession() o Minor refactoring
Modified: mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Modified: mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=751661&r1=751660&r2=751661&view=diff ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original) +++ mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Mon Mar 9 12:26:41 2009 @@ -162,7 +162,7 @@ synchronized (disposalLock) { if (!disposing) { disposing = true; - startupWorker(); + startupProcessor(); } } @@ -219,7 +219,7 @@ protected abstract Iterator<T> selectedSessions(); /** - * Get the sate of a session (preparing, open, closed) + * Get the state of a session (preparing, open, closed) * @param session the {...@link IoSession} to inspect * @return the state of the session */ @@ -327,7 +327,7 @@ // Adds the session to the newSession queue and starts the worker newSessions.add(session); - startupWorker(); + startupProcessor(); } /** @@ -335,7 +335,7 @@ */ public final void remove(T session) { scheduleRemove(session); - startupWorker(); + startupProcessor(); } private void scheduleRemove(T session) { @@ -372,17 +372,28 @@ trafficControllingSessions.add(session); } - private void startupWorker() { + /** + * Starts the inner Processor, asking the executor to pick a thread in its + * pool. The Runnable will be renamed + */ + private void startupProcessor() { synchronized (lock) { if (processor == null) { processor = new Processor(); executor.execute(new NamePreservingRunnable(processor, threadName)); } } + + // Just stop the select() and start it again, so that the processor + // can be activated immediately. wakeup(); } - private int add() { + /** + * Handle newly created sessions + * @return The number of new sessions + */ + private int handleNewSessions() { int addedSessions = 0; // Loop on the new sessions blocking queue, to count @@ -391,13 +402,13 @@ T session = newSessions.poll(); if (session == null) { - // We don't have anymore new sessions + // We don't have new sessions break; } if (addNow(session)) { - // The new session has been added to the + // A new session has been created addedSessions ++; } } @@ -541,12 +552,16 @@ } } + /** + * Deal with session ready for the read or write operations, or both. + */ private void process(T session) { - + // Process Reads if (isReadable(session) && !session.isReadSuspended()) { read(session); } + // Process writes if (isWritable(session) && !session.isWriteSuspended()) { scheduleFlush(session); } @@ -624,6 +639,7 @@ for (; ;) { session.setScheduledForFlush(false); SessionState state = state(session); + switch (state) { case OPEN: try { @@ -858,11 +874,14 @@ for (;;) { try { + // TODO: Why do we use a timeout here ??? int selected = select(1000); - nSessions += add(); + nSessions += handleNewSessions(); updateTrafficMask(); + // Now, if we have had some incoming or outgoing events, + // deal with them if (selected > 0) { process(); }