Author: trustin
Date: Sun Nov  4 04:22:44 2007
New Revision: 591770

URL: http://svn.apache.org/viewvc?rev=591770&view=rev
Log:
Resolved issue: DIRMINA-467 (IoFilter has to filter a setTrafficMask call.)
* Added IoFilter.filterSetTrafficMask
* ReadThrottleFilter now overrides filterSetTrafficMask handler
* Fixed a bug where ReadThrottlePolicy is not enforced correctly


Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoEvent.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoEventType.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoFilter.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterAdapter.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterEvent.java
    
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
    
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
    
mina/trunk/core/src/main/java/org/apache/mina/filter/util/ReferenceCountingFilter.java
    
mina/trunk/core/src/test/java/org/apache/mina/filter/executor/ExecutorFilterRegressionTest.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java 
Sun Nov  4 04:22:44 2007
@@ -97,7 +97,7 @@
 
     private volatile boolean closing;
 
-    private TrafficMask trafficMask = TrafficMask.ALL;
+    private volatile TrafficMask trafficMask = TrafficMask.ALL;
 
     // Status variables
     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
@@ -321,13 +321,12 @@
         if (trafficMask == null) {
             throw new NullPointerException("trafficMask");
         }
-
-        if (this.trafficMask == trafficMask) {
-            return;
-        }
-
+        
+        getFilterChain().fireFilterSetTrafficMask(trafficMask);
+    }
+    
+    protected void setTrafficMaskNow(TrafficMask trafficMask) {
         this.trafficMask = trafficMask;
-        getProcessor().updateTrafficMask(this);
     }
 
     public void suspendRead() {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java 
Sun Nov  4 04:22:44 2007
@@ -489,6 +489,19 @@
         }
     }
 
+    public void fireFilterSetTrafficMask(TrafficMask trafficMask) {
+        Entry tail = this.tail;
+        callPreviousFilterSetTrafficMask(tail, session, trafficMask);
+    }
+
+    private void callPreviousFilterSetTrafficMask(Entry entry, IoSession 
session, TrafficMask trafficMask) {
+        try {
+            entry.getFilter().filterSetTrafficMask(entry.getNextFilter(), 
session, trafficMask);
+        } catch (Throwable e) {
+            fireExceptionCaught(e);
+        }
+    }
+    
     public List<Entry> getAll() {
         List<Entry> list = new ArrayList<Entry>();
         EntryImpl e = head.nextEntry;
@@ -641,6 +654,15 @@
             AbstractIoSession s = (AbstractIoSession) session;
             s.getProcessor().remove(s);
         }
+
+        @Override
+        public void filterSetTrafficMask(NextFilter nextFilter,
+                IoSession session, TrafficMask trafficMask) throws Exception {
+            AbstractIoSession s = (AbstractIoSession) session;
+            s.setTrafficMaskNow(trafficMask);
+            s.getProcessor().updateTrafficMask(session);
+        }
+        
     }
 
     private static class TailFilter extends IoFilterAdapter {
@@ -788,6 +810,12 @@
                 public void filterClose(IoSession session) {
                     Entry nextEntry = EntryImpl.this.prevEntry;
                     callPreviousFilterClose(nextEntry, session);
+                }
+
+                public void filterSetTrafficMask(IoSession session,
+                        TrafficMask trafficMask) {
+                    Entry nextEntry = EntryImpl.this.prevEntry;
+                    callPreviousFilterSetTrafficMask(nextEntry, session, 
trafficMask);
                 }
             };
         }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoEvent.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoEvent.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoEvent.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoEvent.java Sun Nov  
4 04:22:44 2007
@@ -69,6 +69,9 @@
         case WRITE:
             getSession().getFilterChain().fireFilterWrite((WriteRequest) 
getParameter());
             break;
