Repository: flume Updated Branches: refs/heads/trunk 48126c235 -> 38f0b316d
FLUME-2738. Fix file descriptor leak in AsyncHBaseSink when HBase cluster goes down. (Johny Rufus via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/38f0b316 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/38f0b316 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/38f0b316 Branch: refs/heads/trunk Commit: 38f0b316ddcc834c6249c7179c4c1d23ab445a84 Parents: 48126c2 Author: Hari Shreedharan <[email protected]> Authored: Thu Jul 9 12:21:17 2015 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Thu Jul 9 12:21:17 2015 -0700 ---------------------------------------------------------------------- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 18 +++--- .../flume/sink/hbase/TestAsyncHBaseSink.java | 66 ++++++++++++++++++++ 2 files changed, 73 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/38f0b316/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 80a3484..eac00f6 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -437,23 +437,19 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { + "before calling start on an old instance."); sinkCounter.start(); sinkCounter.incrementConnectionCreatedCount(); - sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat(this.getName() + " HBase Call Pool").build()); - logger.info("Callback pool created"); client = initHBaseClient(); super.start(); } private HBaseClient initHBaseClient() { logger.info("Initializing HBase Client"); - if (!isTimeoutTest) { - client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool); - } else { - client = new HBaseClient(zkQuorum, zkBaseDir, - new NioClientSocketChannelFactory(Executors - .newSingleThreadExecutor(), - Executors.newSingleThreadExecutor())); - } + + sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder() + .setNameFormat(this.getName() + " HBase Call Pool").build()); + logger.info("Callback pool created"); + client = new HBaseClient(zkQuorum, zkBaseDir, + new NioClientSocketChannelFactory(sinkCallbackPool, sinkCallbackPool)); + final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean fail = new AtomicBoolean(false); client.ensureTableFamilyExists( http://git-wip-us.apache.org/repos/asf/flume/blob/38f0b316/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java index af90f99..b4bbd6b 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java @@ -20,6 +20,8 @@ package org.apache.flume.sink.hbase; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -49,6 +51,7 @@ import org.junit.Ignore; import org.junit.Test; import com.google.common.primitives.Longs; +import com.sun.management.UnixOperatingSystemMXBean; import org.junit.After; @@ -62,6 +65,7 @@ public class TestAsyncHBaseSink { private static Context ctx = new Context(); private static String valBase = "testing hbase sink: jham"; private boolean deleteTable = true; + private static OperatingSystemMXBean os; @BeforeClass @@ -78,6 +82,8 @@ public class TestAsyncHBaseSink { ctxMap.put("keep-alive", "0"); ctxMap.put("timeout", "10000"); ctx.putAll(ctxMap); + + os = ManagementFactory.getOperatingSystemMXBean(); } @AfterClass @@ -448,6 +454,66 @@ public class TestAsyncHBaseSink { sink.process(); sink.stop(); } + + // We only have support for getting File Descriptor count for Unix from the JDK + private long getOpenFileDescriptorCount() { + if(os instanceof UnixOperatingSystemMXBean){ + return ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount(); + } else { + return -1; + } + } + + /* + * Before the fix for FLUME-2738, consistently File Descriptors were leaked with at least + * > 10 FDs being leaked for every single shutdown-reinitialize routine + * If there is a leak, then the increase in FDs should be way higher than + * 50 and if there is no leak, there should not be any substantial increase in + * FDs. This is over a set of 10 shutdown-reinitialize runs + * This test makes sure that there is no File Descriptor leak, by continuously + * failing transactions and shutting down and reinitializing the client every time + * and this test will fail if a leak is detected + */ + @Test + public void testFDLeakOnShutdown() throws Exception { + if(getOpenFileDescriptorCount() < 0) { + return; + } + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + deleteTable = true; + AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), + true, false); + ctx.put("maxConsecutiveFails", "1"); + Configurables.configure(sink, ctx); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, ctx); + sink.setChannel(channel); + channel.start(); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + for(int i = 0; i < 3; i++){ + Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); + channel.put(e); + } + tx.commit(); + tx.close(); + Assert.assertFalse(sink.isConfNull()); + long initialFDCount = getOpenFileDescriptorCount(); + + // Since the isTimeOutTest is set to true, transaction will fail + // with EventDeliveryException + for(int i = 0; i < 10; i ++) { + try { + sink.process(); + } catch (EventDeliveryException ex) { + } + } + long increaseInFD = getOpenFileDescriptorCount() - initialFDCount; + Assert.assertTrue("File Descriptor leak detected. FDs have increased by " + + increaseInFD + " from an initial FD count of " + initialFDCount, increaseInFD < 50); + } + /** * This test must run last - it shuts down the minicluster :D * @throws Exception
