CAMEL-7909 camel-netty-http consumer needs to close the connection if the 
response Connection header is close

Conflicts:
        components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
        components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9cf11bd5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9cf11bd5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9cf11bd5

Branch: refs/remotes/origin/camel-2.13.x
Commit: 9cf11bd58146abb4938d0b546d3308d6e086b6da
Parents: f0e3e20
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Tue Oct 14 10:14:12 2014 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Fri Oct 17 11:35:34 2014 +0800

----------------------------------------------------------------------
 .../netty/http/DefaultNettyHttpBinding.java      |  9 +++++++--
 .../http/handlers/HttpServerChannelHandler.java  | 19 ++-----------------
 .../camel/component/netty/NettyHelper.java       |  2 +-
 .../camel/component/netty/NettyProducer.java     | 13 ++++++++++---
 .../handlers/ServerResponseFutureListener.java   |  5 +++++
 5 files changed, 25 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
index 23cae6c..fa8fa81 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
@@ -34,6 +34,7 @@ import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.TypeConverter;
+import org.apache.camel.component.netty.NettyConstants;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
@@ -67,7 +68,7 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
     public DefaultNettyHttpBinding(HeaderFilterStrategy headerFilterStrategy) {
         this.headerFilterStrategy = headerFilterStrategy;
     }
-    
+
     public DefaultNettyHttpBinding copy() {
         try {
             return (DefaultNettyHttpBinding)this.clone();
@@ -384,7 +385,7 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
             if (buffer.readerIndex() == buffer.writerIndex()) {
                 buffer.setIndex(0, buffer.writerIndex());
             }
-            // TODO How to enable the chunk transport 
+            // TODO How to enable the chunk transport
             int len = buffer.readableBytes();
             // set content-length
             response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, len);
@@ -411,6 +412,10 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
             }
         }
         response.setHeader(HttpHeaders.Names.CONNECTION, connection);
+        // Just make sure we close the channel when the connection value is 
close
+        if (connection.equalsIgnoreCase(HttpHeaders.Values.CLOSE)) {
+            
message.setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true);
+        }
         LOG.trace("Connection: {}", connection);
 
         return response;

http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
index 8894f6a..63edef7 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.netty.http.handlers;
 
-import java.net.SocketAddress;
 import java.net.URI;
 import java.nio.channels.ClosedChannelException;
 import java.nio.charset.Charset;
@@ -26,7 +25,6 @@ import javax.security.auth.login.LoginException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.component.netty.NettyConsumer;
 import org.apache.camel.component.netty.NettyHelper;
 import org.apache.camel.component.netty.handlers.ServerChannelHandler;
 import org.apache.camel.component.netty.http.HttpPrincipal;
@@ -37,7 +35,6 @@ import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ObjectHelper;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
@@ -49,8 +46,6 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.jboss.netty.handler.codec.http.HttpHeaders.is100ContinueExpected;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
 import static 
org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
 import static 
org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
@@ -58,6 +53,7 @@ import static 
org.jboss.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAV
 import static 
org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
 import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
+
 /**
  * Netty HTTP {@link ServerChannelHandler} that handles the incoming HTTP 
requests and routes
  * the received message in Camel.
@@ -268,7 +264,7 @@ public class HttpServerChannelHandler extends 
ServerChannelHandler {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent 
exceptionEvent) throws Exception {
-        
+
         // only close if we are still allowed to run
         if (consumer.isRunAllowed()) {
 
@@ -282,17 +278,6 @@ public class HttpServerChannelHandler extends 
ServerChannelHandler {
         }
     }
 
-    @Override
-    protected ChannelFutureListener createResponseFutureListener(NettyConsumer 
consumer, Exchange exchange, SocketAddress remoteAddress) {
-        // make sure to close channel if not keep-alive
-        if (request != null && isKeepAlive(request)) {
-            LOG.trace("Request has Connection: keep-alive so Channel is not 
being closed");
-            return null;
-        } else {
-            LOG.trace("Request is not Connection: close so Channel is being 
closed");
-            return ChannelFutureListener.CLOSE;
-        }
-    }
 
     @Override
     protected Object getResponseBody(Exchange exchange) throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
index 05f3e4d..b9368fa 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
@@ -111,7 +111,7 @@ public final class NettyHelper {
     public static void close(Channel channel) {
         if (channel != null) {
             LOG.trace("Closing channel: {}", channel);
-            channel.close();
+            channel.close().syncUninterruptibly();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 50de736..87ad2be 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -498,8 +498,11 @@ public class NettyProducer extends DefaultAsyncProducer {
         public void done(boolean doneSync) {
             // put back in pool
             try {
-                LOG.trace("Putting channel back to pool {}", channel);
-                pool.returnObject(channel);
+                // Only put the connected channel back to the pool
+                if (channel.isConnected()) {
+                    LOG.trace("Putting channel back to pool {}", channel);
+                    pool.returnObject(channel);
+                }
             } catch (Exception e) {
                 LOG.warn("Error returning channel to pool {}. This exception 
will be ignored.", channel);
             } finally {
@@ -525,7 +528,9 @@ public class NettyProducer extends DefaultAsyncProducer {
         @Override
         public void destroyObject(Channel channel) throws Exception {
             LOG.trace("Destroying channel: {}", channel);
-            NettyHelper.close(channel);
+            if (channel.isOpen()) {
+                NettyHelper.close(channel);
+            }
             allChannels.remove(channel);
         }
 
@@ -540,11 +545,13 @@ public class NettyProducer extends DefaultAsyncProducer {
         @Override
         public void activateObject(Channel channel) throws Exception {
             // noop
+            LOG.trace("activateObject channel: {} -> {}", channel);
         }
 
         @Override
         public void passivateObject(Channel channel) throws Exception {
             // noop
+            LOG.trace("passivateObject channel: {} -> {}", channel);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
index 619a62e..90dc9e6 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
@@ -61,6 +61,11 @@ public class ServerResponseFutureListener implements 
ChannelFutureListener {
         } else {
             close = 
exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, 
Boolean.class);
         }
+        
+        // check the setting on the exchange property
+        if (close == null) {
+            close = 
exchange.getProperty(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, 
Boolean.class);
+        }
 
         // should we disconnect, the header can override the configuration
         boolean disconnect = consumer.getConfiguration().isDisconnect();

Reply via email to