+        case SET_TRAFFIC_MASK:
+            
getSession().getFilterChain().fireFilterSetTrafficMask((TrafficMask) 
getParameter());
+            break;
         case CLOSE:
             getSession().getFilterChain().fireFilterClose();
             break;

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoEventType.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoEventType.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoEventType.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoEventType.java Sun 
Nov  4 04:22:44 2007
@@ -28,5 +28,14 @@
  * @version $Rev$, $Date$
  */
 public enum IoEventType {
-    SESSION_CREATED, SESSION_OPENED, SESSION_CLOSED, MESSAGE_RECEIVED, 
MESSAGE_SENT, SESSION_IDLE, EXCEPTION_CAUGHT, WRITE, CLOSE
+    SESSION_CREATED,
+    SESSION_OPENED,
+    SESSION_CLOSED,
+    MESSAGE_RECEIVED,
+    MESSAGE_SENT,
+    SESSION_IDLE,
+    EXCEPTION_CAUGHT,
+    WRITE,
+    CLOSE,
+    SET_TRAFFIC_MASK,
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoFilter.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoFilter.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoFilter.java Sun Nov  
4 04:22:44 2007
@@ -202,6 +202,12 @@
      */
     void filterWrite(NextFilter nextFilter, IoSession session,
             WriteRequest writeRequest) throws Exception;
+    
+    /**
+     * Filters {@link IoSession#setTrafficMask(TrafficMask)} method 
invocation.
+     */
+    void filterSetTrafficMask(
+            NextFilter nextFilter, IoSession session, TrafficMask trafficMask) 
throws Exception;
 
     /**
      * Represents the next {@link IoFilter} in {@link 
IoFilterChain}.
@@ -251,5 +257,10 @@
          * Forwards <tt>filterClose</tt> event to next filter.
          */
         void filterClose(IoSession session);
+        
+        /**
+         * Forwards <tt>filterSetTrafficMask</tt> event to next filter.
+         */
+        void filterSetTrafficMask(IoSession session, TrafficMask trafficMask);
     }
 }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterAdapter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterAdapter.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterAdapter.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterAdapter.java 
Sun Nov  4 04:22:44 2007
@@ -94,4 +94,9 @@
             throws Exception {
         nextFilter.filterClose(session);
     }
+
+    public void filterSetTrafficMask(NextFilter nextFilter, IoSession session,
+            TrafficMask trafficMask) throws Exception {
+        nextFilter.filterSetTrafficMask(session, trafficMask);
+    }
 }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java Sun 
