Repository: camel
Updated Branches:
  refs/heads/master 268b6947e -> 7d46782a7


CAMEL-10409 Double release of netty buffer with thanks to Vitalii


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7d46782a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7d46782a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7d46782a

Branch: refs/heads/master
Commit: 7d46782a7bad42053dd5fb0d27281a85f4ec08c4
Parents: 268b6947
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Tue Nov 15 10:04:41 2016 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Tue Nov 15 10:25:46 2016 +0800

----------------------------------------------------------------------
 .../netty4/http/NettyHttpProducer.java          | 13 ----
 .../component/netty4/http/BaseNettyTest.java    | 25 ++++++++
 .../netty4/http/LogCaptureAppender.java         | 65 ++++++++++++++++++++
 .../component/netty4/http/LogCaptureTest.java   | 35 +++++++++++
 .../src/test/resources/log4j2.properties        |  7 +++
 .../camel/component/netty4/NettyProducer.java   | 43 +++++++++++--
 6 files changed, 169 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7d46782a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
index 31710e0..1ee3616 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
@@ -86,19 +86,6 @@ public class NettyHttpProducer extends NettyProducer {
             }
         }
 
-        // need to release the request when we are done
-        exchange.addOnCompletion(new SynchronizationAdapter() {
-            @Override
-            public void onDone(Exchange exchange) {
-                if (request instanceof ReferenceCounted) {
-                    if (((ReferenceCounted) request).refCnt() > 0) {
-                        log.debug("Releasing Netty HttpRequest ByteBuf");
-                        ReferenceCountUtil.release(request);
-                    }
-                }
-            }
-        });
-
         return request;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/7d46782a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/BaseNettyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/BaseNettyTest.java
 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/BaseNettyTest.java
index c511d89..75916e5 100644
--- 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/BaseNettyTest.java
+++ 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/BaseNettyTest.java
@@ -18,14 +18,19 @@ package org.apache.camel.component.netty4.http;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.util.Collection;
 import java.util.Properties;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.component.properties.PropertiesComponent;
 import org.apache.camel.converter.IOConverter;
 import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.logging.log4j.core.LogEvent;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -65,6 +70,26 @@ public class BaseNettyTest extends CamelTestSupport {
         }
     }
 
+    @BeforeClass
+    public static void startLeakDetection() {
+        System.setProperty("io.netty.leakDetection.maxRecords", "100");
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+    }
+
+    @AfterClass
+    public static void verifyNoLeaks() throws Exception {
+        //Force GC to bring up leaks
+        System.gc();
+        //Kick leak detection logging
+        ByteBufAllocator.DEFAULT.buffer(1).release();
+        Collection<LogEvent> events = LogCaptureAppender.getEvents();
+        if (!events.isEmpty()) {
+            String message = "Leaks detected while running tests: " + events;
+            LogCaptureAppender.reset();
+            throw new AssertionError(message);
+        }
+    }
+
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();

http://git-wip-us.apache.org/repos/asf/camel/blob/7d46782a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureAppender.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureAppender.java
 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureAppender.java
