Repository: storm
Updated Branches:
  refs/heads/master ebca03a90 -> 8f8c3e548
STORM-1079. Batch Puts to HBase. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e3b5c270 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e3b5c270 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e3b5c270 Branch: refs/heads/master Commit: e3b5c2701f1ce912dde052af3e81b9c4ce517f3f Parents: ce93d5f Author: Sriharsha Chintalapani <m...@harsha.io> Authored: Wed Sep 30 14:44:28 2015 -0700 Committer: Sriharsha Chintalapani <m...@harsha.io> Committed: Wed Sep 30 14:44:28 2015 -0700 ---------------------------------------------------------------------- .../storm/hbase/bolt/AbstractHBaseBolt.java | 10 +++ .../org/apache/storm/hbase/bolt/HBaseBolt.java | 65 +++++++++++++++++--- 2 files changed, 67 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/e3b5c270/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java index d814117..aad3d88 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java @@ -42,6 +42,8 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt { protected String tableName; protected HBaseMapper mapper; protected String configKey; + protected int batchSize = 15000; + protected int flushIntervalSecs = 0; public AbstractHBaseBolt(String tableName, HBaseMapper mapper) { Validate.notEmpty(tableName, "Table name can not be blank or null"); @@ -72,5 +74,13 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt { Map<String, Object> hbaseConfMap = new 
HashMap<String, Object>(conf); hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS)); this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName); + + // If interval is non-zero then it has already been explicitly set and we should not default it + if (conf.containsKey("topology.message.timeout.secs") && flushIntervalSecs == 0) + { + Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString()); + flushIntervalSecs = (int)(Math.floor(topologyTimeout / 2)); + LOG.info("Setting tick tuple interval to [" + flushIntervalSecs + "] based on topology timeout"); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/e3b5c270/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java index cf29aa5..7eeca77 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java @@ -19,6 +19,8 @@ package org.apache.storm.hbase.bolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import backtype.storm.utils.TupleUtils; +import backtype.storm.Config; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; import org.apache.storm.hbase.bolt.mapper.HBaseMapper; @@ -26,7 +28,9 @@ import org.apache.storm.hbase.common.ColumnList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; import java.util.List; +import java.util.LinkedList; /** * Basic bolt for writing to HBase. 
@@ -38,9 +42,13 @@ public class HBaseBolt extends AbstractHBaseBolt { private static final Logger LOG = LoggerFactory.getLogger(HBaseBolt.class); boolean writeToWAL = true; + List<Mutation> batchMutations; + List<Tuple> tupleBatch; public HBaseBolt(String tableName, HBaseMapper mapper) { super(tableName, mapper); + this.batchMutations = new LinkedList<>(); + this.tupleBatch = new LinkedList<>(); } public HBaseBolt writeToWAL(boolean writeToWAL) { @@ -53,21 +61,62 @@ public class HBaseBolt extends AbstractHBaseBolt { return this; } + public HBaseBolt withBatchSize(int batchSize) { + this.batchSize = batchSize; + return this; + } + + public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) { + this.flushIntervalSecs = flushIntervalSecs; + return this; + } + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = super.getComponentConfiguration(); + if (conf == null) + conf = new Config(); + + if (flushIntervalSecs > 0) { + LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]"); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs); + } + + return conf; + } + + @Override public void execute(Tuple tuple) { - byte[] rowKey = this.mapper.rowKey(tuple); - ColumnList cols = this.mapper.columns(tuple); - List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL); + boolean flush = false; + if (TupleUtils.isTick(tuple)) { + LOG.debug("TICK received! current batch status [" + tupleBatch.size() + "/" + batchSize + "]"); + flush = true; + } else { + byte[] rowKey = this.mapper.rowKey(tuple); + ColumnList cols = this.mapper.columns(tuple); + List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? 
Durability.SYNC_WAL : Durability.SKIP_WAL); + batchMutations.addAll(mutations); + tupleBatch.add(tuple); + if (tupleBatch.size() >= batchSize) + flush = true; + } try { - this.hBaseClient.batchMutate(mutations); + if (flush && !tupleBatch.isEmpty()) { + this.hBaseClient.batchMutate(batchMutations); + LOG.debug("acknowledging tuples after batchMutate"); + for(Tuple t : tupleBatch) + collector.ack(t); + } } catch(Exception e){ this.collector.reportError(e); - this.collector.fail(tuple); - return; + for (Tuple t : tupleBatch) + collector.fail(t); + } finally { + tupleBatch.clear(); + batchMutations.clear(); } - - this.collector.ack(tuple); } @Override