STORM-1079. Batch Puts to HBase. Add default flushIntervalSecs.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a25ab79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a25ab79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a25ab79

Branch: refs/heads/master
Commit: 9a25ab79b4457c4ed98b64c96218e0462efe72c4
Parents: b53b9eb
Author: Sriharsha Chintalapani <m...@harsha.io>
Authored: Thu Oct 8 09:13:54 2015 -0700
Committer: Sriharsha Chintalapani <m...@harsha.io>
Committed: Thu Oct 8 09:13:54 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/storm/hbase/bolt/HBaseBolt.java    | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9a25ab79/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index 4cdf388..34e2eba 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -78,11 +78,14 @@ public class HBaseBolt  extends AbstractHBaseBolt {
             conf = new Config();
         }
 
-        if (flushIntervalSecs > 0) {
-            LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs 
+ "]");
-            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
+        if (conf.containsKey("topology.message.timeout.secs") && 
tickTupleInterval == 0) {
+            Integer topologyTimeout = 
Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
+            flushIntervalSecs = (int)(Math.floor(topologyTimeout / 2));
+            LOG.debug("Setting flush interval to [" + flushIntervalSecs + "] 
based on topology.message.timeout.secs");
         }
 
+        LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + 
"]");
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
         return conf;
     }
 

Reply via email to