Author: ssmiweve
Date: 2008-12-01 14:06:26 +0100 (Mon, 01 Dec 2008)
New Revision: 7005
Modified:
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
Log:
Issue SKER4321: (Load reducing filter that handles only current and last
request from client)
second attempt. this time correctly LIFO. smaller methods. ReentrantLock
instead of Semaphore.
Modified:
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
===================================================================
---
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
2008-11-30 21:30:59 UTC (rev 7004)
+++
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
2008-12-01 13:06:26 UTC (rev 7005)
@@ -32,10 +32,15 @@
import java.util.Properties;
import java.util.UUID;
import java.text.MessageFormat;
+import java.util.Deque;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@@ -89,7 +94,7 @@
private static final String UNKNOWN = "unknown";
private static final String USER_REQUEST_QUEUE = "userRequestQueue";
- private static final String USER_REQUEST_SEMAPHORE =
"userRequestSemaphore";
+ private static final String USER_REQUEST_LOCK = "userRequestSemaphore";
private static final long WAIT_TIME = 5000;
private static final int REQUEST_QUEUE_SIZE = 5;
@@ -397,81 +402,112 @@
// Private -------------------------------------------------------
- @SuppressWarnings("unchecked")
private static void doChainFilter(
final FilterChain chain,
final ServletRequest request,
final ServletResponse response) throws IOException,
ServletException {
if (request instanceof HttpServletRequest) {
- final HttpSession session = ((HttpServletRequest)
request).getSession();
+ doChainFilter(chain, (HttpServletRequest)request,
(HttpServletResponse)response);
+ } else {
+ chain.doFilter(request, response);
+ }
+ }
- // fetch the user's queue
- BlockingQueue<ServletRequest> queue
- = (BlockingQueue<ServletRequest>)
session.getAttribute(USER_REQUEST_QUEUE);
+ private static void doChainFilter(
+ final FilterChain chain,
+ final HttpServletRequest request,
+ final HttpServletResponse response) throws IOException,
ServletException {
- // construct queue if necessary
- if (null == queue) {
- queue = new
ArrayBlockingQueue<ServletRequest>(REQUEST_QUEUE_SIZE);
- session.setAttribute(USER_REQUEST_QUEUE, queue);
- session.setAttribute(USER_REQUEST_SEMAPHORE, new Semaphore(1));
- }
+ final HttpSession session = request.getSession();
- // queue has a time limit. start counting.
- final long start = System.currentTimeMillis();
+ // fetch the user's deque
+ final Deque<ServletRequest> deque = getUsersDeque(session);
- // attempt to join queue
- if (queue.offer(request)) {
+ // lock to execute
+ final ReentrantLock lock = (ReentrantLock)
session.getAttribute(USER_REQUEST_LOCK);
- // a semphore is used for waiting within the queue. it lets us
sleep peacefully until front of queue.
- final Semaphore semaphore = (Semaphore)
session.getAttribute(USER_REQUEST_SEMAPHORE);
+ // deque has a time limit. start counting.
+ long timeLeft = WAIT_TIME;
- try {
- while(queue.peek() != request){
+ // attempt to join deque
+ if (deque.offerFirst(request)) {
+ timeLeft = tryLock(request, deque, lock, timeLeft);
+ }
- final long timeLeft = WAIT_TIME -
(System.currentTimeMillis() - start);
+ if(lock.isHeldByCurrentThread()){
- // let's sleep. sleeping too long results in 409
response.
- if(!semaphore.tryAcquire(timeLeft,
TimeUnit.MILLISECONDS)){
+ try{
+ // waiting is over. and we can execute
+ chain.doFilter(request, response);
- LOG.warn(" -- response 409 (Timeout: Waited " +
(WAIT_TIME - timeLeft) + " ms. )");
- ((HttpServletResponse)
response).sendError(HttpServletResponse.SC_CONFLICT);
- return; // response is set
+ }finally{
+ // take out of deque first
+ deque.remove(request);
- }else if(queue.peek() != request){
- // we've acquire the semaphore but we're not at
front of queue.
- // mix up. and probably not possible in the jvm?
- // release the semaphore and go back to sleep
- semaphore.release();
- LOG.error("acquired semaphore without being at
front of queue: " + queue);
- }
- }
+ // release the lock, waiting up the next request
+ lock.unlock();
+ }
+ }else{
+ // we failed to execute. return 409 response.
+ if (response instanceof HttpServletResponse) {
- // waiting is over. we can execute.
- chain.doFilter(request, response);
+ LOG.warn(" -- response 409 " +
+ (0 < timeLeft
+ ? "(More then " + REQUEST_QUEUE_SIZE + " requests
already in queue)"
+ : "(Timeout: Waited " + WAIT_TIME + " ms)"));
- }catch(InterruptedException ie){
- LOG.error("Failed using session's semaphore", ie);
+ response.sendError(HttpServletResponse.SC_CONFLICT);
+ }
+ }
+ }
- }finally {
+ private static Deque<ServletRequest> getUsersDeque(final HttpSession
session){
- // take out of queue first so (queue.peek() != request) is
true for the next request.
- queue.remove();
- // release the semaphore, waiting up the next request.
- semaphore.release();
- }
+ @SuppressWarnings("unchecked")
+ Deque<ServletRequest> deque = (BlockingDeque<ServletRequest>)
session.getAttribute(USER_REQUEST_QUEUE);
- }else{
- // the queue is too long to join. immediately return 409
response.
- if (response instanceof HttpServletResponse) {
- LOG.warn(" -- response 409 (More then " +
REQUEST_QUEUE_SIZE + " request in queue)");
- ((HttpServletResponse)
response).sendError(HttpServletResponse.SC_CONFLICT);
+ // construct deque if necessary
+ if (null == deque) {
+ // it may be possible for duplicates across threads to be
constructed here
+ deque = new
LinkedBlockingDeque<ServletRequest>(REQUEST_QUEUE_SIZE);
+ session.setAttribute(USER_REQUEST_QUEUE, deque);
+ session.setAttribute(USER_REQUEST_LOCK, new ReentrantLock());
+ }
+
+ return deque;
+ }
+
+ private static long tryLock(
+ final HttpServletRequest request,
+ final Deque<ServletRequest> deque,
+ final Lock lock,
+ long timeLeft){
+
+ final long start = System.currentTimeMillis();
+
+ try {
+ do{
+ timeLeft = WAIT_TIME - (System.currentTimeMillis() - start);
+
+ // let's sleep. sleeping too long results in 409 response
+ if(0 >= timeLeft || !lock.tryLock(timeLeft,
TimeUnit.MILLISECONDS)){
+ // we timed out or got the lock. waiting is over
+ break;
+
+ }else if(deque.peek() != request){
+ // we've acquired the lock but we're not at front of deque
+ // release the lock and try again
+ lock.unlock();
}
- }
+ }while(deque.peek() != request);
- } else {
- chain.doFilter(request, response);
+
+ }catch(InterruptedException ie){
+ LOG.error("Failed using user's lock", ie);
}
+
+ return timeLeft;
}
private void doBeforeProcessing(final ServletRequest request, final
ServletResponse response)
_______________________________________________
Kernel-commits mailing list
[email protected]
http://sesat.no/mailman/listinfo/kernel-commits