Author: hiranya
Date: Thu Aug 15 20:53:20 2013
New Revision: 1514487

URL: http://svn.apache.org/r1514487
Log:
Refactored the code related to closing/releasing http connections in the PT 
transport

Modified:
    
synapse/trunk/java/modules/documentation/src/site/xdoc/userguide/transports/pass_through.xml
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceResponse.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java

Modified: 
synapse/trunk/java/modules/documentation/src/site/xdoc/userguide/transports/pass_through.xml
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/documentation/src/site/xdoc/userguide/transports/pass_through.xml?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/documentation/src/site/xdoc/userguide/transports/pass_through.xml
 (original)
+++ 
synapse/trunk/java/modules/documentation/src/site/xdoc/userguide/transports/pass_through.xml
 Thu Aug 15 20:53:20 2013
@@ -707,7 +707,7 @@
                                     and updated with new values. Default value 
is 50.
                                 </li>
                                 <li>
-                                    CacheDurationMins: Set the time duration 
(in minutes) between
+                                    CacheDurationMins: Sets the time duration 
(in minutes) between
                                     two consecutive runs of the CacheManager 
task which periodically
                                     performs housekeeping work in each cache. 
Default value is 15.
                                 </li>
@@ -751,7 +751,7 @@
                     <tr>
                         <td>http.socket.timeout</td>
                         <td>
-                            Set the TCP socket timeout in milliseconds
+                            Sets the TCP socket timeout in milliseconds
                             (See <a 
href="http://docs.oracle.com/javase/1.5.0/docs/api/java/net/SocketOptions.html#SO_TIMEOUT";>SO_TIMEOUT</a>).
                             <div 
class="xmlConf">http.socket.timeout=20000</div>
                         </td>
@@ -761,7 +761,7 @@
                     <tr>
                         <td>http.connection.timeout</td>
                         <td>
-                            Set the TCP connection timeout in milliseconds. 
This determines the timeout
+                            Sets the TCP connection timeout in milliseconds. 
This determines the timeout
                             value for non-blocking connection requests. 
Setting this property to
                             0 disables connection timeout (i.e. no timeout).
                             <div 
class="xmlConf">http.connection.timeout=30000</div>
@@ -798,7 +798,7 @@
                     <tr>
                         <td>http.socket.buffer-size</td>
                         <td>
-                            Set the size of the I/O session buffers (in bytes) 
used by the transport
+                            Sets the size of the I/O session buffers (in 
bytes) used by the transport
                             for reading incoming data and writing outgoing 
data.
                             <div 
class="xmlConf">http.socket.buffer-size=4096</div>
                         </td>
@@ -808,7 +808,7 @@
                     <tr>
                         <td>http.socket.rcv-buffer-size</td>
                         <td>
-                            Set the size of the buffers (in bytes) used by the 
underlying platform
+                            Sets the size of the buffers (in bytes) used by 
the underlying platform
                             for incoming network I/O. This value is only a 
hint. When set, this is a
                             suggestion to the OS kernel from Synapse about the 
size of buffers to
                             use for the data to be received over the socket
@@ -821,7 +821,7 @@
                     <tr>
                         <td>http.socket.snd-buffer-size</td>
                         <td>
-                            Set the size of the buffers (in bytes) used by the 
underlying platform
+                            Sets the size of the buffers (in bytes) used by 
the underlying platform
                             for outgoing network I/O. This value is only a 
hint. When set, this is a
                             suggestion to the OS kernel from Synapse about the 
size of buffers to
                             use for the data to be sent over the socket
@@ -846,7 +846,7 @@
                     <tr>
                         <td>http.socket.reuseaddr</td>
                         <td>
-                            Set the <a 
href="http://docs.oracle.com/javase/1.5.0/docs/api/java/net/SocketOptions.html#SO_REUSEADDR";>SO_REUSEADDR</a>
+                            Sets the <a 
href="http://docs.oracle.com/javase/1.5.0/docs/api/java/net/SocketOptions.html#SO_REUSEADDR";>SO_REUSEADDR</a>
                             socket option for the sockets created by the HTTP 
transport. Accepted
                             values are either 'true' or 'false'.
                             <div 
class="xmlConf">http.socket.reuseaddr=true</div>
@@ -857,7 +857,7 @@
                     <tr>
                         <td>http.nio.select-interval</td>
                         <td>
-                            Set the time interval in milliseconds at which the 
I/O reactor wakes up
+                            Sets the time interval in milliseconds at which 
the I/O reactor wakes up
                             to check for timed out sessions and session 
requests.
                             <div 
class="xmlConf">http.nio.select-interval=2500</div>
                         </td>
@@ -867,7 +867,7 @@
                     <tr>
                         <td>io_threads_per_reactor <a 
name="io_threads_per_reactor"/></td>
                         <td>
