Hello,

As part of integrating Apex with other open-source streaming platforms, the ability to run Storm topologies would be a good addition to Apex.
The following JIRA is open to track the idea: https://issues.apache.org/jira/browse/APEXMALHAR-1996

We could make progress in phases.

Phase 1: Embedding Storm spouts/bolts as operators in Apex.

The user would implement the populateDAG method and embed the Storm spout/bolt in an operator. For example, given a word count topology in Storm:

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new WordSpout(), 5);
    builder.setBolt("split", new BoltTokenizer(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new BoltCounter(), 8).fieldsGrouping("split", new Fields("word"));

the corresponding Apex application would look like this:

    SpoutWrapper input = dag.addOperator("input", new SpoutWrapper(new WordSpout(), "spout"));
    BoltWrapper tokens = dag.addOperator("tokenizer", new BoltWrapper(new BoltTokenizer(), "split"));
    BoltWrapper counter = dag.addOperator("counter", new BoltWrapper(new BoltCounter(), "count"));
    dag.addStream("input", input.output, tokens.input);
    dag.addStream("word-count", tokens.output, counter.input);

For stream groupings, we could provide concrete implementations of StreamCodec. In the above case, to express fieldsGrouping("split", new Fields("word")), we could set:

    dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC, new StormTupleStreamCodec(new int[] { 0 }));

Phase 2: The ability to execute a whole Storm topology in Apex.

Here, the user could submit the Storm topology as-is, and we would provide a utility/higher-level API to generate an Apex DAG from the Storm topology.

To validate the first phase, we wrote a quick implementation of SpoutWrapper, BoltWrapper and StormTupleStreamCodec and were able to run the word count application with partitioning. You can have a look at the code here: https://github.com/shubham-pathak22/incubator-apex-malhar/tree/storm-apex/demos/storm-demo/src/main/java/com/datatorrent/demos/storm

The word count application has only a single stream between each pair of operators, and hence was fairly easy to implement.
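To illustrate what a StormTupleStreamCodec has to do for fieldsGrouping, here is a minimal sketch under stated assumptions. It is not the code in the linked branch and it does not implement Apex's StreamCodec interface (serialization is left out). The class names FieldsGroupingPartitioner and FieldsGroupingDemo and the partitionFor helper are hypothetical, a Storm tuple is modeled as a plain List<Object>, and the simple modulo in partitionFor only stands in for the platform's own routing of the partition value. The point it shows: only the fields at the grouping indices feed the hash, so every tuple carrying the same word lands on the same counter partition.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical stand-in for the partitioning part of a StormTupleStreamCodec.
 * Only the fields at the given indices contribute to the partition value,
 * mirroring Storm's fieldsGrouping(...) semantics.
 */
class FieldsGroupingPartitioner {
  private final int[] fieldIndices;

  FieldsGroupingPartitioner(int[] fieldIndices) {
    this.fieldIndices = fieldIndices.clone();
  }

  /** Hash of the grouping fields only; all other fields are ignored. */
  int getPartition(List<Object> tuple) {
    Object[] key = new Object[fieldIndices.length];
    for (int i = 0; i < fieldIndices.length; i++) {
      key[i] = tuple.get(fieldIndices[i]);
    }
    return Arrays.hashCode(key);
  }

  /** Simplified routing (assumption): map the partition value onto n partitions. */
  int partitionFor(List<Object> tuple, int numPartitions) {
    return Math.floorMod(getPartition(tuple), numPartitions);
  }
}

class FieldsGroupingDemo {
  public static void main(String[] args) {
    // Group on field 0 ("word"), as in new StormTupleStreamCodec(new int[] { 0 }).
    FieldsGroupingPartitioner byWord = new FieldsGroupingPartitioner(new int[] {0});
    List<Object> a = Arrays.<Object>asList("storm", 1);
    List<Object> b = Arrays.<Object>asList("storm", 7);
    // Same word -> same partition, regardless of the other fields.
    System.out.println(byWord.partitionFor(a, 8) == byWord.partitionFor(b, 8)); // true
  }
}
```

Hashing only the selected indices is what distinguishes fieldsGrouping from shuffleGrouping; for the tokenizer stream, which uses shuffleGrouping, the default codec behavior would be sufficient.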
A Storm spout/bolt can emit more than one stream. To do so, the user declares multiple streams using the declareStream method of OutputFieldsDeclarer and specifies the target stream when calling the emit method on OutputCollector. Example: https://gist.github.com/Xorlev/8058947

To embed such bolts/spouts within an operator, we would need some additional support in Apex. One such feature could be the ability to declare input/output ports dynamically. There was a mail thread about supporting collection-based ports; that would help in our case too.

Please let me know your thoughts.

Thanks,
Shubham
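For the multi-stream case, a minimal sketch of the routing a wrapper would need, assuming output "ports" can be created at declare time rather than fixed at compile time. MultiStreamRouter, MultiStreamDemo and the use of a Consumer as a port are hypothetical illustrations, not Apex API; declareStream and emit here only mirror the roles of Storm's OutputFieldsDeclarer.declareStream(streamId, fields) and OutputCollector.emit(streamId, tuple).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical router: one output "port" per declared Storm stream id.
 * The Consumer stands in for a dynamically declared Apex output port.
 */
class MultiStreamRouter {
  private final Map<String, Consumer<List<Object>>> ports = new HashMap<>();

  /** Plays the role of declareStream(streamId, fields): registers a new output. */
  void declareStream(String streamId, Consumer<List<Object>> port) {
    if (ports.putIfAbsent(streamId, port) != null) {
      throw new IllegalArgumentException("Stream already declared: " + streamId);
    }
  }

  /** Plays the role of collector.emit(streamId, tuple): routes to that stream's port. */
  void emit(String streamId, List<Object> tuple) {
    Consumer<List<Object>> port = ports.get(streamId);
    if (port == null) {
      throw new IllegalArgumentException("Undeclared stream: " + streamId);
    }
    port.accept(tuple);
  }
}

class MultiStreamDemo {
  public static void main(String[] args) {
    List<List<Object>> evens = new ArrayList<>();
    List<List<Object>> odds = new ArrayList<>();
    MultiStreamRouter router = new MultiStreamRouter();
    router.declareStream("even", evens::add);
    router.declareStream("odd", odds::add);
    // A bolt deciding per tuple which declared stream to emit to.
    for (int i = 0; i < 5; i++) {
      List<Object> tuple = new ArrayList<>();
      tuple.add(i);
      router.emit(i % 2 == 0 ? "even" : "odd", tuple);
    }
    System.out.println(evens.size() + " " + odds.size()); // 3 2
  }
}
```

With fixed, compile-time ports the wrapper cannot know how many streams a given bolt will declare, which is why dynamically declared or collection-based ports would fit this case.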
