Author: asankha
Date: Fri Apr 27 13:32:35 2007
New Revision: 533211

URL: http://svn.apache.org/viewvc?view=rev&rev=533211
Log:
tune the concurrent worker pools to get better performance
fix possible concurrent modification exception

Modified:
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/TimeoutHandler.java
    
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
    
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/TimeoutHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/TimeoutHandler.java?view=diff&rev=533211&r1=533210&r2=533211
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/TimeoutHandler.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/TimeoutHandler.java
 Fri Apr 27 13:32:35 2007
@@ -44,7 +44,11 @@
  */
 public class TimeoutHandler extends TimerTask {
 
+    /** The callback map - already a Collections.synchronized() hash map */
     private Map callbackStore = null;
+    /** a lock to prevent concurrent execution while ensuring least overhead */
+    private Object lock = new Object();
+    private boolean alreadyExecuting = false;
 
     public TimeoutHandler(Map callbacks) {
         this.callbackStore = callbacks;
@@ -55,6 +59,18 @@
      * the callback. If specified sends a fault message to the client about the timeout.
      */
     public void run() {
+        if (alreadyExecuting) return;
+
+        synchronized(lock) {
+            alreadyExecuting = true;
+            try {
+                processCallbacks();
+            } catch (Exception ignore) {}
+            alreadyExecuting = false;
+        }
+    }
+
+    private void processCallbacks() {
 
         // checks if callback store contains at least one entry before proceeding. otherwise getting
         // the time for doing nothing would be a inefficient task.

Modified: 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java?view=diff&rev=533211&r1=533210&r2=533211
==============================================================================
--- 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
 Fri Apr 27 13:32:35 2007
@@ -68,8 +68,9 @@
     ConfigurationContext cfgCtx = null;
 
     private Executor workerPool = null;
-    private static final int WORKERS_MAX_THREADS = 40;
-    private static final long WORKER_KEEP_ALIVE = 100L;
+    private static final int WORKERS_CORE_THREADS = 40;
+    private static final int WORKERS_MAX_THREADS  = 40;
+    private static final long WORKER_KEEP_ALIVE   = 5L;
 
     private static final String REQUEST_BUFFER = "request-buffer";
     private static final String RESPONSE_BUFFER = "response-buffer";
@@ -93,7 +94,7 @@
         this.connStrategy = new DefaultConnectionReuseStrategy();
 
         workerPool = new ThreadPoolExecutor(
-            1, WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TimeUnit.SECONDS,
+            WORKERS_CORE_THREADS, WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TimeUnit.SECONDS,
             new LinkedBlockingQueue(),
             new DefaultThreadFactory(new ThreadGroup("Client Worker thread group"), "HttpClientWorker"));
     }

Modified: 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java?view=diff&rev=533211&r1=533210&r2=533211
==============================================================================
--- 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java
 Fri Apr 27 13:32:35 2007
@@ -18,10 +18,7 @@
  */
 package org.apache.axis2.transport.nhttp;
 
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.*;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.util.threadpool.DefaultThreadFactory;
 import org.apache.axis2.transport.nhttp.util.PipeImpl;
@@ -72,8 +69,9 @@
 
     /** the thread pool to process requests */
     private Executor workerPool = null;
-    private static final int WORKERS_MAX_THREADS = 40;
-    private static final long WORKER_KEEP_ALIVE = 100L;
+    private static final int WORKERS_CORE_THREADS = 40;
+    private static final int WORKERS_MAX_THREADS  = 40;
+    private static final long WORKER_KEEP_ALIVE   = 5L;
 
     private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
     private static final String RESPONSE_SOURCE_CHANNEL = "response-source-channel";
@@ -91,7 +89,7 @@
         this.connStrategy = new DefaultConnectionReuseStrategy();
 
         this.workerPool = new ThreadPoolExecutor(
-            1, WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TimeUnit.SECONDS,
+            WORKERS_CORE_THREADS, WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TimeUnit.SECONDS,
             new LinkedBlockingQueue(),
             new DefaultThreadFactory(new ThreadGroup("Server Worker thread group"), "HttpServerWorker"));
     }
@@ -137,6 +135,9 @@
                     response, Channels.newOutputStream(responsePipe.sink())));
 
         } catch (IOException e) {
+            handleException("Error processing request received for : " +
+                request.getRequestLine().getUri(), e, conn);
+        } catch (RejectedExecutionException e) {
             handleException("Error processing request received for : " +
                 request.getRequestLine().getUri(), e, conn);
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to