-                            Set the number of I/O dispatcher threads to be 
used by each I/O reactor.
+                            Sets the number of I/O dispatcher threads to be 
used by each I/O reactor.
                             Typically, this property controls the ability of 
the HTTP transport
                             to handle concurrent I/O events.
                             It is recommended that this property is set to the 
number of CPU cores
@@ -920,7 +920,7 @@
                     <tr>
                         <td>io_buffer_size</td>
                         <td>
-                            Set the size of the I/O buffers (in bytes) used as 
the pipes between HTTP
+                            Sets the size of the I/O buffers (in bytes) used 
as the pipes between HTTP
                             listener and sender. Typically, the HTTP listener 
would write the incoming
                             message data to one of these buffers, and the 
sender would read from it to
                             send the message out.

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
 Thu Aug 15 20:53:20 2013
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.synapse.transport.passthru.config.SourceConfiguration;
 import org.apache.synapse.transport.passthru.jmx.LatencyView;
 import 
org.apache.synapse.transport.passthru.jmx.PassThroughTransportMetricsCollector;
+import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils;
 
 import java.io.IOException;
 
@@ -202,7 +203,8 @@ public class SourceHandler implements NH
                     if (!outBuf.hasData() && encoder.isCompleted()) {
                         // We are done - At this point the entire response 
payload has been
                         // written out to the SimpleOutputBuffer
-                        
sourceConfiguration.getSourceConnections().releaseConnection(conn);
+                        
PassThroughTransportUtils.finishUsingSourceConnection(conn.getHttpResponse(),
+                                conn, 
sourceConfiguration.getSourceConnections());
                     }
                 }
                 return;

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceResponse.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceResponse.java?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceResponse.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceResponse.java
 Thu Aug 15 20:53:20 2013
@@ -21,13 +21,13 @@ package org.apache.synapse.transport.pas
 
 import org.apache.http.*;
 import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.http.nio.ContentEncoder;
 import org.apache.http.nio.NHttpServerConnection;
 import org.apache.http.protocol.HTTP;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.protocol.HttpCoreContext;
 import org.apache.synapse.transport.passthru.config.SourceConfiguration;
+import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -56,9 +56,6 @@ public class SourceResponse {
     /** Version of the response */
     private ProtocolVersion version = HttpVersion.HTTP_1_1;
 
-    /** Connection strategy */
-    private ConnectionReuseStrategy connStrategy = new 
DefaultConnectionReuseStrategy();
-
     private SourceRequest request = null;
 
     public SourceResponse(SourceConfiguration config, int status, 
SourceRequest request) {
@@ -157,23 +154,10 @@ public class SourceResponse {
         // Update connection state
         if (encoder.isCompleted()) {
             SourceContext.updateState(conn, ProtocolState.RESPONSE_DONE);
-
-            sourceConfiguration.getMetrics().
-                    
notifySentMessageSize(conn.getMetrics().getSentBytesCount());
-
-            if (!this.connStrategy.keepAlive(response, conn.getContext())) {
-                SourceContext.updateState(conn, ProtocolState.CLOSING);
-                
sourceConfiguration.getSourceConnections().closeConnection(conn);
-            } else if (SourceContext.get(conn).isShutDown()) {
-                // we need to shut down if the shutdown flag is set
-                SourceContext.updateState(conn, ProtocolState.CLOSING);
-                
sourceConfiguration.getSourceConnections().closeConnection(conn);
-            } else {
-                // Reset connection state
-                
sourceConfiguration.getSourceConnections().releaseConnection(conn);
-                // Ready to deal with a new request                
-                conn.requestInput();
-            }
+            sourceConfiguration.getMetrics().notifySentMessageSize(
+                    conn.getMetrics().getSentBytesCount());
+            PassThroughTransportUtils.finishUsingSourceConnection(response, 
conn,
+                    sourceConfiguration.getSourceConnections());
         }
         return bytes;
     }

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java
 Thu Aug 15 20:53:20 2013
@@ -21,10 +21,10 @@ package org.apache.synapse.transport.pas
 
 import org.apache.http.*;
 import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.http.nio.ContentDecoder;
 import org.apache.http.nio.NHttpClientConnection;
 import org.apache.synapse.transport.passthru.config.TargetConfiguration;
+import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -56,10 +56,6 @@ public class TargetResponse {
     /** Protocol version */
     private ProtocolVersion version = HttpVersion.HTTP_1_1;
 
-    /** This utility class is used for determining weather we need to close 
the connection
-     * after submitting the response */
-    private ConnectionReuseStrategy connStrategy = new 
DefaultConnectionReuseStrategy();
-
     /** The connection */
     private NHttpClientConnection connection;
 
@@ -104,19 +100,9 @@ public class TargetResponse {
                 entity.setChunked(true);
             }
             response.setEntity(entity);
-        } else {            
-            if (!connStrategy.keepAlive(response, conn.getContext())) {
-                try {
-                    // this is a connection we should not re-use
-                    TargetContext.updateState(conn, ProtocolState.CLOSING);
-                    
targetConfiguration.getConnections().shutdownConnection(conn);
-                                       
-                } catch (Exception ignore) {
-
-                }
-            } else {
-                targetConfiguration.getConnections().releaseConnection(conn);
-            }
+        } else {
+            PassThroughTransportUtils.finishUsingTargetConnection(response, 
conn,
+                    targetConfiguration.getConnections());
         }
     }
 
@@ -140,13 +126,8 @@ public class TargetResponse {
             TargetContext.updateState(conn, ProtocolState.RESPONSE_DONE);
             targetConfiguration.getMetrics().notifyReceivedMessageSize(
                     conn.getMetrics().getReceivedBytesCount());
-
-            if (!this.connStrategy.keepAlive(response, conn.getContext())) {
-                TargetContext.updateState(conn, ProtocolState.CLOSED);
-                targetConfiguration.getConnections().shutdownConnection(conn);
-            } else {
-                targetConfiguration.getConnections().releaseConnection(conn);
-            }
+            PassThroughTransportUtils.finishUsingTargetConnection(response, 
conn,
+                    targetConfiguration.getConnections());
         }
         return bytes;
     }

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
 Thu Aug 15 20:53:20 2013
