Hello,

I would like to get input on this proposal from the community, particularly from folks who have used Storm before. For Phase 1, apart from the challenge of embedding bolts/spouts with multiple streams, do we see any other technical challenges in implementing the feature?
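Here is a minimal sketch of what the multiple-streams challenge involves. It assumes each stream declared through OutputFieldsDeclarer.declareStream becomes its own output port, created at runtime. MultiStreamRouter, connect() and the Consumer-based "ports" are hypothetical stand-ins, not Apex or Storm APIs. declareStream and emit only mirror the names of the Storm methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: routes tuples from a multi-stream spout/bolt to per-stream
 * output "ports" that are created at runtime from the declared stream IDs.
 * Ports are modeled as lists of Consumer<List<Object>> sinks; real Apex ports are not.
 */
class MultiStreamRouter {

  // One dynamically created output "port" per declared stream ID, in declaration order.
  private final Map<String, List<Consumer<List<Object>>>> ports = new LinkedHashMap<>();

  /** Mirrors OutputFieldsDeclarer.declareStream: registers a new stream ID. */
  void declareStream(String streamId) {
    if (ports.putIfAbsent(streamId, new ArrayList<>()) != null) {
      throw new IllegalArgumentException("Stream already declared: " + streamId);
    }
  }

  /** Connects a downstream sink to a declared stream (roughly what dag.addStream does). */
  void connect(String streamId, Consumer<List<Object>> sink) {
    List<Consumer<List<Object>>> sinks = ports.get(streamId);
    if (sinks == null) {
      throw new IllegalArgumentException("Unknown stream: " + streamId);
    }
    sinks.add(sink);
  }

  /** Mirrors OutputCollector.emit(streamId, tuple): delivers only to that stream's sinks. */
  void emit(String streamId, List<Object> tuple) {
    List<Consumer<List<Object>>> sinks = ports.get(streamId);
    if (sinks == null) {
      throw new IllegalArgumentException("Stream not declared: " + streamId);
    }
    for (Consumer<List<Object>> sink : sinks) {
      sink.accept(tuple);
    }
  }

  public static void main(String[] args) {
    MultiStreamRouter router = new MultiStreamRouter();
    router.declareStream("words");
    router.declareStream("errors");
    List<List<Object>> words = new ArrayList<>();
    List<List<Object>> errors = new ArrayList<>();
    router.connect("words", words::add);
    router.connect("errors", errors::add);
    router.emit("words", Arrays.asList("apex"));
    router.emit("errors", Arrays.asList("bad input"));
    router.emit("words", Arrays.asList("storm"));
    System.out.println(words.size() + " " + errors.size()); // prints "2 1"
  }
}
```

With statically declared ports, a wrapper would have to know its stream IDs at compile time. Dynamic or collection-based ports would instead let it create one port per stream that the bolt declares in declareOutputFields.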
Thanks,
Shubham

On Sun, Feb 21, 2016 at 11:19 PM, Shubham Pathak <[email protected]> wrote:

> Hello,
>
> As part of integration with other open source streaming platforms, the
> ability to run Storm topologies would be a good addition to Apex.
>
> The following JIRA is open to track the idea:
> https://issues.apache.org/jira/browse/APEXMALHAR-1996
>
> We could make progress in phases.
>
> Phase 1: Embedding Storm spouts/bolts as operators in Apex.
>
> The user would implement the populateDAG method and embed the Storm
> spout/bolt in an operator. Here's an example. For a word count topology
> in Storm:
>
>   TopologyBuilder builder = new TopologyBuilder();
>   builder.setSpout("spout", new WordSpout(), 5);
>   builder.setBolt("split", new BoltTokenizer(), 8).shuffleGrouping("spout");
>   builder.setBolt("count", new BoltCounter(), 8).fieldsGrouping("split", new Fields("word"));
>
> a corresponding Apex application would look like this:
>
>   SpoutWrapper input = dag.addOperator("input", new SpoutWrapper(new WordSpout(), "spout"));
>   BoltWrapper tokens = dag.addOperator("tokenizer", new BoltWrapper(new BoltTokenizer(), "split"));
>   BoltWrapper counter = dag.addOperator("counter", new BoltWrapper(new BoltCounter(), "count"));
>   dag.addStream("input", input.output, tokens.input);
>   dag.addStream("word-count", tokens.output, counter.input);
>
> For stream groupings, we could provide concrete implementations of
> StreamCodec. In the above case, to express
> fieldsGrouping("split", new Fields("word")) we could use:
>
>   dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC,
>       new StormTupleStreamCodec(new int[] { 0 }));
>
> Phase 2: Ability to execute a whole Storm topology in Apex.
> Here, the user could submit the Storm topology as-is, and we would provide
> a utility/higher-level API to generate an Apex DAG from the Storm topology.
>
> To validate the first phase, we wrote a quick implementation of
> SpoutWrapper, BoltWrapper and StormTupleStreamCodec and were able to run
> the word count application with partitioning. You may have a look at the
> code here:
>
> https://github.com/shubham-pathak22/incubator-apex-malhar/tree/storm-apex/demos/storm-demo/src/main/java/com/datatorrent/demos/storm
>
> The word count application involved only a single stream between each
> pair of operators and hence was fairly easy to implement.
> A Storm spout/bolt can emit more than one stream. To do so, the user
> declares multiple streams using the declareStream method of
> OutputFieldsDeclarer and specifies the stream to emit to when calling the
> emit method on OutputCollector.
> Example: https://gist.github.com/Xorlev/8058947
>
> To embed such bolts/spouts within an operator, we would need some support
> to be added in Apex. One such feature could be the ability to declare
> input/output ports dynamically. There was a mail thread about supporting
> collection-based ports; that would help in our case too.
>
> Please let me know your thoughts.
>
> Thanks,
> Shubham
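For the fieldsGrouping case, here is a minimal sketch of the partitioning logic a StormTupleStreamCodec could apply. It assumes a Storm tuple is modeled as a plain List<Object> (Storm's Values class is itself a list) and that only the configured field positions feed the hash. FieldsGroupingSketch and its methods are hypothetical names for illustration, not the code in the storm-apex branch. The floorMod mapping is also an assumption, since Apex itself decides how a codec's partition value maps to a physical partition.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical sketch of fieldsGrouping-style partitioning: tuples are hashed on
 * selected field positions only, so equal values in those positions always map
 * to the same downstream partition. A Storm tuple is modeled as a List<Object>.
 */
class FieldsGroupingSketch {

  // Positions of the grouping fields within a tuple, e.g. {0} for "word".
  private final int[] keyFieldIndexes;

  FieldsGroupingSketch(int[] keyFieldIndexes) {
    this.keyFieldIndexes = keyFieldIndexes.clone();
  }

  /** Hash computed only over the grouping fields; null fields hash to 0. */
  int getPartition(List<Object> tuple) {
    int hash = 1;
    for (int index : keyFieldIndexes) {
      hash = 31 * hash + Objects.hashCode(tuple.get(index));
    }
    return hash;
  }

  /** Illustrative mapping of the hash onto numPartitions (always non-negative). */
  int partitionFor(List<Object> tuple, int numPartitions) {
    return Math.floorMod(getPartition(tuple), numPartitions);
  }

  public static void main(String[] args) {
    FieldsGroupingSketch grouping = new FieldsGroupingSketch(new int[] {0});
    // Same word, different second field: both tuples must reach the same partition.
    List<Object> first = Arrays.asList("apex", 1);
    List<Object> second = Arrays.asList("apex", 42);
    System.out.println(grouping.partitionFor(first, 8) == grouping.partitionFor(second, 8)); // prints true
  }
}
```

With new int[] { 0 }, every tuple carrying the same word reaches the same counter partition regardless of its other fields. BoltCounter relies on this property to produce correct per-word counts.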
