Try using localOrShuffle grouping. Storm will attempt to pass messages directly to the next component within the same JVM, if possible
On Tuesday, May 31, 2016, 林海涛(信息技术部交易云技术研发组) <linhai...@gf.com.cn> wrote: > Hello. > I do test with a simple topology to test the intercommunication latency of > spout/bolt. It’s just emit the current nano timestamp from a spout and > print the time difference when a bolt receive it. > I deploy my storm cluster in my own machine with docker container (one > nimbus, one supervisor), and run the topology in cluster mode. > code as below: > > public class RandomSpout extends BaseRichSpout{ > > SpoutOutputCollector _collector; > > > public void open(Map conf, TopologyContext context, > SpoutOutputCollector collector) { > > _collector = collector; > > } > > > > public void nextTuple() { > > Utils.sleep(1000); > > long currentTime = System.nanoTime(); > > _collector.emit(new Values(currentTime)); > > > > } > > > public void declareOutputFields(OutputFieldsDeclarer arg0) { > > // TODO Auto-generated method stub > > arg0.declare(new Fields("value")); > > } > > } > > > public class PrintBolt extends BaseRichBolt{ > > private LogFileWriter _logFile; > > > public void execute(Tuple arg0) { > > // TODO Auto-generated method stub > > long prevTime = arg0.getLong(0); > > long currentTime = System.nanoTime(); > > _logFile.writeLog("cost: " + (currentTime - prevTime)); > > } > > > public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) > { > > // TODO Auto-generated method stub > > try { > > _logFile = new LogFileWriter("StormTest”, this > .getClass().getSimpleName()); > > } catch (Exception e) { > > // TODO Auto-generated catch block > > e.printStackTrace(); > > } > > } > > > public void declareOutputFields(OutputFieldsDeclarer arg0) { > > // TODO Auto-generated method stub > > arg0.declare(new Fields("value")); > > } > > > } > > > public class Topology > > { > > public static void main( String[] args ) > > { > > TopologyBuilder builder = new TopologyBuilder(); > > builder.setSpout("spout", new RandomSpout(), 1); > > builder.setBolt("bolt", new PrintBolt(), 1).shuffleGrouping("spout"); > > > > Config conf = new Config(); > > conf.setDebug(false); > > if(args.length > 0){ > > // cluster submit. > > conf.setNumWorkers(2); > > conf.setNumAckers(0); > > try { > > StormSubmitter.submitTopology("stormTest", conf, builder > .createTopology()); > > } catch (Exception e) { > > e.printStackTrace(); > > } > > }else{ > > new LocalCluster().submitTopology("stormTest", conf, builder > .createTopology()); > > } > > } > > } > > > Output is below: > > [2016-05-31 09:13:53]cost: 1960336 > [2016-05-31 09:13:54]cost: 2600239 > [2016-05-31 09:13:55]cost: 3103449 > [2016-05-31 09:13:56]cost: 3206544 > [2016-05-31 09:13:57]cost: 3783647 > [2016-05-31 09:13:58]cost: 3635923 > [2016-05-31 09:13:59]cost: 3887787 > [2016-05-31 09:14:00]cost: 1623692 > [2016-05-31 09:14:01]cost: 2524674 > [2016-05-31 09:14:02]cost: 3383506 > [2016-05-31 09:14:03]cost: 3898478 > [2016-05-31 09:14:04]cost: 2120949 > [2016-05-31 09:14:05]cost: 3756272 > [2016-05-31 09:14:06]cost: 2877997 > [2016-05-31 09:14:07]cost: 3432532 > [2016-05-31 09:14:08]cost: 3638306 > [2016-05-31 09:14:09]cost: 2958907 > [2016-05-31 09:14:10]cost: 2742666 > [2016-05-31 09:14:11]cost: 3024576 > [2016-05-31 09:14:12]cost: 2822562 > [2016-05-31 09:14:13]cost: 2623060 > [2016-05-31 09:14:14]cost: 4045938 > > Obviously, there is a 2ms latency approximately. It seems not good for me. > How can I reduce the latency? > -- Kevin Conaway http://www.linkedin.com/pub/kevin-conaway/7/107/580/ https://github.com/kevinconaway