Repository: flume
Updated Branches:
  refs/heads/flume-1.7 0f304bab5 -> 200e6dcbf


FLUME-2732. Make maximum tolerated failures before shutting down and recreating
client in AsyncHBaseSink configurable

(Johny Rufus via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/200e6dcb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/200e6dcb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/200e6dcb

Branch: refs/heads/flume-1.7
Commit: 200e6dcbf92e196a6f647a909a9a5cdae1fa0c0f
Parents: 0f304ba
Author: Hari Shreedharan <[email protected]>
Authored: Tue Jul 7 21:44:01 2015 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Jul 7 21:45:02 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/flume/sink/hbase/AsyncHBaseSink.java | 11 ++++++++---
 .../sink/hbase/HBaseSinkConfigurationConstants.java      |  4 ++++
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/200e6dcb/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 1d05189..80a3484 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -126,6 +126,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   private boolean enableWal = true;
   private boolean batchIncrements = false;
   private volatile int totalCallbacksReceived = 0;
+  private int maxConsecutiveFails;
   private Map<CellIdentifier, AtomicIncrementRequest> incrementBuffer;
   // The HBaseClient buffers the requests until a callback is received. In the event of a
   // timeout, there is no way to clear these buffers. If there is a major cluster issue, this
@@ -140,8 +141,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   private final Comparator<byte[]> COMPARATOR = UnsignedBytes
     .lexicographicalComparator();
 
-  private static final int MAX_CONSECUTIVE_FAILS = 10;
-
   public AsyncHBaseSink(){
     this(null);
   }
@@ -417,6 +416,10 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       logger.info("Increment coalescing is enabled. Increments will be " +
         "buffered.");
     }
+
+    maxConsecutiveFails = 
context.getInteger(HBaseSinkConfigurationConstants.CONFIG_MAX_CONSECUTIVE_FAILS,
+            HBaseSinkConfigurationConstants.DEFAULT_MAX_CONSECUTIVE_FAILS);
+
   }
 
   @VisibleForTesting
@@ -442,6 +445,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   }
 
   private HBaseClient initHBaseClient() {
+    logger.info("Initializing HBase Client");
     if (!isTimeoutTest) {
       client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool);
     } else {
@@ -526,6 +530,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   }
 
   private void shutdownHBaseClient() {
+    logger.info("Shutting down HBase Client");
     final CountDownLatch waiter = new CountDownLatch(1);
     try {
       client.shutdown().addCallback(new Callback<Object, Object>() {
@@ -556,7 +561,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
 
   private void handleTransactionFailure(Transaction txn)
       throws EventDeliveryException {
-    if (consecutiveHBaseFailures >= MAX_CONSECUTIVE_FAILS) {
+    if (maxConsecutiveFails > 0 && consecutiveHBaseFailures >= 
maxConsecutiveFails) {
       if (client != null) {
         shutdownHBaseClient();
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/200e6dcb/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index 1a78071..5560624 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -70,4 +70,8 @@ public class HBaseSinkConfigurationConstants {
 
   public static final Boolean DEFAULT_COALESCE_INCREMENTS = false;
 
+  public static final int DEFAULT_MAX_CONSECUTIVE_FAILS = 10;
+
+  public static final String CONFIG_MAX_CONSECUTIVE_FAILS = 
"maxConsecutiveFails";
+
 }

Reply via email to