@@ -104,7 +104,31 @@ public class TargetConnections {
     }
 
     /**
-     * This connection is no longer valid. So we need to shutdownConnection 
connection.
+     * This connection is no longer needed. So we need to close connection.
+     *
+     * @param conn connection to shutdownConnection
+     */
+    public void closeConnection(NHttpClientConnection conn) {
+        HostConnections pool = (HostConnections) 
conn.getContext().getAttribute(
+                PassThroughConstants.CONNECTION_POOL);
+
+        TargetContext.get(conn).reset();
+
+        if (pool != null) {
+            pool.forget(conn);
+        } else {
+            // we shouldn't get here
+            log.fatal("Connection without a pool. Something wrong. Need to 
fix.");
+        }
+
+        try {
+            conn.close();
+        } catch (IOException ignored) {
+        }
+    }
+
+    /**
+     * This connection is no longer valid. So we need to shutdown connection.
      *
      * @param conn connection to shutdownConnection
      */

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java
 Thu Aug 15 20:53:20 2013
@@ -24,11 +24,21 @@ import org.apache.axis2.addressing.Endpo
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.Constants;
 import org.apache.axis2.transport.TransportUtils;
+import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpResponse;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.nio.NHttpClientConnection;
+import org.apache.http.nio.NHttpServerConnection;
 import org.apache.http.protocol.HTTP;
 import org.apache.http.HttpStatus;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.transport.passthru.PassThroughConstants;
+import org.apache.synapse.transport.passthru.ProtocolState;
+import org.apache.synapse.transport.passthru.SourceContext;
+import org.apache.synapse.transport.passthru.TargetContext;
+import org.apache.synapse.transport.passthru.connections.SourceConnections;
+import org.apache.synapse.transport.passthru.connections.TargetConnections;
 
 import java.net.InetAddress;
 import java.util.Map;
@@ -41,6 +51,8 @@ public class PassThroughTransportUtils {
 
     private static final Log log = 
LogFactory.getLog(PassThroughTransportUtils.class);
 
+    private static final ConnectionReuseStrategy connStrategy = new 
DefaultConnectionReuseStrategy();
+
     /**
      * This method tries to determine the hostname of the given InetAddress 
without
      * triggering a reverse DNS lookup.  {@link 
java.net.InetAddress#getHostName()}
@@ -212,4 +224,31 @@ public class PassThroughTransportUtils {
                 PassThroughConstants.MESSAGE_BUILDER_INVOKED));
     }
 
+    public static void finishUsingSourceConnection(HttpResponse response,
+                                                   NHttpServerConnection conn,
+                                                   SourceConnections 
connections) {
+        if (!connStrategy.keepAlive(response, conn.getContext()) ||
+                SourceContext.get(conn).isShutDown()) {
+            SourceContext.updateState(conn, ProtocolState.CLOSING);
+            connections.closeConnection(conn);
+        } else {
+            // Reset connection state
+            connections.releaseConnection(conn);
+            // Ready to deal with a new request
+            conn.requestInput();
+        }
+    }
+
+    public static void finishUsingTargetConnection(HttpResponse response,
+                                                   NHttpClientConnection conn,
+                                                   TargetConnections 
connections) {
+        if (!connStrategy.keepAlive(response, conn.getContext())) {
+            // this is a connection we should not re-use
+            TargetContext.updateState(conn, ProtocolState.CLOSING);
+            connections.closeConnection(conn);
+        } else {
+            connections.releaseConnection(conn);
+        }
+    }
+
 }


Reply via email to