Author: ruwan
Date: Mon May  3 21:58:40 2010
New Revision: 940649

URL: http://svn.apache.org/viewvc?rev=940649&view=rev
Log:
Fixing the graceful shutdown to properly work on keep alive http connecitons

Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2TransportHelper.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
 Mon May  3 21:58:40 2010
@@ -455,6 +455,11 @@ public class Axis2SynapseController impl
                 log.info(new StringBuilder("Waiting for: 
").append(pendingSenderThreads)
                         .append(" listener threads to complete").toString());
             }
+            int activeConnections = 
transportHelper.getActiveConnectionsCount();
+            if (activeConnections > 0) {
+                log.info("Waiting for: " + activeConnections
+                        + " active connections to be closed..");
+            }
             int pendingTransportThreads = pendingListenerThreads + 
pendingSenderThreads;
             
             int pendingCallbacks = 
ServerManager.getInstance().getCallbackCount();
@@ -480,8 +485,9 @@ public class Axis2SynapseController impl
                 if (System.currentTimeMillis() < endTime) {
                     log.info(new StringBuilder("Waiting for a maximum of 
another ")
                             .append((endTime - System.currentTimeMillis()) / 
1000)
-                            .append(" seconds until transport threads and 
tasks become idle,")
-                            .append(" and callbacks complete..").toString());
+                            .append(" seconds until transport threads and 
tasks become idle, ")
+                            .append("active connections to get closed,")
+                            .append(" and callbacks to be 
completed..").toString());
                     try {
                         Thread.sleep(waitIntervalMillis);
                     } catch (InterruptedException ignore) {

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2TransportHelper.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2TransportHelper.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2TransportHelper.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2TransportHelper.java
 Mon May  3 21:58:40 2010
@@ -27,6 +27,7 @@ import org.apache.axis2.transport.Transp
 import org.apache.axis2.transport.base.ManagementSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.transport.nhttp.HttpCoreNIOListener;
 
 import java.util.Map;
 
@@ -178,6 +179,19 @@ public class Axis2TransportHelper {
         return pendingThreads;
     }
 
+    public int getActiveConnectionsCount() {
+        Map<String, TransportInDescription> trpIns
+                = 
configurationContext.getAxisConfiguration().getTransportsIn();
+
+        for (TransportInDescription trpIn : trpIns.values()) {
+            if (trpIn.getReceiver() instanceof HttpCoreNIOListener) {
+                return ((HttpCoreNIOListener) 
trpIn.getReceiver()).getActiveConnectionsSize();
+            }
+        }
+
+        return 0;
+    }
+
     /**
      * Determines the total number of pending sender threads (active + queued).
      * 

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
 Mon May  3 21:58:40 2010
@@ -107,7 +107,7 @@ public class HttpCoreNIOListener impleme
     /** Metrics collector for this transport */
     private MetricsCollector metrics = new MetricsCollector();
     /** state of the listener */
-    private int state = BaseConstants.STOPPED;
+    private volatile int state = BaseConstants.STOPPED;
     /** The ServerHandler */
     private ServerHandler handler = null;
     /** This will execute the requests based on calculate priority */
@@ -206,6 +206,10 @@ public class HttpCoreNIOListener impleme
         }
     }
 
+    public int getActiveConnectionsSize() {
+        return handler.getActiveConnectionsSize();
+    }
+
     private void createPriorityConfiguration(String fileName) throws AxisFault 
{
         OMElement definitions = null;
         try {
@@ -437,6 +441,7 @@ public class HttpCoreNIOListener impleme
         if (state != BaseConstants.STARTED) return;
         try {
             ioReactor.pause();
+            handler.markActiveConnectionsToBeClosed();
             state = BaseConstants.PAUSED;
             log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener 
Paused");
         } catch (IOException e) {

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
 Mon May  3 21:58:40 2010
@@ -36,6 +36,7 @@ import org.apache.axis2.transport.base.B
 import org.apache.axis2.transport.base.ManagementSupport;
 import org.apache.axis2.transport.base.MetricsCollector;
 import org.apache.axis2.transport.base.TransportMBeanSupport;
+import org.apache.axis2.util.JavaUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.http.*;
@@ -93,7 +94,7 @@ public class HttpCoreNIOSender extends A
     /** Metrics collector for the sender */
     private MetricsCollector metrics = new MetricsCollector();
     /** state of the listener */
-    private int state = BaseConstants.STOPPED;
+    private volatile int state = BaseConstants.STOPPED;
     /** The proxy host */
     private String proxyHost = null;
     /** The proxy port */
@@ -477,7 +478,7 @@ public class HttpCoreNIOSender extends A
             }
         }
 
-        if (state == BaseConstants.PAUSED) {
+        if 
(JavaUtils.isTrueExplicitly(worker.getConn().getContext().getAttribute("forceClosing")))
 {
             HttpRequest req = (HttpRequest)
                     worker.getConn().getContext().getAttribute("http.request");
             req.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
 Mon May  3 21:58:40 2010
@@ -65,6 +65,11 @@ public class NhttpConstants {
     /** The message context property name which holds the exception (if any) 
for the last encountered exception */
     public static final String ERROR_EXCEPTION = "ERROR_EXCEPTION";
 
+    /** Denotes a connection close is forced if set at the NhttpContext */
+    public static final String FORCE_CLOSING = "forceClosing";
+    /** Denotes a message is being processed by the current connection if this 
is set at the context */
+    public static final String MESSAGE_IN_FLIGHT = "message-in-flight";
+
     // ********** DO NOT CHANGE THESE UNLESS CORRESPONDING SYNAPSE CONSTANT 
ARE CHANGED ************
     public static final int RCV_IO_ERROR_SENDING   = 101000;
     public static final int RCV_IO_ERROR_RECEIVING = 101001;

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
 Mon May  3 21:58:40 2010
@@ -22,6 +22,7 @@ import org.apache.axis2.context.Configur
 import org.apache.axis2.transport.base.MetricsCollector;
 import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
 import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.axis2.util.JavaUtils;
 import org.apache.http.*;
 import org.apache.http.entity.BasicHttpEntity;
 import org.apache.http.entity.ByteArrayEntity;
@@ -52,6 +53,8 @@ import org.apache.synapse.transport.nhtt
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -86,6 +89,9 @@ public class ServerHandler implements NH
     private WorkerPool workerPool = null;
     /** the metrics collector */
     private MetricsCollector metrics = null;
+    
+    /** keeps track of the connection that are alive in the system */
+    private volatile List<NHttpServerConnection> activeConnections = null;
 
     private Parser parser = null;
 
@@ -108,6 +114,7 @@ public class ServerHandler implements NH
         this.httpProcessor = getHttpProcessor();
         this.connStrategy = new DefaultConnectionReuseStrategy();
         this.allocator = new HeapByteBufferAllocator();
+        this.activeConnections = new ArrayList<NHttpServerConnection>();
 
         this.cfg = NHttpConfiguration.getInstance();
        if (executor == null)  {
@@ -132,6 +139,7 @@ public class ServerHandler implements NH
         HttpContext context = conn.getContext();
         HttpRequest request = conn.getHttpRequest();
         context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+        context.setAttribute(NhttpConstants.MESSAGE_IN_FLIGHT, "true");
 
         // prepare to collect debug information
         conn.getContext().setAttribute(
@@ -357,13 +365,33 @@ public class ServerHandler implements NH
         }
         // record connection creation time for debug logging
         conn.getContext().setAttribute(CONNECTION_CREATION_TIME, 
System.currentTimeMillis());
+        if (log.isDebugEnabled()) {
+            log.debug("Adding a connection : "
+                + conn + " to the pool, existing pool size : " + 
activeConnections.size());
+        }
+        activeConnections.add(conn);
     }
 
     public void responseReady(NHttpServerConnection conn) {
 
+        if 
(JavaUtils.isTrueExplicitly(conn.getContext().getAttribute(NhttpConstants.FORCE_CLOSING))
+                && !JavaUtils.isTrueExplicitly(conn.getContext().getAttribute(
+                NhttpConstants.MESSAGE_IN_FLIGHT))) {
+
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("Closing a persisted connection since it is 
forced : " + conn);
+                }
+                conn.close();
+            } catch (IOException ignore) {}
+            
+            return;
+        }
+
         
metrics.notifyReceivedMessageSize(conn.getMetrics().getReceivedBytesCount());
         metrics.notifySentMessageSize(conn.getMetrics().getSentBytesCount());
         conn.getMetrics().reset();
+        conn.getContext().removeAttribute(NhttpConstants.MESSAGE_IN_FLIGHT);
 
         if (log.isTraceEnabled()) {
             log.trace("Ready to send response");
@@ -384,6 +412,17 @@ public class ServerHandler implements NH
         }
     }
 
+    public void markActiveConnectionsToBeClosed() {
+        log.info("Marking the closing signal on the connection pool of size : "
+                + activeConnections.size());
+        synchronized (this) {
+            for (NHttpServerConnection conn : activeConnections) {
+                conn.getContext().setAttribute(NhttpConstants.FORCE_CLOSING, 
"true");
+                conn.requestOutput();
+            }
+        }
+    }
+
     /**
      * Handle HTTP Protocol violations with an error response
      * @param conn the connection being processed
@@ -469,11 +508,23 @@ public class ServerHandler implements NH
         if (inputBuffer != null) {
             inputBuffer.close();
         }
+
+        synchronized (this) {
+            if (activeConnections.remove(conn) && log.isDebugEnabled()) {
+                log.debug("Removing the connection : " + conn
+                        + " from pool of size : " + activeConnections.size());
+            }
+        }
+
         try {
             conn.shutdown();
         } catch (IOException ignore) {}
     }
 
+    public int getActiveConnectionsSize() {
+        return activeConnections.size();
+    }
+
     /**
      * Return the HttpProcessor for responses
      * @return the HttpProcessor that processes HttpResponses of this server


Reply via email to