Updated Branches:
  refs/heads/trunk 93f03ab44 -> 2e1b7c23c

FLUME-2002. Flume RPC Client creates 2 threads per each log attempt if the 
remote flume agent goes down.

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2e1b7c23
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2e1b7c23
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2e1b7c23

Branch: refs/heads/trunk
Commit: 2e1b7c23c4964b8860b876cc5c8c3642c4d74ab9
Parents: 93f03ab
Author: Hari Shreedharan <[email protected]>
Authored: Wed May 1 09:27:01 2013 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Wed May 1 09:27:01 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/api/NettyAvroRpcClient.java   |   22 ++++++-
 .../apache/flume/api/TestNettyAvroRpcClient.java   |   47 +++++++++++++++
 2 files changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2e1b7c23/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index 8285129..99bd5ae 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -110,9 +110,9 @@ implements RpcClient {
   private void connect(long timeout, TimeUnit tu) throws FlumeException {
     callTimeoutPool = Executors.newCachedThreadPool(
         new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
-    try {
+    NioClientSocketChannelFactory socketChannelFactory = null;
 
-      NioClientSocketChannelFactory socketChannelFactory;
+    try {
 
       if (enableDeflateCompression) {
         socketChannelFactory = new CompressionChannelFactory(
@@ -134,8 +134,22 @@ implements RpcClient {
       avroClient =
           SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
           transceiver);
-    } catch (IOException ex) {
-      throw new FlumeException(this + ": RPC connection error", ex);
+    } catch (Throwable t) {
+      if (callTimeoutPool != null) {
+        callTimeoutPool.shutdownNow();
+      }
+      if (socketChannelFactory != null) {
+        socketChannelFactory.releaseExternalResources();
+      }
+      if (t instanceof IOException) {
+        throw new FlumeException(this + ": RPC connection error", t);
+      } else if (t instanceof FlumeException) {
+        throw (FlumeException) t;
+      } else if (t instanceof Error) {
+        throw (Error) t;
+      } else {
+        throw new FlumeException(this + ": Unexpected exception", t);
+      }
     }
 
     setState(ConnState.READY);

http://git-wip-us.apache.org/repos/asf/flume/blob/2e1b7c23/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
index 1e6d2b2..72e331b 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
@@ -18,7 +18,10 @@
  */
 package org.apache.flume.api;
 
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
@@ -331,4 +334,48 @@ public class TestNettyAvroRpcClient {
     logger.error("Throwing: I should never have gotten here!");
   }
 
+  @Test
+  public void spinThreadsCrazily() throws IOException {
+
+    int initThreadCount = ManagementFactory.getThreadMXBean().getThreadCount();
+
+    // find a port we know is closed by opening a free one then closing it
+    ServerSocket sock = new ServerSocket(0);
+    int port = sock.getLocalPort();
+    sock.close();
+
+    Properties props = new Properties();
+    props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
+        RpcClientConfigurationConstants.DEFAULT_CLIENT_TYPE);
+    props.put(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+    props.put(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
+        "localhost:" + port);
+    props.put(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT, "20");
+    props.put(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, "20");
+    props.put(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "1");
+
+    for (int i = 0; i < 1000; i++) {
+      RpcClient client = null;
+      try {
+        client = RpcClientFactory.getDefaultInstance("localhost", port);
+        client.append(EventBuilder.withBody("Hello", Charset.forName("UTF-8")));
+      } catch (FlumeException e) {
+        logger.warn("Unexpected error", e);
+      } catch (EventDeliveryException e) {
+        logger.warn("Expected error", e);
+      } finally {
+        if (client != null) {
+          client.close();
+        }
+      }
+    }
+
+    int threadCount = ManagementFactory.getThreadMXBean().getThreadCount();
+    logger.warn("Init thread count: {}, thread count: {}",
+        initThreadCount, threadCount);
+    Assert.assertEquals("Thread leak in RPC client",
+        initThreadCount, threadCount);
+
+  }
+
 }

Reply via email to