Updated Branches:
  refs/heads/flume-1.4 b64c8d040 -> 36c4d2fd4

FLUME-1906 Ability to disable WAL for put operation in HBaseSink

(Hari Shreedharan via Mubarak Seyed)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/36c4d2fd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/36c4d2fd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/36c4d2fd

Branch: refs/heads/flume-1.4
Commit: 36c4d2fd41ad43010090ca342c02768da8f83a16
Parents: b64c8d0
Author: Mubarak Seyed <[email protected]>
Authored: Thu Feb 14 23:48:41 2013 -0800
Committer: Mubarak Seyed <[email protected]>
Committed: Thu Feb 14 23:53:11 2013 -0800

----------------------------------------------------------------------
 .../apache/flume/sink/hbase/AsyncHBaseSink.java    |   11 ++++++++
 .../org/apache/flume/sink/hbase/HBaseSink.java     |   21 +++++++++++++++
 .../hbase/HBaseSinkConfigurationConstants.java     |    4 +++
 3 files changed, 36 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/36c4d2fd/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 0b6f885..7020fcd 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -113,6 +113,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   private String zkBaseDir;
   private ExecutorService sinkCallbackPool;
   private boolean isTest;
+  private boolean enableWal = true;
 
   public AsyncHBaseSink(){
     this(null);
@@ -186,6 +187,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
           callbacksExpected.addAndGet(actions.size() + increments.size());
 
           for (PutRequest action : actions) {
+            action.setDurable(enableWal);
             client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
           }
           for (AtomicIncrementRequest increment : increments) {
@@ -322,6 +324,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     }
     Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(),
         "The Zookeeper quorum cannot be null and should be specified.");
+
+    enableWal = context.getBoolean(HBaseSinkConfigurationConstants
+      .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
+    logger.info("The write to WAL option is set to: " + String.valueOf(enableWal));
+    if(!enableWal) {
+      logger.warn("AsyncHBaseSink's enableWal configuration is set to false. " +
+        "All writes to HBase will have WAL disabled, and any data in the " +
+        "memstore of this region in the Region Server could be lost!");
+    }
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flume/blob/36c4d2fd/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index 835a69e..31fb7ff 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
@@ -95,6 +96,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
   private String kerberosPrincipal;
   private String kerberosKeytab;
   private User hbaseUser;
+  private boolean enableWal = true;
 
   public HBaseSink(){
     this(HBaseConfiguration.create());
@@ -197,6 +199,15 @@ public class HBaseSink extends AbstractSink implements Configurable {
     }
     kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB, "");
     kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL, "");
+
+    enableWal = context.getBoolean(HBaseSinkConfigurationConstants
+      .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
+    logger.info("The write to WAL option is set to: " + String.valueOf(enableWal));
+    if(!enableWal) {
+      logger.warn("HBase Sink's enableWal configuration is set to false. All " +
+        "writes to HBase will have WAL disabled, and any data in the " +
+        "memstore of this region in the Region Server could be lost!");
+    }
   }
 
   @Override
@@ -229,6 +240,15 @@ public class HBaseSink extends AbstractSink implements Configurable {
       runPrivileged(new PrivilegedExceptionAction<Void>() {
         @Override
         public Void run() throws Exception {
+          for(Row r : actions) {
+            if(r instanceof Put) {
+              ((Put)r).setWriteToWAL(enableWal);
+            }
+            // Newer versions of HBase - Increment implements Row.
+            if(r instanceof Increment) {
+              ((Increment)r).setWriteToWAL(enableWal);
+            }
+          }
           table.batch(actions);
           return null;
         }
@@ -238,6 +258,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
         @Override
         public Void run() throws Exception {
           for (final Increment i : incs) {
+            i.setWriteToWAL(enableWal);
             table.increment(i);
           }
           return null;

http://git-wip-us.apache.org/repos/asf/flume/blob/36c4d2fd/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index fb6bd4e..7fdc75b 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -49,6 +49,10 @@ public class HBaseSinkConfigurationConstants {
 
   public static final String CONFIG_TIMEOUT = "timeout";
 
+  public static final String CONFIG_ENABLE_WAL = "enableWal";
+
+  public static final boolean DEFAULT_ENABLE_WAL = true;
+
   public static final long DEFAULT_TIMEOUT = 60000;
 
   public static final String CONFIG_KEYTAB = "kerberosKeytab";

Reply via email to