STORM-1079. Batch Puts to HBase. Add default flushIntervalSecs.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a25ab79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a25ab79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a25ab79

Branch: refs/heads/master
Commit: 9a25ab79b4457c4ed98b64c96218e0462efe72c4
Parents: b53b9eb
Author: Sriharsha Chintalapani <m...@harsha.io>
Authored: Thu Oct 8 09:13:54 2015 -0700
Committer: Sriharsha Chintalapani <m...@harsha.io>
Committed: Thu Oct 8 09:13:54 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/storm/hbase/bolt/HBaseBolt.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/9a25ab79/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index 4cdf388..34e2eba 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -78,11 +78,14 @@ public class HBaseBolt extends AbstractHBaseBolt {
             conf = new Config();
         }
 
-        if (flushIntervalSecs > 0) {
-            LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]");
-            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
+        if (conf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0) {
+            Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
+            flushIntervalSecs = (int)(Math.floor(topologyTimeout / 2));
+            LOG.debug("Setting flush interval to [" + flushIntervalSecs + "] based on topology.message.timeout.secs");
         }
 
+        LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]");
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
         return conf;
     }