Author: trustin
Date: Thu Nov 8 02:49:29 2007
New Revision: 593125
URL: http://svn.apache.org/viewvc?rev=593125&view=rev
Log:
Fixed a problem where ReadThrottleFilter doesn't resume a suspended session
* resumeOthers() now revives suspended sessions periodically even if
no messageReceived event is fired for the suspended sessions.
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java?rev=593125&r1=593124&r2=593125&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
Thu Nov 8 02:49:29 2007
@@ -34,6 +34,7 @@
import org.apache.mina.filter.executor.AbstractExecutorFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.util.CopyOnWriteMap;
+import org.slf4j.Logger;
/**
* An {@link IoFilter} that throttles incoming traffic to
@@ -64,6 +65,9 @@
private static final Map<IoService, AtomicInteger> serviceBufferSizes =
new CopyOnWriteMap<IoService, AtomicInteger>();
+ private static final Object globalResumeLock = new Object();
+ private static long lastGlobalResumeTime = 0;
+
/**
* Returns the current amount of data in the buffer of the {@link ExecutorFilter}
* for all {@link IoSession} whose {@link IoFilterChain} has been configured by
@@ -104,9 +108,6 @@
private final AttributeKey STATE =
new AttributeKey(ReadThrottleFilter.class, "state");
- private final Object logLock = new Object();
- private long lastLogTime = 0;
-
private volatile ReadThrottlePolicy policy;
private final MessageSizeEstimator messageSizeEstimator;
@@ -365,7 +366,8 @@
private void enter(IoSession session, int size) {
State state = getState(session);
-
+ Logger logger = IoSessionLogger.getLogger(session, getClass());
+
int globalBufferSize =
ReadThrottleFilter.globalBufferSize.addAndGet(size);
int serviceBufferSize =
increaseServiceBufferSize(session.getService(), size);
@@ -376,8 +378,9 @@
ReadThrottlePolicy policy = getPolicy();
boolean enforcePolicy = false;
+ int sessionBufferSize;
synchronized (state) {
- int sessionBufferSize = (state.sessionBufferSize += size);
+ sessionBufferSize = (state.sessionBufferSize += size);
if ((maxSessionBufferSize != 0 && sessionBufferSize >=
maxSessionBufferSize) ||
(maxServiceBufferSize != 0 && serviceBufferSize >=
maxServiceBufferSize) ||
(maxGlobalBufferSize != 0 && globalBufferSize >=
maxGlobalBufferSize)) {
@@ -389,32 +392,43 @@
}
}
}
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Enter: " + sessionBufferSize);
+ }
if (enforcePolicy) {
switch (policy) {
case CLOSE:
- log(session);
+ log(session, state);
session.close();
raiseException(session);
break;
case EXCEPTION:
- log(session);
- session.suspendRead();
+ suspend(session, state, logger);
raiseException(session);
break;
case BLOCK:
- log(session);
- session.suspendRead();
+ suspend(session, state, logger);
break;
case LOG:
- log(session);
+ log(session, state);
break;
}
}
}
+
+ private void suspend(IoSession session, State state, Logger logger) {
+ log(session, state);
+ session.suspendRead();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Suspended: {}", getMessage(session));
+ }
+ }
private void exit(IoSession session, int size) {
State state = getState(session);
+ Logger logger = IoSessionLogger.getLogger(session, getClass());
int globalBufferSize =
ReadThrottleFilter.globalBufferSize.addAndGet(-size);
if (globalBufferSize < 0) {
@@ -432,35 +446,108 @@
int maxServiceBufferSize = this.maxServiceBufferSize;
int maxSessionBufferSize = this.maxSessionBufferSize;
+ int sessionBufferSize;
+
boolean enforcePolicy = false;
synchronized (state) {
- int sessionBufferSize = (state.sessionBufferSize -= size);
+ sessionBufferSize = (state.sessionBufferSize -= size);
if (sessionBufferSize < 0) {
- state.sessionBufferSize = 0;
+ state.sessionBufferSize = sessionBufferSize = 0;
throw new IllegalStateException("sessionBufferSize < 0");
}
-
- if ((maxSessionBufferSize == 0 || sessionBufferSize <
maxSessionBufferSize) &&
+ if (state.suspendedRead &&
+ (maxGlobalBufferSize == 0 || globalBufferSize <
maxGlobalBufferSize) &&
(maxServiceBufferSize == 0 || serviceBufferSize <
maxServiceBufferSize) &&
- (maxGlobalBufferSize == 0 || globalBufferSize <
maxGlobalBufferSize)) {
+ (maxSessionBufferSize == 0 || sessionBufferSize <
maxSessionBufferSize)) {
state.suspendedRead = false;
enforcePolicy = true;
}
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("Exit: {}", state.sessionBufferSize);
+ }
+
if (enforcePolicy) {
session.resumeRead();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Resumed");
+ }
}
+
+ resumeOthers();
}
+
+ private void resumeOthers() {
+ long currentTime = System.currentTimeMillis();
+
+ // Try to resume other sessions at most once per second.
+ boolean resumeOthers;
+ synchronized (globalResumeLock) {
+ if (currentTime - lastGlobalResumeTime > 1000) {
+ lastGlobalResumeTime = currentTime;
+ resumeOthers = true;
+ } else {
+ resumeOthers = false;
+ }
+ }
+
+ if (resumeOthers) {
+ int maxGlobalBufferSize = this.maxGlobalBufferSize;
+ if (maxGlobalBufferSize == 0 || globalBufferSize.get() <
maxGlobalBufferSize) {
+ for (IoService service: serviceBufferSizes.keySet()) {
+ resumeService(service);
+ }
+ }
+
+ synchronized (globalResumeLock) {
+ lastGlobalResumeTime = System.currentTimeMillis();
+ }
+ }
+ }
+
+ private void resumeService(IoService service) {
+ int maxServiceBufferSize = this.maxServiceBufferSize;
+ if (maxServiceBufferSize == 0 || getServiceBufferSize(service) <
maxServiceBufferSize) {
+ for (IoSession session: service.getManagedSessions()) {
+ resume(session);
+ }
+ }
+ }
+
+ private void resume(IoSession session) {
+ State state = (State) session.getAttribute(STATE);
+ if (state == null) {
+ return;
+ }
+
+ int maxSessionBufferSize = this.maxSessionBufferSize;
+ boolean resume = false;
+ synchronized (state) {
+ if (state.suspendedRead &&
+ (maxSessionBufferSize == 0 || state.sessionBufferSize <
maxSessionBufferSize)) {
+ state.suspendedRead = false;
+ resume = true;
+ }
+ }
- private void log(IoSession session) {
+ if (resume) {
+ session.resumeRead();
+ Logger logger = IoSessionLogger.getLogger(session, getClass());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Resumed");
+ }
+ }
+ }
+
+ private void log(IoSession session, State state) {
long currentTime = System.currentTimeMillis();
// Prevent log flood by logging every 3 seconds.
boolean log;
- synchronized (logLock) {
- if (currentTime - lastLogTime > 3000) {
- lastLogTime = currentTime;
+ synchronized (state.logLock) {
+ if (currentTime - state.lastLogTime > 3000) {
+ state.lastLogTime = currentTime;
log = true;
} else {
log = false;
@@ -538,5 +625,8 @@
private static class State {
private int sessionBufferSize;
private boolean suspendedRead;
+
+ private final Object logLock = new Object();
+ private long lastLogTime = 0;
}
}