Repository: flume Updated Branches: refs/heads/flume-1.7 0f304bab5 -> 200e6dcbf
FLUME-2732. Make maximum tolerated failures before shutting down and recreating client in AsyncHBaseSink configurable (Johny Rufus via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/200e6dcb Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/200e6dcb Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/200e6dcb Branch: refs/heads/flume-1.7 Commit: 200e6dcbf92e196a6f647a909a9a5cdae1fa0c0f Parents: 0f304ba Author: Hari Shreedharan <[email protected]> Authored: Tue Jul 7 21:44:01 2015 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Tue Jul 7 21:45:02 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/flume/sink/hbase/AsyncHBaseSink.java | 11 ++++++++--- .../sink/hbase/HBaseSinkConfigurationConstants.java | 4 ++++ 2 files changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/200e6dcb/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 1d05189..80a3484 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -126,6 +126,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private boolean enableWal = true; private boolean batchIncrements = false; private volatile int totalCallbacksReceived = 0; + private int maxConsecutiveFails; private Map<CellIdentifier, AtomicIncrementRequest> incrementBuffer; // The 
HBaseClient buffers the requests until a callback is received. In the event of a // timeout, there is no way to clear these buffers. If there is a major cluster issue, this @@ -140,8 +141,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private final Comparator<byte[]> COMPARATOR = UnsignedBytes .lexicographicalComparator(); - private static final int MAX_CONSECUTIVE_FAILS = 10; - public AsyncHBaseSink(){ this(null); } @@ -417,6 +416,10 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { logger.info("Increment coalescing is enabled. Increments will be " + "buffered."); } + + maxConsecutiveFails = context.getInteger(HBaseSinkConfigurationConstants.CONFIG_MAX_CONSECUTIVE_FAILS, + HBaseSinkConfigurationConstants.DEFAULT_MAX_CONSECUTIVE_FAILS); + } @VisibleForTesting @@ -442,6 +445,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } private HBaseClient initHBaseClient() { + logger.info("Initializing HBase Client"); if (!isTimeoutTest) { client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool); } else { @@ -526,6 +530,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } private void shutdownHBaseClient() { + logger.info("Shutting down HBase Client"); final CountDownLatch waiter = new CountDownLatch(1); try { client.shutdown().addCallback(new Callback<Object, Object>() { @@ -556,7 +561,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private void handleTransactionFailure(Transaction txn) throws EventDeliveryException { - if (consecutiveHBaseFailures >= MAX_CONSECUTIVE_FAILS) { + if (maxConsecutiveFails > 0 && consecutiveHBaseFailures >= maxConsecutiveFails) { if (client != null) { shutdownHBaseClient(); } http://git-wip-us.apache.org/repos/asf/flume/blob/200e6dcb/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java 
---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java index 1a78071..5560624 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java @@ -70,4 +70,8 @@ public class HBaseSinkConfigurationConstants { public static final Boolean DEFAULT_COALESCE_INCREMENTS = false; + public static final int DEFAULT_MAX_CONSECUTIVE_FAILS = 10; + + public static final String CONFIG_MAX_CONSECUTIVE_FAILS = "maxConsecutiveFails"; + }
