Repository: storm
Updated Branches:
  refs/heads/master ebca03a90 -> 8f8c3e548


STORM-1079. Batch Puts to HBase.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e3b5c270
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e3b5c270
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e3b5c270

Branch: refs/heads/master
Commit: e3b5c2701f1ce912dde052af3e81b9c4ce517f3f
Parents: ce93d5f
Author: Sriharsha Chintalapani <m...@harsha.io>
Authored: Wed Sep 30 14:44:28 2015 -0700
Committer: Sriharsha Chintalapani <m...@harsha.io>
Committed: Wed Sep 30 14:44:28 2015 -0700

----------------------------------------------------------------------
 .../storm/hbase/bolt/AbstractHBaseBolt.java     | 10 +++
 .../org/apache/storm/hbase/bolt/HBaseBolt.java  | 65 +++++++++++++++++---
 2 files changed, 67 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e3b5c270/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
index d814117..aad3d88 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
@@ -42,6 +42,8 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
     protected String tableName;
     protected HBaseMapper mapper;
     protected String configKey;
+    protected int batchSize = 15000;
+    protected int flushIntervalSecs = 0;
 
     public AbstractHBaseBolt(String tableName, HBaseMapper mapper) {
         Validate.notEmpty(tableName, "Table name can not be blank or null");
@@ -72,5 +74,13 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
         Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
         hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
         this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName);
+
+        // If interval is non-zero then it has already been explicitly set and we should not default it
+        if (conf.containsKey("topology.message.timeout.secs") && flushIntervalSecs == 0)
+        {
+            Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
+            flushIntervalSecs = (int)(Math.floor(topologyTimeout / 2));
+            LOG.info("Setting tick tuple interval to [" + flushIntervalSecs + "] based on topology timeout");
+        }
     }
 }
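
The defaulting added to prepare() above ties the flush interval to the topology's
message timeout, so a batch is flushed and acked well before its tuples can time
out. A minimal standalone sketch of that calculation, assuming Storm's stock
30-second topology.message.timeout.secs (the value is illustrative, not from the
commit):

    import java.util.HashMap;
    import java.util.Map;

    public class FlushIntervalDefaultSketch {
        public static void main(String[] args) {
            Map<String, Object> conf = new HashMap<String, Object>();
            conf.put("topology.message.timeout.secs", 30); // assumed example timeout

            int flushIntervalSecs = 0; // 0 means no interval was set explicitly
            if (conf.containsKey("topology.message.timeout.secs") && flushIntervalSecs == 0) {
                Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
                flushIntervalSecs = (int) Math.floor(topologyTimeout / 2);
            }
            // Prints 15: half the timeout, so a flush always fits inside the ack window.
            System.out.println(flushIntervalSecs);
        }
    }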

http://git-wip-us.apache.org/repos/asf/storm/blob/e3b5c270/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index cf29aa5..7eeca77 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -19,6 +19,8 @@ package org.apache.storm.hbase.bolt;
 
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import backtype.storm.Config;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
@@ -26,7 +28,9 @@ import org.apache.storm.hbase.common.ColumnList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.List;
+import java.util.LinkedList;
 
 /**
  * Basic bolt for writing to HBase.
@@ -38,9 +42,13 @@ public class HBaseBolt  extends AbstractHBaseBolt {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseBolt.class);
 
     boolean writeToWAL = true;
+    List<Mutation> batchMutations;
+    List<Tuple> tupleBatch;
 
     public HBaseBolt(String tableName, HBaseMapper mapper) {
         super(tableName, mapper);
+        this.batchMutations = new LinkedList<>();
+        this.tupleBatch = new LinkedList<>();
     }
 
     public HBaseBolt writeToWAL(boolean writeToWAL) {
@@ -53,21 +61,62 @@ public class HBaseBolt  extends AbstractHBaseBolt {
         return this;
     }
 
+    public HBaseBolt withBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
+        this.flushIntervalSecs = flushIntervalSecs;
+        return this;
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Map<String, Object> conf = super.getComponentConfiguration();
+        if (conf == null)
+            conf = new Config();
+
+        if (flushIntervalSecs > 0) {
+            LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs 
+ "]");
+            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
+        }
+
+        return conf;
+    }
+
+
     @Override
     public void execute(Tuple tuple) {
-        byte[] rowKey = this.mapper.rowKey(tuple);
-        ColumnList cols = this.mapper.columns(tuple);
-        List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, 
cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
+        boolean flush = false;
+        if (TupleUtils.isTick(tuple)) {
+            LOG.debug("TICK received! current batch status [" + 
tupleBatch.size() + "/" + batchSize + "]");
+            flush = true;
+        } else {
+            byte[] rowKey = this.mapper.rowKey(tuple);
+            ColumnList cols = this.mapper.columns(tuple);
+            List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
+            batchMutations.addAll(mutations);
+            tupleBatch.add(tuple);
+            if (tupleBatch.size() >= batchSize)
+                flush = true;
+        }
 
         try {
-            this.hBaseClient.batchMutate(mutations);
+            if (flush && !tupleBatch.isEmpty()) {
+                this.hBaseClient.batchMutate(batchMutations);
+                LOG.debug("acknowledging tuples after batchMutate");
+                for(Tuple t : tupleBatch)
+                    collector.ack(t);
+            }
         } catch(Exception e){
             this.collector.reportError(e);
-            this.collector.fail(tuple);
-            return;
+            for (Tuple t : tupleBatch)
+                collector.fail(t);
+        } finally {
+            tupleBatch.clear();
+            batchMutations.clear();
         }
-
-        this.collector.ack(tuple);
     }
 
     @Override

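For completeness, a rough usage sketch of the new batching knobs as a downstream
topology might wire them up. The table name, field names, column family, config
key, and the SimpleHBaseMapper wiring below are illustrative assumptions, not
part of this commit:

    import backtype.storm.tuple.Fields;
    import org.apache.storm.hbase.bolt.HBaseBolt;
    import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;

    public class HBaseBatchingUsageSketch {
        public static HBaseBolt buildBolt() {
            // Map tuple fields to an HBase row key and column family (names are examples).
            SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                    .withRowKeyField("word")
                    .withColumnFields(new Fields("count"))
                    .withColumnFamily("cf");

            // Buffer up to 1000 tuples per task; flush when the batch fills or when the
            // tick tuple fires (every 10 seconds here). Omitting withFlushIntervalSecs
            // lets prepare() default it to half of topology.message.timeout.secs.
            return new HBaseBolt("WordCount", mapper)
                    .withConfigKey("hbase.conf")
                    .withBatchSize(1000)
                    .withFlushIntervalSecs(10);
        }
    }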