Author: elecharny Date: Wed Jul 29 18:31:49 2009 New Revision: 799001 URL: http://svn.apache.org/viewvc?rev=799001&view=rev Log: o Created a real Enum file for the sessionState o Added some Javadoc and comments o Changed the name of the state() method to getState() for clarity sake o Renamed remove() to removeSessions() in the AbtractPollingIoProcessor class, for clarity sake
Added: mina/trunk/core/src/main/java/org/apache/mina/core/session/SessionState.java Modified: mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java Modified: mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=799001&r1=799000&r2=799001&view=diff ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original) +++ mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Wed Jul 29 18:31:49 2009 @@ -40,6 +40,7 @@ import org.apache.mina.core.session.AbstractIoSession; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.session.IoSessionConfig; +import org.apache.mina.core.session.SessionState; import org.apache.mina.core.write.WriteRequest; import org.apache.mina.core.write.WriteRequestQueue; import org.apache.mina.core.write.WriteToClosedSessionException; @@ -80,10 +81,13 @@ /** A Session queue containing the newly created sessions */ private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>(); + /** A queue used to store the sessions to be removed */ private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>(); + /** A queue used to store the sessions to be flushed */ private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>(); + /** A queue used to store the sessions which have a trafficControl to be updated */ private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>(); /** The processor thread : it handles the incoming messages */ @@ -235,7 +239,7 @@ * @param session the {...@link IoSession} to inspect * @return the state of the session */ - protected abstract SessionState state(T session); + protected abstract SessionState getState(T session); /** * Is the session ready for writing @@ -431,9 +435,9 @@ } private boolean addNow(T session) { - boolean registered = false; boolean notified = false; + try { init(session); registered = true; @@ -469,39 +473,46 @@ return registered; } - private int remove() { + private int removeSessions() { int removedSessions = 0; + for (;;) { T session = removingSessions.poll(); if (session == null) { - break; + // No session to remove. Get out. + return removedSessions; } - SessionState state = state(session); + SessionState state = getState(session); + switch (state) { - case OPEN: - if (removeNow(session)) { - removedSessions++; - } - break; - case CLOSED: - // Skip if channel is already closed - break; - case PREPARING: - // Remove session from the newSessions queue and - // remove it - newSessions.remove(session); - if (removeNow(session)) { - removedSessions++; - } - break; - default: - throw new IllegalStateException(String.valueOf(state)); + case OPENED: + if (removeNow(session)) { + removedSessions++; + } + + break; + + case CLOSING: + // Skip if channel is already closed + break; + + case OPENING: + // Remove session from the newSessions queue and + // remove it + newSessions.remove(session); + + if (removeNow(session)) { + removedSessions++; + } + + break; + + default: + throw new IllegalStateException(String.valueOf(state)); } } - - return removedSessions; } private boolean removeNow(T session) { @@ -662,35 +673,40 @@ } T session = flushingSessions.poll(); // the same one with firstSession + for (;;) { session.setScheduledForFlush(false); - SessionState state = state(session); + SessionState state = getState(session); switch (state) { - case OPEN: - try { - boolean flushedAll = flushNow(session, currentTime); - if (flushedAll - && !session.getWriteRequestQueue().isEmpty(session) - && !session.isScheduledForFlush()) { - scheduleFlush(session); + case OPENED: + try { + boolean flushedAll = flushNow(session, currentTime); + if (flushedAll + && !session.getWriteRequestQueue().isEmpty(session) + && !session.isScheduledForFlush()) { + scheduleFlush(session); + } + } catch (Exception e) { + scheduleRemove(session); + IoFilterChain filterChain = session.getFilterChain(); + filterChain.fireExceptionCaught(e); } - } catch (Exception e) { - scheduleRemove(session); - IoFilterChain filterChain = session.getFilterChain(); - filterChain.fireExceptionCaught(e); - } - break; - case CLOSED: - // Skip if the channel is already closed. - break; - case PREPARING: - // Retry later if session is not yet fully initialized. - // (In case that Session.write() is called before addSession() is processed) - scheduleFlush(session); - return; - default: - throw new IllegalStateException(String.valueOf(state)); + + break; + + case CLOSING: + // Skip if the channel is already closed. + break; + + case OPENING: + // Retry later if session is not yet fully initialized. + // (In case that Session.write() is called before addSession() is processed) + scheduleFlush(session); + return; + + default: + throw new IllegalStateException(String.valueOf(state)); } session = flushingSessions.peek(); @@ -867,21 +883,25 @@ break; } - SessionState state = state(session); + SessionState state = getState(session); + switch (state) { - case OPEN: - updateTrafficControl(session); - break; - case CLOSED: - break; - case PREPARING: - // Retry later if session is not yet fully initialized. - // (In case that Session.suspend??() or session.resume??() is - // called before addSession() is processed) - scheduleTrafficControl(session); - return; - default: - throw new IllegalStateException(String.valueOf(state)); + case OPENED: + updateTrafficControl(session); + break; + + case CLOSING: + break; + + case OPENING: + // Retry later if session is not yet fully initialized. + // (In case that Session.suspend??() or session.resume??() is + // called before addSession() is processed) + scheduleTrafficControl(session); + return; + + default: + throw new IllegalStateException(String.valueOf(state)); } } } @@ -927,7 +947,7 @@ long currentTime = System.currentTimeMillis(); flush(currentTime); - nSessions -= remove(); + nSessions -= removeSessions(); notifyIdleSessions(currentTime); if (nSessions == 0) { @@ -971,8 +991,4 @@ } } } - - protected static enum SessionState { - OPEN, CLOSED, PREPARING, - } } Added: mina/trunk/core/src/main/java/org/apache/mina/core/session/SessionState.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/session/SessionState.java?rev=799001&view=auto ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/core/session/SessionState.java (added) +++ mina/trunk/core/src/main/java/org/apache/mina/core/session/SessionState.java Wed Jul 29 18:31:49 2009 @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.core.session; + +/** + * The session state. A session can be in three different state : + * <ul> + * <li>OPENING : The session has not been fully created</li> + * <li>OPENED : The session is opened</li> + * <li>CLOSING : The session is closing</li> + * </ul> + * + * @author <a href="http://mina.apache.org">Apache MINA Project</a> + */ +public enum SessionState +{ + OPENING, + OPENED, + CLOSING +} \ No newline at end of file Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=799001&r1=799000&r2=799001&view=diff ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java (original) +++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java Wed Jul 29 18:31:49 2009 @@ -32,6 +32,7 @@ import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.file.FileRegion; import org.apache.mina.core.polling.AbstractPollingIoProcessor; +import org.apache.mina.core.session.SessionState; /** * TODO Add documentation @@ -110,14 +111,25 @@ ch.close(); } + /** + * {...@inheritdoc} + */ @Override - protected SessionState state(NioSession session) { + protected SessionState getState(NioSession session) { SelectionKey key = session.getSelectionKey(); + if (key == null) { - return SessionState.PREPARING; + // The channel is not yet registered to a selector + return SessionState.OPENING; } - return key.isValid()? SessionState.OPEN : SessionState.CLOSED; + if (key.isValid()) { + // The session is opened + return SessionState.OPENED; + } else { + // The session still as to be closed + return SessionState.CLOSING; + } } @Override @@ -164,11 +176,13 @@ SelectionKey key = session.getSelectionKey(); int oldInterestOps = key.interestOps(); int newInterestOps; + if (value) { newInterestOps = oldInterestOps | SelectionKey.OP_WRITE; } else { newInterestOps = oldInterestOps & ~SelectionKey.OP_WRITE; } + if (oldInterestOps != newInterestOps) { key.interestOps(newInterestOps); } Modified: mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java URL: http://svn.apache.org/viewvc/mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=799001&r1=799000&r2=799001&view=diff ============================================================================== --- mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java (original) +++ mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java Wed Jul 29 18:31:49 2009 @@ -31,6 +31,7 @@ import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.file.FileRegion; import org.apache.mina.core.polling.AbstractPollingIoProcessor; +import org.apache.mina.core.session.SessionState; import org.apache.mina.util.CircularQueue; import org.apache.tomcat.jni.Poll; import org.apache.tomcat.jni.Pool; @@ -288,14 +289,15 @@ * {...@inheritdoc} */ @Override - protected SessionState state(AprSession session) { + protected SessionState getState(AprSession session) { long socket = session.getDescriptor(); + if (socket != 0) { - return SessionState.OPEN; + return SessionState.OPENED; } else if (allSessions.get(socket) != null) { - return SessionState.PREPARING; // will occur ? + return SessionState.OPENING; // will occur ? } else { - return SessionState.CLOSED; + return SessionState.CLOSING; } }