Author: ssmiweve
Date: 2008-11-30 22:30:59 +0100 (Sun, 30 Nov 2008)
New Revision: 7004

Modified:
   
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
Log:
An alternative implementation of Issue SKER4321 (load-reducing filter that 
handles only the current and last request from a client):
 no hard synchronisation on the HttpSession instance,
 and uses a non-synchronised, concurrent BlockingQueue and Semaphore pair 
to ensure fair ordering and a limited queue.


Modified: 
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
===================================================================
--- 
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
 2008-11-30 18:38:28 UTC (rev 7003)
+++ 
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
 2008-11-30 21:30:59 UTC (rev 7004)
@@ -30,9 +30,12 @@
 import java.util.Enumeration;
 import java.util.Locale;
 import java.util.Properties;
-import java.util.Stack;
 import java.util.UUID;
 import java.text.MessageFormat;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import javax.servlet.Filter;
 import javax.servlet.FilterChain;
 import javax.servlet.FilterConfig;
@@ -85,7 +88,8 @@
 
     private static final String UNKNOWN = "unknown";
 
-    private static final String USER_REQUEST_STACK = "userRequestStack";
+    private static final String USER_REQUEST_QUEUE = "userRequestQueue";
+    private static final String USER_REQUEST_SEMAPHORE = 
"userRequestSemaphore";
     private static final long WAIT_TIME = 5000;
     private static final int REQUEST_QUEUE_SIZE = 5;
 
@@ -402,49 +406,69 @@
         if (request instanceof HttpServletRequest) {
             final HttpSession session = ((HttpServletRequest) 
request).getSession();
 
-            Stack<ServletRequest> stack;
-            synchronized (session) {
-                stack = (Stack<ServletRequest>) 
session.getAttribute(USER_REQUEST_STACK);
-                if (null == stack) {
-                    stack = new Stack<ServletRequest>();
-                    session.setAttribute(USER_REQUEST_STACK, stack);
-                }
+            // fetch the user's queue
+            BlockingQueue<ServletRequest> queue
+                    = (BlockingQueue<ServletRequest>) 
session.getAttribute(USER_REQUEST_QUEUE);
+
+            // construct queue if necessary
+            if (null == queue) {
+                queue = new 
ArrayBlockingQueue<ServletRequest>(REQUEST_QUEUE_SIZE);
+                session.setAttribute(USER_REQUEST_QUEUE, queue);
+                session.setAttribute(USER_REQUEST_SEMAPHORE, new Semaphore(1));
             }
 
-            if (stack.size() > REQUEST_QUEUE_SIZE) {
-                if (response instanceof HttpServletResponse) {
-                    LOG.warn(" -- response 409 (More then " + 
REQUEST_QUEUE_SIZE + " request in queue)");
-                    ((HttpServletResponse) 
response).sendError(HttpServletResponse.SC_CONFLICT);
-                }
-            } else {
+            // queue has a time limit. start counting.
+            final long start = System.currentTimeMillis();
 
-                // requests added to the stack enter a wait loop here and are 
thrown out after WAIT_TIME
-                final long start = System.currentTimeMillis();
-                stack.push(request);
-                synchronized (session) {
-                    try {
-                        long timeLeft = WAIT_TIME - 
(System.currentTimeMillis() - start);
-                        while (stack.peek() != request && timeLeft > 0) {
-                            try {
-                                session.wait(timeLeft);
-                                timeLeft = WAIT_TIME - 
(System.currentTimeMillis() - start);
-                            } catch (InterruptedException e) {
-                                LOG.error(e);
-                                timeLeft = -1;
-                            }
-                        }
-                        if (timeLeft >= 0) {
-                            chain.doFilter(request, response);
-                        } else {
+            // attempt to join queue
+            if (queue.offer(request)) {
+
+                // a semaphore is used for waiting within the queue. it lets us 
sleep peacefully until front of queue.
+                final Semaphore semaphore = (Semaphore) 
session.getAttribute(USER_REQUEST_SEMAPHORE);
+
+                try {
+                    while(queue.peek() != request){
+
+                        final long timeLeft = WAIT_TIME - 
(System.currentTimeMillis() - start);
+
+                        // let's sleep. sleeping too long results in 409 
response.
+                        if(!semaphore.tryAcquire(timeLeft, 
TimeUnit.MILLISECONDS)){
+
                             LOG.warn(" -- response 409 (Timeout: Waited " + 
(WAIT_TIME - timeLeft) + " ms. )");
                             ((HttpServletResponse) 
response).sendError(HttpServletResponse.SC_CONFLICT);
+                            return; // response is set
+
+                        }else if(queue.peek() != request){
+                            // we've acquired the semaphore but we're not at 
front of queue.
+                            // mix up. and probably not possible in the jvm?
+                            // release the semaphore and go back to sleep
+                            semaphore.release();
+                            LOG.error("acquired semaphore without being at 
front of queue: " + queue);
                         }
-                    }finally {
-                        stack.pop();
-                        session.notifyAll();
                     }
+
+                    // waiting is over. we can execute.
+                    chain.doFilter(request, response);
+
+                }catch(InterruptedException ie){
+                    LOG.error("Failed using session's semaphore", ie);
+
+                }finally {
+
+                    // take out of queue first so (queue.peek() != request) is 
false for the next request, letting it leave its wait loop.
+                    queue.remove();
+                    // release the semaphore, waking up the next request.
+                    semaphore.release();
                 }
+
+            }else{
+                // the queue is too long to join. immediately return 409 
response.
+                if (response instanceof HttpServletResponse) {
+                    LOG.warn(" -- response 409 (More then " + 
REQUEST_QUEUE_SIZE + " request in queue)");
+                    ((HttpServletResponse) 
response).sendError(HttpServletResponse.SC_CONFLICT);
+                }
             }
+
         } else {
             chain.doFilter(request, response);
         }

_______________________________________________
Kernel-commits mailing list
[email protected]
http://sesat.no/mailman/listinfo/kernel-commits

Reply via email to