Repository: storm Updated Branches: refs/heads/1.x-branch 97e7d25fe -> 11f23153a
STORM-1669: Fix SolrUpdateBolt flush bug Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81997ece Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81997ece Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81997ece Branch: refs/heads/1.x-branch Commit: 81997ece6422d2b80ef02b86521772276debc03f Parents: 652d2f6 Author: Xin Wang <[email protected]> Authored: Thu Mar 31 12:30:35 2016 +0800 Committer: vesense <[email protected]> Committed: Thu Mar 31 12:37:22 2016 +0800 ---------------------------------------------------------------------- .../apache/storm/solr/bolt/SolrUpdateBolt.java | 23 ++++++++------------ 1 file changed, 9 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/81997ece/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java index cab9899..ff0a96e 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java @@ -18,14 +18,12 @@ package org.apache.storm.solr.bolt; -import org.apache.storm.Config; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.TupleUtils; -import org.apache.storm.utils.Utils; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; @@ -45,6 +43,11 @@ import java.util.Map; public class SolrUpdateBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(SolrUpdateBolt.class); + /** + * Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS + */ + private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15; + private final SolrConfig solrConfig; private final SolrMapper solrMapper; private final SolrCommitStrategy commitStgy; // if null, acks every tuple @@ -52,7 +55,7 @@ public class SolrUpdateBolt extends BaseRichBolt { private SolrClient solrClient; private OutputCollector collector; private List<Tuple> toCommitTuples; - private int tickTupleInterval; + private int tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS; public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper) { this(solrConfig, solrMapper, null); @@ -72,17 +75,6 @@ public class SolrUpdateBolt extends BaseRichBolt { this.collector = collector; this.solrClient = new CloudSolrClient(solrConfig.getZkHostString()); this.toCommitTuples = new ArrayList<>(capacity()); - - setTickTupleInterval(stormConf); - } - - private void setTickTupleInterval(Map stormConf) { - this.tickTupleInterval = solrConfig.getTickTupleInterval(); - if(stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS) && tickTupleInterval == 0) { - Integer topologyTimeout = Utils.getInt(stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)); - tickTupleInterval = (int)(Math.floor(topologyTimeout / 2)); - LOG.debug("Setting tick tuple interval to [{}] based on topology timeout", tickTupleInterval); - } } private int capacity() { @@ -153,6 +145,9 @@ public class SolrUpdateBolt extends BaseRichBolt { @Override public Map<String, Object> getComponentConfiguration() { + if (solrConfig.getTickTupleInterval() > 0) { + this.tickTupleInterval = solrConfig.getTickTupleInterval(); + } return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), tickTupleInterval); }
