Updated Branches: refs/heads/flume-1.4 0cad6d3e5 -> 543a5a196
FLUME-2002. Flume RPC Client creates 2 threads per each log attempt if the remote flume agent goes down. (Mike Percy via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/543a5a19 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/543a5a19 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/543a5a19 Branch: refs/heads/flume-1.4 Commit: 543a5a196b0a31e7bebfabdd511979367d5ab8c8 Parents: 0cad6d3 Author: Hari Shreedharan <[email protected]> Authored: Wed May 1 09:27:01 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Wed May 1 09:27:43 2013 -0700 ---------------------------------------------------------------------- .../org/apache/flume/api/NettyAvroRpcClient.java | 22 ++++++- .../apache/flume/api/TestNettyAvroRpcClient.java | 47 +++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/543a5a19/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java index 8285129..99bd5ae 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java @@ -110,9 +110,9 @@ implements RpcClient { private void connect(long timeout, TimeUnit tu) throws FlumeException { callTimeoutPool = Executors.newCachedThreadPool( new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker")); - try { + NioClientSocketChannelFactory socketChannelFactory = null; - NioClientSocketChannelFactory socketChannelFactory; + try { if (enableDeflateCompression) { socketChannelFactory = new CompressionChannelFactory( @@ -134,8 +134,22 @@ implements RpcClient { avroClient = SpecificRequestor.getClient(AvroSourceProtocol.Callback.class, transceiver); - } catch (IOException ex) { - throw new FlumeException(this + ": RPC connection error", ex); + } catch (Throwable t) { + if (callTimeoutPool != null) { + callTimeoutPool.shutdownNow(); + } + if (socketChannelFactory != null) { + socketChannelFactory.releaseExternalResources(); + } + if (t instanceof IOException) { + throw new FlumeException(this + ": RPC connection error", t); + } else if (t instanceof FlumeException) { + throw (FlumeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else { + throw new FlumeException(this + ": Unexpected exception", t); + } } setState(ConnState.READY); http://git-wip-us.apache.org/repos/asf/flume/blob/543a5a19/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java index 1e6d2b2..72e331b 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java @@ -18,7 +18,10 @@ */ package org.apache.flume.api; +import java.io.IOException; +import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; @@ -331,4 +334,48 @@ public class TestNettyAvroRpcClient { logger.error("Throwing: I should never have gotten here!"); } + @Test + public void spinThreadsCrazily() throws IOException { + + int initThreadCount = ManagementFactory.getThreadMXBean().getThreadCount(); + + // find a port we know is closed by opening a free one then closing it + ServerSocket sock = new ServerSocket(0); + int port = sock.getLocalPort(); + sock.close(); + + Properties props = new Properties(); + props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, + RpcClientConfigurationConstants.DEFAULT_CLIENT_TYPE); + props.put(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1"); + props.put(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1", + "localhost:" + port); + props.put(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT, "20"); + props.put(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, "20"); + props.put(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "1"); + + for (int i = 0; i < 1000; i++) { + RpcClient client = null; + try { + client = RpcClientFactory.getDefaultInstance("localhost", port); + client.append(EventBuilder.withBody("Hello", Charset.forName("UTF-8"))); + } catch (FlumeException e) { + logger.warn("Unexpected error", e); + } catch (EventDeliveryException e) { + logger.warn("Expected error", e); + } finally { + if (client != null) { + client.close(); + } + } + } + + int threadCount = ManagementFactory.getThreadMXBean().getThreadCount(); + logger.warn("Init thread count: {}, thread count: {}", + initThreadCount, threadCount); + Assert.assertEquals("Thread leak in RPC client", + initThreadCount, threadCount); + + } + }
