Author: trustin
Date: Sun Nov 4 04:22:44 2007
New Revision: 591770
URL: http://svn.apache.org/viewvc?rev=591770&view=rev
Log:
Resolved issue: DIRMINA-467 (IoFilter has to filter a setTrafficMask call.)
* Added IoFilter.filterSetTrafficMask
* ReadThrottleFilter now overrides filterSetTrafficMask handler
* Fixed a bug where ReadThrottlePolicy is not enforced correctly
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoEvent.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoEventType.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilter.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterAdapter.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterEvent.java
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filter/util/ReferenceCountingFilter.java
mina/trunk/core/src/test/java/org/apache/mina/filter/executor/ExecutorFilterRegressionTest.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
Sun Nov 4 04:22:44 2007
@@ -97,7 +97,7 @@
private volatile boolean closing;
- private TrafficMask trafficMask = TrafficMask.ALL;
+ private volatile TrafficMask trafficMask = TrafficMask.ALL;
// Status variables
private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
@@ -321,13 +321,12 @@
if (trafficMask == null) {
throw new NullPointerException("trafficMask");
}
-
- if (this.trafficMask == trafficMask) {
- return;
- }
-
+
+ getFilterChain().fireFilterSetTrafficMask(trafficMask);
+ }
+
+ protected void setTrafficMaskNow(TrafficMask trafficMask) {
this.trafficMask = trafficMask;
- getProcessor().updateTrafficMask(this);
}
public void suspendRead() {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
Sun Nov 4 04:22:44 2007
@@ -489,6 +489,19 @@
}
}
+ public void fireFilterSetTrafficMask(TrafficMask trafficMask) {
+ Entry tail = this.tail;
+ callPreviousFilterSetTrafficMask(tail, session, trafficMask);
+ }
+
+ private void callPreviousFilterSetTrafficMask(Entry entry, IoSession
session, TrafficMask trafficMask) {
+ try {
+ entry.getFilter().filterSetTrafficMask(entry.getNextFilter(),
session, trafficMask);
+ } catch (Throwable e) {
+ fireExceptionCaught(e);
+ }
+ }
+
public List<Entry> getAll() {
List<Entry> list = new ArrayList<Entry>();
EntryImpl e = head.nextEntry;
@@ -641,6 +654,15 @@
AbstractIoSession s = (AbstractIoSession) session;
s.getProcessor().remove(s);
}
+
+ @Override
+ public void filterSetTrafficMask(NextFilter nextFilter,
+ IoSession session, TrafficMask trafficMask) throws Exception {
+ AbstractIoSession s = (AbstractIoSession) session;
+ s.setTrafficMaskNow(trafficMask);
+ s.getProcessor().updateTrafficMask(session);
+ }
+
}
private static class TailFilter extends IoFilterAdapter {
@@ -788,6 +810,12 @@
public void filterClose(IoSession session) {
Entry nextEntry = EntryImpl.this.prevEntry;
callPreviousFilterClose(nextEntry, session);
+ }
+
+ public void filterSetTrafficMask(IoSession session,
+ TrafficMask trafficMask) {
+ Entry nextEntry = EntryImpl.this.prevEntry;
+ callPreviousFilterSetTrafficMask(nextEntry, session,
trafficMask);
}
};
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoEvent.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoEvent.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoEvent.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoEvent.java Sun Nov
4 04:22:44 2007
@@ -69,6 +69,9 @@
case WRITE:
getSession().getFilterChain().fireFilterWrite((WriteRequest)
getParameter());
break;
+ case SET_TRAFFIC_MASK:
+
getSession().getFilterChain().fireFilterSetTrafficMask((TrafficMask)
getParameter());
+ break;
case CLOSE:
getSession().getFilterChain().fireFilterClose();
break;
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoEventType.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoEventType.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoEventType.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoEventType.java Sun
Nov 4 04:22:44 2007
@@ -28,5 +28,14 @@
* @version $Rev$, $Date$
*/
public enum IoEventType {
- SESSION_CREATED, SESSION_OPENED, SESSION_CLOSED, MESSAGE_RECEIVED,
MESSAGE_SENT, SESSION_IDLE, EXCEPTION_CAUGHT, WRITE, CLOSE
+ SESSION_CREATED,
+ SESSION_OPENED,
+ SESSION_CLOSED,
+ MESSAGE_RECEIVED,
+ MESSAGE_SENT,
+ SESSION_IDLE,
+ EXCEPTION_CAUGHT,
+ WRITE,
+ CLOSE,
+ SET_TRAFFIC_MASK,
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoFilter.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoFilter.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoFilter.java Sun Nov
4 04:22:44 2007
@@ -202,6 +202,12 @@
*/
void filterWrite(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception;
+
+ /**
+ * Filters {@link IoSession#setTrafficMask(TrafficMask)} method
invocation.
+ */
+ void filterSetTrafficMask(
+ NextFilter nextFilter, IoSession session, TrafficMask trafficMask)
throws Exception;
/**
* Represents the next {@link IoFilter} in {@link
IoFilterChain}.
@@ -251,5 +257,10 @@
* Forwards <tt>filterClose</tt> event to next filter.
*/
void filterClose(IoSession session);
+
+ /**
+ * Forwards <tt>filterSetTrafficMask</tt> event to next filter.
+ */
+ void filterSetTrafficMask(IoSession session, TrafficMask trafficMask);
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterAdapter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterAdapter.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterAdapter.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterAdapter.java
Sun Nov 4 04:22:44 2007
@@ -94,4 +94,9 @@
throws Exception {
nextFilter.filterClose(session);
}
+
+ public void filterSetTrafficMask(NextFilter nextFilter, IoSession session,
+ TrafficMask trafficMask) throws Exception {
+ nextFilter.filterSetTrafficMask(session, trafficMask);
+ }
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java Sun
Nov 4 04:22:44 2007
@@ -276,6 +276,13 @@
public void fireFilterClose();
/**
+ * Fires a {@link IoSession#setTrafficMask(TrafficMask)} event.
Most users don't need to call this method at
+ * all. Please use this method only when you implement a new transport or
fire a virtual
+ * event.
+ */
+ public void fireFilterSetTrafficMask(TrafficMask trafficMask);
+
+ /**
* Represents a name-filter pair that an {@link IoFilterChain}
contains.
*
* @author The Apache MINA Project ([EMAIL PROTECTED])
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterEvent.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterEvent.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterEvent.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterEvent.java Sun
Nov 4 04:22:44 2007
@@ -59,6 +59,9 @@
case WRITE:
getNextFilter().filterWrite(getSession(), (WriteRequest)
getParameter());
break;
+ case SET_TRAFFIC_MASK:
+ getNextFilter().filterSetTrafficMask(getSession(), (TrafficMask)
getParameter());
+ break;
case CLOSE:
getNextFilter().filterClose(getSession());
break;
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
Sun Nov 4 04:22:44 2007
@@ -30,6 +30,7 @@
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.TrafficMask;
import org.apache.mina.filter.executor.AbstractExecutorFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.util.CopyOnWriteMap;
@@ -119,7 +120,7 @@
}
private final Object logLock = new Object();
- private long lastLogTime = -1;
+ private long lastLogTime = 0;
private volatile ReadThrottlePolicy policy;
private final MessageSizeEstimator messageSizeEstimator;
@@ -324,6 +325,26 @@
nextFilter.messageReceived(session, message);
}
+ @Override
+ public void filterSetTrafficMask(NextFilter nextFilter, IoSession session,
+ TrafficMask trafficMask) throws Exception {
+
+ if (trafficMask.isReadable()) {
+ State state = getState(session);
+ boolean suspendedRead;
+ synchronized (state) {
+ suspendedRead = state.suspendedRead;
+ }
+
+ // Suppress resumeRead() if read is suspended by this filter.
+ if (suspendedRead) {
+ trafficMask = trafficMask.and(TrafficMask.WRITE);
+ }
+ }
+
+ nextFilter.filterSetTrafficMask(session, trafficMask);
+ }
+
private class EnterFilter extends IoFilterAdapter {
@Override
public void onPreRemove(
@@ -362,33 +383,41 @@
ReadThrottlePolicy policy = getPolicy();
+ boolean enforcePolicy = false;
synchronized (state) {
int sessionBufferSize = (state.sessionBufferSize += size);
- switch (policy) {
- case BLOCK:
- case EXCEPTION:
- if ((maxSessionBufferSize != 0 && sessionBufferSize >=
maxSessionBufferSize) ||
- (maxServiceBufferSize != 0 && serviceBufferSize >=
maxServiceBufferSize) ||
- (maxGlobalBufferSize != 0 && globalBufferSize >=
maxGlobalBufferSize)) {
- session.suspendRead();
+ if ((maxSessionBufferSize != 0 && sessionBufferSize >=
maxSessionBufferSize) ||
+ (maxServiceBufferSize != 0 && serviceBufferSize >=
maxServiceBufferSize) ||
+ (maxGlobalBufferSize != 0 && globalBufferSize >=
maxGlobalBufferSize)) {
+ enforcePolicy = true;
+ switch (policy) {
+ case EXCEPTION:
+ case BLOCK:
state.suspendedRead = true;
}
}
}
- switch (policy) {
- case CLOSE:
- log(session);
- session.close();
- raiseException(session);
- break;
- case EXCEPTION:
- log(session);
- raiseException(session);
- break;
- case LOG:
- log(session);
- break;
+ if (enforcePolicy) {
+ switch (policy) {
+ case CLOSE:
+ log(session);
+ session.close();
+ raiseException(session);
+ break;
+ case EXCEPTION:
+ log(session);
+ session.suspendRead();
+ raiseException(session);
+ break;
+ case BLOCK:
+ log(session);
+ session.suspendRead();
+ break;
+ case LOG:
+ log(session);
+ break;
+ }
}
}
@@ -411,6 +440,7 @@
int maxServiceBufferSize = this.maxServiceBufferSize;
int maxSessionBufferSize = this.maxSessionBufferSize;
+ boolean enforcePolicy = false;
synchronized (state) {
int sessionBufferSize = (state.sessionBufferSize -= size);
if (sessionBufferSize < 0) {
@@ -422,9 +452,13 @@
(maxSessionBufferSize == 0 || sessionBufferSize <
maxSessionBufferSize) &&
(maxServiceBufferSize == 0 || serviceBufferSize <
maxServiceBufferSize) &&
(maxGlobalBufferSize == 0 || globalBufferSize <
maxGlobalBufferSize)) {
- session.resumeRead();
state.suspendedRead = false;
+ enforcePolicy = true;
}
+ }
+
+ if (enforcePolicy) {
+ session.resumeRead();
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
Sun Nov 4 04:22:44 2007
@@ -114,7 +114,7 @@
private final Object logLock = new Object();
private final Object blockLock = new Object();
- private long lastLogTime = -1;
+ private long lastLogTime = 0;
private int blockWaiters = 0;
private volatile WriteThrottlePolicy policy;
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/util/ReferenceCountingFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/util/ReferenceCountingFilter.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/util/ReferenceCountingFilter.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/util/ReferenceCountingFilter.java
Sun Nov 4 04:22:44 2007
@@ -23,6 +23,7 @@
import org.apache.mina.common.IoFilter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.TrafficMask;
import org.apache.mina.common.WriteRequest;
/**
@@ -124,5 +125,10 @@
public void sessionOpened(NextFilter nextFilter, IoSession session)
throws Exception {
filter.sessionOpened(nextFilter, session);
+ }
+
+ public void filterSetTrafficMask(NextFilter nextFilter, IoSession session,
+ TrafficMask trafficMask) throws Exception {
+ filter.filterSetTrafficMask(nextFilter, session, trafficMask);
}
}
Modified:
mina/trunk/core/src/test/java/org/apache/mina/filter/executor/ExecutorFilterRegressionTest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/executor/ExecutorFilterRegressionTest.java?rev=591770&r1=591769&r2=591770&view=diff
==============================================================================
---
mina/trunk/core/src/test/java/org/apache/mina/filter/executor/ExecutorFilterRegressionTest.java
(original)
+++
mina/trunk/core/src/test/java/org/apache/mina/filter/executor/ExecutorFilterRegressionTest.java
Sun Nov 4 04:22:44 2007
@@ -27,8 +27,8 @@
import org.apache.mina.common.DummySession;
import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoEvent;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.TrafficMask;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.common.IoFilter.NextFilter;
@@ -133,7 +133,7 @@
public void sessionCreated(IoSession session) {
}
- public void filter(IoEvent event) {
+ public void filterSetTrafficMask(IoSession session, TrafficMask
trafficMask) {
}
}