Author: ssmiweve
Date: 2008-11-30 22:30:59 +0100 (Sun, 30 Nov 2008)
New Revision: 7004
Modified:
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
Log:
An alternative implementation of Issue SKER4321 (Load reducing filter that
handles only current and last request from client):
there is no hard synchronisation on the HttpSession instance;
instead it uses a non-synchronised, concurrent BlockingQueue and Semaphore pair
to ensure fair ordering and a limited queue.
Modified:
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
===================================================================
---
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
2008-11-30 18:38:28 UTC (rev 7003)
+++
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
2008-11-30 21:30:59 UTC (rev 7004)
@@ -30,9 +30,12 @@
import java.util.Enumeration;
import java.util.Locale;
import java.util.Properties;
-import java.util.Stack;
import java.util.UUID;
import java.text.MessageFormat;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@@ -85,7 +88,8 @@
private static final String UNKNOWN = "unknown";
- private static final String USER_REQUEST_STACK = "userRequestStack";
+ private static final String USER_REQUEST_QUEUE = "userRequestQueue";
+ private static final String USER_REQUEST_SEMAPHORE =
"userRequestSemaphore";
private static final long WAIT_TIME = 5000;
private static final int REQUEST_QUEUE_SIZE = 5;
@@ -402,49 +406,69 @@
if (request instanceof HttpServletRequest) {
final HttpSession session = ((HttpServletRequest)
request).getSession();
- Stack<ServletRequest> stack;
- synchronized (session) {
- stack = (Stack<ServletRequest>)
session.getAttribute(USER_REQUEST_STACK);
- if (null == stack) {
- stack = new Stack<ServletRequest>();
- session.setAttribute(USER_REQUEST_STACK, stack);
- }
+ // fetch the user's queue
+ BlockingQueue<ServletRequest> queue
+ = (BlockingQueue<ServletRequest>)
session.getAttribute(USER_REQUEST_QUEUE);
+
+ // construct queue if necessary
+ if (null == queue) {
+ queue = new
ArrayBlockingQueue<ServletRequest>(REQUEST_QUEUE_SIZE);
+ session.setAttribute(USER_REQUEST_QUEUE, queue);
+ session.setAttribute(USER_REQUEST_SEMAPHORE, new Semaphore(1));
}
- if (stack.size() > REQUEST_QUEUE_SIZE) {
- if (response instanceof HttpServletResponse) {
- LOG.warn(" -- response 409 (More then " +
REQUEST_QUEUE_SIZE + " request in queue)");
- ((HttpServletResponse)
response).sendError(HttpServletResponse.SC_CONFLICT);
- }
- } else {
+ // queue has a time limit. start counting.
+ final long start = System.currentTimeMillis();
- // requests added to the stack enter a wait loop here and are
thrown out after WAIT_TIME
- final long start = System.currentTimeMillis();
- stack.push(request);
- synchronized (session) {
- try {
- long timeLeft = WAIT_TIME -
(System.currentTimeMillis() - start);
- while (stack.peek() != request && timeLeft > 0) {
- try {
- session.wait(timeLeft);
- timeLeft = WAIT_TIME -
(System.currentTimeMillis() - start);
- } catch (InterruptedException e) {
- LOG.error(e);
- timeLeft = -1;
- }
- }
- if (timeLeft >= 0) {
- chain.doFilter(request, response);
- } else {
+ // attempt to join queue
+ if (queue.offer(request)) {
+
+ // a semphore is used for waiting within the queue. it lets us
sleep peacefully until front of queue.
+ final Semaphore semaphore = (Semaphore)
session.getAttribute(USER_REQUEST_SEMAPHORE);
+
+ try {
+ while(queue.peek() != request){
+
+ final long timeLeft = WAIT_TIME -
(System.currentTimeMillis() - start);
+
+ // let's sleep. sleeping too long results in 409
response.
+ if(!semaphore.tryAcquire(timeLeft,
TimeUnit.MILLISECONDS)){
+
LOG.warn(" -- response 409 (Timeout: Waited " +
(WAIT_TIME - timeLeft) + " ms. )");
((HttpServletResponse)
response).sendError(HttpServletResponse.SC_CONFLICT);
+ return; // response is set
+
+ }else if(queue.peek() != request){
+ // we've acquire the semaphore but we're not at
front of queue.
+ // mix up. and probably not possible in the jvm?
+ // release the semaphore and go back to sleep
+ semaphore.release();
+ LOG.error("acquired semaphore without being at
front of queue: " + queue);
}
- }finally {
- stack.pop();
- session.notifyAll();
}
+
+ // waiting is over. we can execute.
+ chain.doFilter(request, response);
+
+ }catch(InterruptedException ie){
+ LOG.error("Failed using session's semaphore", ie);
+
+ }finally {
+
+ // take out of queue first so (queue.peek() != request) is
true for the next request.
+ queue.remove();
+ // release the semaphore, waiting up the next request.
+ semaphore.release();
}
+
+ }else{
+ // the queue is too long to join. immediately return 409
response.
+ if (response instanceof HttpServletResponse) {
+ LOG.warn(" -- response 409 (More then " +
REQUEST_QUEUE_SIZE + " request in queue)");
+ ((HttpServletResponse)
response).sendError(HttpServletResponse.SC_CONFLICT);
+ }
}
+
} else {
chain.doFilter(request, response);
}
_______________________________________________
Kernel-commits mailing list
[email protected]
http://sesat.no/mailman/listinfo/kernel-commits