new file mode 100644
index 0000000..ef836b6
--- /dev/null
+++ 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureAppender.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.http;
+
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+
+/**
+ */
+@Plugin(name = "LogCaptureAppender", category = "Core", elementType = 
"appender", printObject = true)
+public class LogCaptureAppender extends AbstractAppender {
+    private static final Deque<LogEvent> LOG_EVENTS = new ArrayDeque<>();
+
+    public LogCaptureAppender(String name, Filter filter, Layout<? extends 
Serializable> layout) {
+        super(name, filter, layout);
+    }
+
+    public LogCaptureAppender(String name, Filter filter, Layout<? extends 
Serializable> layout, boolean ignoreExceptions) {
+        super(name, filter, layout, ignoreExceptions);
+    }
+
+    @PluginFactory
+    public static LogCaptureAppender createAppender(@PluginAttribute("name") 
final String name,
+                                                    @PluginElement("Filter") 
final Filter filter) {
+        return new LogCaptureAppender(name, filter, null);
+    }
+
+    @Override
+    public void append(LogEvent logEvent) {
+        LOG_EVENTS.add(logEvent);
+    }
+
+    public static void reset() {
+        LOG_EVENTS.clear();
+    }
+
+    public static Collection<LogEvent> getEvents() {
+        return LOG_EVENTS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7d46782a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureTest.java
 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureTest.java
new file mode 100644
index 0000000..dfa38d2
--- /dev/null
+++ 
b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.http;
+
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This test ensures LogCaptureAppender is configured properly
+ */
+public class LogCaptureTest {
+    @Test
+    public void testCapture() {
+        
InternalLoggerFactory.getInstance(ResourceLeakDetector.class).error("testError");
+        Assert.assertFalse(LogCaptureAppender.getEvents().isEmpty());
+        LogCaptureAppender.reset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7d46782a/components/camel-netty4-http/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/resources/log4j2.properties 
b/components/camel-netty4-http/src/test/resources/log4j2.properties
index c5a234a..ce7b6d3 100644
--- a/components/camel-netty4-http/src/test/resources/log4j2.properties
+++ b/components/camel-netty4-http/src/test/resources/log4j2.properties
@@ -15,6 +15,7 @@
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
 
+configuration.packages=org.apache.camel.component.netty4.http
 appender.file.type = File
 appender.file.name = file
 appender.file.fileName = target/camel-netty4-http-test.log
@@ -24,5 +25,11 @@ appender.out.type = Console
 appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.capture.type=LogCaptureAppender
+appender.capture.name=capture
+
+logger.leak.name = io.netty.util.ResourceLeakDetector
+logger.leak.appenderRef.capture.ref = capture
+
 rootLogger.level = INFO
 rootLogger.appenderRef.file.ref = file

http://git-wip-us.apache.org/repos/asf/camel/blob/7d46782a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index 273220c..a375681 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -38,6 +38,7 @@ import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -193,11 +194,15 @@ public class NettyProducer extends DefaultAsyncProducer {
                 callback.done(true);
                 return true;
             }
+            return processWithBody(exchange, body, new 
BodyReleaseCallback(callback, body));
         } catch (Exception e) {
             exchange.setException(e);
             callback.done(true);
             return true;
         }
+    }
+
+    private boolean processWithBody(final Exchange exchange, Object body, 
BodyReleaseCallback callback) {
 
         // set the exchange encoding property
         if (getConfiguration().getCharsetName() != null) {
@@ -240,7 +245,7 @@ public class NettyProducer extends DefaultAsyncProducer {
         return false;
     }
 
-    public void processWithConnectedChannel(final Exchange exchange, final 
AsyncCallback callback, final ChannelFuture channelFuture, final Object body) {
+    public void processWithConnectedChannel(final Exchange exchange, final 
BodyReleaseCallback callback, final ChannelFuture channelFuture, final Object 
body) {
         // remember channel so we can reuse it
         final Channel channel = channelFuture.channel();
         if (getConfiguration().isReuseChannel() && 
exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) {
@@ -283,15 +288,16 @@ public class NettyProducer extends DefaultAsyncProducer {
                 channel.pipeline().replace(oldHandler, "timeout", newHandler);
             }
         }
-        
+
+        //This will refer to original callback since netty will release body 
by itself
         final AsyncCallback producerCallback;
 
         if (configuration.isReuseChannel()) {
             // use callback as-is because we should not put it back in the 
pool as NettyProducerCallback would do
             // as when reuse channel is enabled it will put the channel back 
in the pool when exchange is done using on completion
-            producerCallback = callback;
+            producerCallback = callback.getOriginalCallback();
         } else {
-            producerCallback = new NettyProducerCallback(channelFuture, 
callback);
+            producerCallback = new NettyProducerCallback(channelFuture, 
callback.getOriginalCallback());
         }
 
         // setup state as attachment on the channel, so we can access the 
state later when needed
@@ -618,10 +624,10 @@ public class NettyProducer extends DefaultAsyncProducer {
      */
     private class ChannelConnectedListener implements ChannelFutureListener {
         private final Exchange exchange;
-        private final AsyncCallback callback;
+        private final BodyReleaseCallback callback;
         private final Object body;
 
-        ChannelConnectedListener(Exchange exchange, AsyncCallback callback, 
Object body) {
+        ChannelConnectedListener(Exchange exchange, BodyReleaseCallback 
callback, Object body) {
             this.exchange = exchange;
             this.callback = callback;
             this.body = body;
@@ -636,6 +642,7 @@ public class NettyProducer extends DefaultAsyncProducer {
                 }
                 exchange.setException(cause);
                 callback.done(false);
+                return;
             }
 
             try {
@@ -646,4 +653,28 @@ public class NettyProducer extends DefaultAsyncProducer {
             }
         }
     }
+
+    /**
+     * This class is used to release the body when an error occurred and the
+     * body was not handed over to netty
+     */
+    private static final class BodyReleaseCallback implements AsyncCallback {
+        private volatile Object body;
+        private final AsyncCallback originalCallback;
+
+        private BodyReleaseCallback(AsyncCallback originalCallback, Object 
body) {
+            this.body = body;
+            this.originalCallback = originalCallback;
+        }
+
+        public AsyncCallback getOriginalCallback() {
+            return originalCallback;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            ReferenceCountUtil.release(body);
+            originalCallback.done(doneSync);
+        }
+    }
 }

Reply via email to