Nov  4 04:22:44 2007
@@ -276,6 +276,13 @@
     public void fireFilterClose();
 
     /**
+     * Fires a {@link IoSession#setTrafficMask(TrafficMask)} event. 
 Most users don't need to call this method at
+     * all.  Please use this method only when you implement a new transport or 
fire a virtual
+     * event.
+     */
+    public void fireFilterSetTrafficMask(TrafficMask trafficMask);
+
+    /**
      * Represents a name-filter pair that an {@link IoFilterChain} 
contains.
      *
      * @author The Apache MINA Project ([EMAIL PROTECTED])

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterEvent.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterEvent.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterEvent.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterEvent.java Sun 
Nov  4 04:22:44 2007
@@ -59,6 +59,9 @@
         case WRITE:
             getNextFilter().filterWrite(getSession(), (WriteRequest) 
getParameter());
             break;
+        case SET_TRAFFIC_MASK:
+            getNextFilter().filterSetTrafficMask(getSession(), (TrafficMask) 
getParameter());
+            break;
         case CLOSE:
             getNextFilter().filterClose(getSession());
             break;

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
 Sun Nov  4 04:22:44 2007
@@ -30,6 +30,7 @@
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.TrafficMask;
 import org.apache.mina.filter.executor.AbstractExecutorFilter;
 import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.mina.util.CopyOnWriteMap;
@@ -119,7 +120,7 @@
     }
     
     private final Object logLock = new Object();
-    private long lastLogTime = -1;
+    private long lastLogTime = 0;
     
     private volatile ReadThrottlePolicy policy;
     private final MessageSizeEstimator messageSizeEstimator;
@@ -324,6 +325,26 @@
         nextFilter.messageReceived(session, message);
     }
 
+    @Override
+    public void filterSetTrafficMask(NextFilter nextFilter, IoSession session,
+            TrafficMask trafficMask) throws Exception {
+        
+        if (trafficMask.isReadable()) {
+            State state = getState(session);
+            boolean suspendedRead;
+            synchronized (state) {
+                suspendedRead = state.suspendedRead;
+            }
+            
+            // Suppress resumeRead() if read is suspended by this filter.
+            if (suspendedRead) {
+                trafficMask = trafficMask.and(TrafficMask.WRITE);
+            }
+        }
+        
+        nextFilter.filterSetTrafficMask(session, trafficMask);
+    }
+
     private class EnterFilter extends IoFilterAdapter {
         @Override
         public void onPreRemove(
@@ -362,33 +383,41 @@
         
         ReadThrottlePolicy policy = getPolicy();
         
+        boolean enforcePolicy = false;
         synchronized (state) {
             int sessionBufferSize = (state.sessionBufferSize += size);
-            switch (policy) {
-            case BLOCK:
-            case EXCEPTION:
-                if ((maxSessionBufferSize != 0 && sessionBufferSize >= 
maxSessionBufferSize) ||
-                    (maxServiceBufferSize != 0 && serviceBufferSize >= 
maxServiceBufferSize) ||
-                    (maxGlobalBufferSize  != 0 && globalBufferSize  >= 
maxGlobalBufferSize)) {
-                    session.suspendRead();
+            if ((maxSessionBufferSize != 0 && sessionBufferSize >= 
maxSessionBufferSize) ||
+                (maxServiceBufferSize != 0 && serviceBufferSize >= 
maxServiceBufferSize) ||
+                (maxGlobalBufferSize  != 0 && globalBufferSize  >= 
maxGlobalBufferSize)) {
+                enforcePolicy = true;
+                switch (policy) {
+                case EXCEPTION:
+                case BLOCK:
                     state.suspendedRead = true;
                 }
             }
         }
         
-        switch (policy) {
-        case CLOSE:
-            log(session);
-            session.close();
-            raiseException(session);
-            break;
-        case EXCEPTION:
-            log(session);
-            raiseException(session);
-            break;
-        case LOG:
-            log(session);
-            break;
+        if (enforcePolicy) {
+            switch (policy) {
+            case CLOSE:
+                log(session);
+                session.close();
+                raiseException(session);
+                break;
+            case EXCEPTION:
+                log(session);
+                session.suspendRead();
+                raiseException(session);
+                break;
+            case BLOCK:
+                log(session);
+                session.suspendRead();
+                break;
+            case LOG:
+                log(session);
+                break;
+            }
         }
     }
     
@@ -411,6 +440,7 @@
         int maxServiceBufferSize = this.maxServiceBufferSize;
         int maxSessionBufferSize = this.maxSessionBufferSize;
         
+        boolean enforcePolicy = false;
         synchronized (state) {
             int sessionBufferSize = (state.sessionBufferSize -= size);
             if (sessionBufferSize < 0) {
@@ -422,9 +452,13 @@
                 (maxSessionBufferSize == 0 || sessionBufferSize < 
maxSessionBufferSize) &&
                 (maxServiceBufferSize == 0 || serviceBufferSize < 
maxServiceBufferSize) &&
                 (maxGlobalBufferSize  == 0 || globalBufferSize  < 
maxGlobalBufferSize)) {
-                session.resumeRead();
                 state.suspendedRead = false;
+                enforcePolicy = true;
             }
+        }
+        
+        if (enforcePolicy) {
+            session.resumeRead();
         }
     }
 

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
 Sun Nov  4 04:22:44 2007
@@ -114,7 +114,7 @@
     private final Object logLock = new Object();
     private final Object blockLock = new Object();
 
-    private long lastLogTime = -1;
+    private long lastLogTime = 0;
     private int blockWaiters = 0;
     
     private volatile WriteThrottlePolicy policy;

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/util/ReferenceCountingFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/util/ReferenceCountingFilter.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/util/ReferenceCountingFilter.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/util/ReferenceCountingFilter.java
 Sun Nov  4 04:22:44 2007
@@ -23,6 +23,7 @@
 import org.apache.mina.common.IoFilter;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.TrafficMask;
 import org.apache.mina.common.WriteRequest;
 
 /**
@@ -124,5 +125,10 @@
     public void sessionOpened(NextFilter nextFilter, IoSession session)
             throws Exception {
         filter.sessionOpened(nextFilter, session);
+    }
+
+    public void filterSetTrafficMask(NextFilter nextFilter, IoSession session,
+            TrafficMask trafficMask) throws Exception {
+        filter.filterSetTrafficMask(nextFilter, session, trafficMask);
     }
 }

Modified: 
mina/trunk/core/src/test/java/org/apache/mina/filter/executor/ExecutorFilterRegressionTest.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/executor/ExecutorFilterRegressionTest.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- 
mina/trunk/core/src/test/java/org/apache/mina/filter/executor/ExecutorFilterRegressionTest.java
 (original)
+++ 
mina/trunk/core/src/test/java/org/apache/mina/filter/executor/ExecutorFilterRegressionTest.java
 Sun Nov  4 04:22:44 2007
@@ -27,8 +27,8 @@
 
 import org.apache.mina.common.DummySession;
 import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoEvent;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.TrafficMask;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.IoFilter.NextFilter;
 
@@ -133,7 +133,7 @@
         public void sessionCreated(IoSession session) {
         }
 
-        public void filter(IoEvent event) {
+        public void filterSetTrafficMask(IoSession session, TrafficMask 
trafficMask) {
         }
     }
 


Reply via email to