Hello,

I would like input on this proposal from the community, particularly
from folks who have used Storm before.
For phase 1, apart from the challenge of embedding bolts/spouts that use
multiple streams, do we see any other technical challenges in implementing
the feature?


Thanks,
Shubham

On Sun, Feb 21, 2016 at 11:19 PM, Shubham Pathak <[email protected]>
wrote:

> Hello,
>
> As part of integrating with other open source streaming platforms, the
> ability to run Storm topologies would be a good addition to Apex.
>
> The following JIRA is open to track the idea:
> https://issues.apache.org/jira/browse/APEXMALHAR-1996
>
> We could make progress in phases.
>
> Phase 1:
> Embedding Storm spouts/bolts as operators in Apex.
>
> The user would implement the populateDAG method and embed each Storm spout
> or bolt in an operator.
> Here's an example.
> For a word count topology in Storm:
>     TopologyBuilder builder = new TopologyBuilder();
>     builder.setSpout("spout", new WordSpout(), 5);
>     builder.setBolt("split", new BoltTokenizer(), 8).shuffleGrouping("spout");
>     builder.setBolt("count", new BoltCounter(), 8).fieldsGrouping("split", new Fields("word"));
>
> A corresponding Apex application would look like this:
>
>     SpoutWrapper input = dag.addOperator("input", new SpoutWrapper(new WordSpout(), "spout"));
>     BoltWrapper tokens = dag.addOperator("tokenizer", new BoltWrapper(new BoltTokenizer(), "split"));
>     BoltWrapper counter = dag.addOperator("counter", new BoltWrapper(new BoltCounter(), "count"));
>     dag.addStream("input", input.output, tokens.input);
>     dag.addStream("word-count", tokens.output, counter.input);
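To make the wrapper idea concrete, here is a minimal, dependency-free sketch of how a BoltWrapper might hand incoming tuples to a bolt and forward the bolt's emits to its output port. It deliberately does not use the real Storm or Apex APIs: MiniBolt, Tokenizer, Counter, BoltWrapperSketch and WrapperDemo are hypothetical stand-ins, with setup() and process() only playing the roles of an operator's setup() and an input port's process() callback.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a Storm bolt (NOT Storm's IRichBolt API).
interface MiniBolt {
  void prepare(Consumer<List<Object>> collector); // receives the "emit" callback
  void execute(List<Object> tuple);
}

// Hypothetical tokenizer, playing the role of BoltTokenizer.
class Tokenizer implements MiniBolt {
  private Consumer<List<Object>> collector;
  public void prepare(Consumer<List<Object>> collector) { this.collector = collector; }
  public void execute(List<Object> tuple) {
    for (String word : ((String) tuple.get(0)).split("\\s+")) {
      collector.accept(Arrays.asList(word)); // one tuple per word
    }
  }
}

// Hypothetical counter, playing the role of BoltCounter.
class Counter implements MiniBolt {
  final Map<String, Integer> counts = new HashMap<>();
  private Consumer<List<Object>> collector;
  public void prepare(Consumer<List<Object>> collector) { this.collector = collector; }
  public void execute(List<Object> tuple) {
    String word = (String) tuple.get(0);
    int count = counts.merge(word, 1, Integer::sum); // increment running count
    collector.accept(Arrays.asList(word, count));
  }
}

// Sketch of the wrapper: tuples arriving on the input side go to the bolt,
// and whatever the bolt emits is forwarded to the output "port".
class BoltWrapperSketch {
  private final MiniBolt bolt;
  private Consumer<List<Object>> output = t -> { }; // unconnected: drop tuples

  BoltWrapperSketch(MiniBolt bolt) { this.bolt = bolt; }

  void connectOutput(Consumer<List<Object>> sink) { this.output = sink; }

  // Analogous to an operator's setup(): hand the bolt an emit callback.
  void setup() { bolt.prepare(t -> output.accept(t)); }

  // Analogous to an input port's process(tuple) callback.
  void process(List<Object> tuple) { bolt.execute(tuple); }
}

class WrapperDemo {
  static Map<String, Integer> run(String... sentences) {
    Counter counterBolt = new Counter();
    BoltWrapperSketch tokens = new BoltWrapperSketch(new Tokenizer());
    BoltWrapperSketch counter = new BoltWrapperSketch(counterBolt);
    tokens.connectOutput(counter::process); // like dag.addStream("word-count", ...)
    tokens.setup();
    counter.setup();
    for (String s : sentences) {
      tokens.process(Arrays.asList(s));
    }
    return counterBolt.counts;
  }

  public static void main(String[] args) {
    System.out.println(WrapperDemo.run("the cat", "the dog"));
  }
}
```

The design choice here is that the bolt never sees an Apex port; it only sees a callback, which keeps unmodified Storm code unaware of the surrounding operator.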
>
> For stream groupings, we could provide concrete implementations of
> StreamCodec.
>
> In the above case, to express fieldsGrouping("split", new Fields("word")),
> we could set the following (0 being the position of the "word" field in the
> tokenizer's output tuples):
>     dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC, new StormTupleStreamCodec(new int[] { 0 }));
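A sketch of the partitioning side of such a codec, assuming it hashes only the tuple fields at the given indices so that tuples agreeing on those fields always land on the same partition. FieldsGroupingCodecSketch is a hypothetical name; a real implementation would implement Apex's StreamCodec interface (including serialization), and Apex applies its own partition keys/masks to the value returned by getPartition rather than the modulo used here purely for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch of the partitioning half of a StormTupleStreamCodec.
// Only the fields at keyIndices contribute to the hash (fields grouping).
class FieldsGroupingCodecSketch {
  private final int[] keyIndices;

  FieldsGroupingCodecSketch(int[] keyIndices) { this.keyIndices = keyIndices.clone(); }

  // Plays the role of StreamCodec.getPartition(T): a hash over the key fields.
  int getPartition(List<Object> tuple) {
    int h = 1;
    for (int i : keyIndices) {
      h = 31 * h + Objects.hashCode(tuple.get(i));
    }
    return h;
  }

  // Assumption for the demo only: map the hash onto n partitions with a modulo.
  static int partitionIndex(int hash, int n) { return Math.floorMod(hash, n); }

  public static void main(String[] args) {
    FieldsGroupingCodecSketch codec = new FieldsGroupingCodecSketch(new int[] {0});
    int a = partitionIndex(codec.getPartition(Arrays.asList("word", 1)), 8);
    int b = partitionIndex(codec.getPartition(Arrays.asList("word", 42)), 8);
    System.out.println(a == b); // prints true: only field 0 is hashed
  }
}
```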
>
> Phase 2: Ability to execute a whole Storm topology in Apex.
> Here, the user could submit the Storm topology as is, and we would provide a
> utility/higher-level API to generate an Apex DAG from the Storm topology.
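For phase 2, a translator could walk the topology's components and produce one wrapper operator per spout/bolt plus one stream per subscription, turning fields groupings into stream codecs. The sketch below is a simplification under stated assumptions: ComponentSpec and TopologyTranslator are hypothetical, each bolt subscribes to exactly one source, grouping field names are already resolved to indices, Storm parallelism is mapped to an Apex partition count, and the output is a textual plan rather than real DAG calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified description of one Storm component.
class ComponentSpec {
  final String id;
  final boolean isSpout;
  final int parallelism;      // assumed to map to an Apex partition count
  final String source;        // upstream component id; null for spouts
  final int[] groupingFields; // field indices for fields grouping; null = shuffle

  ComponentSpec(String id, boolean isSpout, int parallelism, String source, int[] groupingFields) {
    this.id = id;
    this.isSpout = isSpout;
    this.parallelism = parallelism;
    this.source = source;
    this.groupingFields = groupingFields;
  }
}

// Hypothetical translator producing a textual plan of the DAG it would build.
class TopologyTranslator {
  static List<String> translate(Map<String, ComponentSpec> topology) {
    List<String> plan = new ArrayList<>();
    // One wrapper operator per spout/bolt.
    for (ComponentSpec c : topology.values()) {
      String wrapper = c.isSpout ? "SpoutWrapper" : "BoltWrapper";
      plan.add("addOperator " + c.id + " (" + wrapper + ", partitions=" + c.parallelism + ")");
    }
    // One stream per subscription; a fields grouping becomes a stream codec.
    for (ComponentSpec c : topology.values()) {
      if (c.source == null) {
        continue;
      }
      plan.add("addStream " + c.source + " -> " + c.id);
      if (c.groupingFields != null) {
        plan.add("STREAM_CODEC on " + c.id + ".input: fields " + Arrays.toString(c.groupingFields));
      }
    }
    return plan;
  }

  // The word count topology from the example, in this simplified form.
  static Map<String, ComponentSpec> wordCount() {
    Map<String, ComponentSpec> t = new LinkedHashMap<>();
    t.put("spout", new ComponentSpec("spout", true, 5, null, null));
    t.put("split", new ComponentSpec("split", false, 8, "spout", null));
    t.put("count", new ComponentSpec("count", false, 8, "split", new int[] {0}));
    return t;
  }

  public static void main(String[] args) {
    translate(wordCount()).forEach(System.out::println);
  }
}
```

For the word count topology this yields three addOperator entries, two addStream entries and one codec entry, mirroring the hand-written Apex application from phase 1.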
>
> To validate the first phase, we wrote a quick implementation of
> SpoutWrapper, BoltWrapper and StormTupleStreamCodec and were able to run the
> word count application with partitioning.
> You can have a look at the code here:
>
> https://github.com/shubham-pathak22/incubator-apex-malhar/tree/storm-apex/demos/storm-demo/src/main/java/com/datatorrent/demos/storm
>
>
> The word count application has only a single stream between each pair of
> operators and hence was fairly easy to implement.
> A Storm spout/bolt can emit more than one stream. To do so, the user declares
> multiple streams using the declareStream method of OutputFieldsDeclarer and
> specifies the target stream when calling the emit method on
> OutputCollector.
> Example: https://gist.github.com/Xorlev/8058947
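The following dependency-free sketch mimics Storm's multi-stream pattern (declareStream on OutputFieldsDeclarer, emit with a stream id on OutputCollector). MultiStreamCollector and EvenOddBolt are hypothetical stand-ins, not Storm classes; the point is that every emit carries a stream id, which a wrapper operator would have to route to a distinct output port.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in combining the roles of Storm's OutputFieldsDeclarer
// (declareStream) and OutputCollector (emit with a stream id).
class MultiStreamCollector {
  final Map<String, List<String>> declared = new LinkedHashMap<>();
  final Map<String, List<List<Object>>> emitted = new LinkedHashMap<>();

  void declareStream(String streamId, List<String> fields) {
    declared.put(streamId, fields);
    emitted.put(streamId, new ArrayList<>());
  }

  void emit(String streamId, List<Object> tuple) {
    List<List<Object>> out = emitted.get(streamId);
    if (out == null) {
      throw new IllegalArgumentException("undeclared stream: " + streamId);
    }
    out.add(tuple);
  }
}

// Hypothetical bolt that routes numbers to one of two named streams.
class EvenOddBolt {
  void declareOutputFields(MultiStreamCollector declarer) {
    declarer.declareStream("even", Arrays.asList("n"));
    declarer.declareStream("odd", Arrays.asList("n"));
  }

  void execute(int n, MultiStreamCollector collector) {
    collector.emit(n % 2 == 0 ? "even" : "odd", Arrays.asList(n));
  }

  static MultiStreamCollector run(int upTo) {
    EvenOddBolt bolt = new EvenOddBolt();
    MultiStreamCollector collector = new MultiStreamCollector();
    bolt.declareOutputFields(collector);
    for (int i = 1; i <= upTo; i++) {
      bolt.execute(i, collector);
    }
    return collector;
  }

  public static void main(String[] args) {
    System.out.println(run(5).emitted); // prints {even=[[2], [4]], odd=[[1], [3], [5]]}
  }
}
```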
>
> To embed such bolts/spouts within an operator, we would need some
> additional support in Apex.
> One such feature could be the ability to declare input/output ports
> dynamically. There was a mail thread about supporting collection-based
> ports, which would help in our case too.
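Assuming Apex gained collection-based ports, a wrapper could create one output port per declared stream at construction time and route each emit by stream id. Everything below (MultiStreamBolt, ParityBolt, OutputPortSketch, MultiPortBoltWrapper, MultiPortDemo) is hypothetical; OutputPortSketch only imitates an output port, and in this sketch emitting on an unconnected port simply drops the tuple.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical bolt contract: declares its stream ids up front and emits
// (streamId, tuple) pairs through the supplied callback.
interface MultiStreamBolt {
  List<String> declaredStreams();
  void execute(Object value, BiConsumer<String, List<Object>> emit);
}

// Hypothetical bolt with two output streams.
class ParityBolt implements MultiStreamBolt {
  public List<String> declaredStreams() { return Arrays.asList("even", "odd"); }
  public void execute(Object value, BiConsumer<String, List<Object>> emit) {
    int n = (Integer) value;
    emit.accept(n % 2 == 0 ? "even" : "odd", Arrays.asList(n));
  }
}

// Imitation of an output port; an unconnected port drops tuples.
class OutputPortSketch {
  private Consumer<List<Object>> sink;
  void connect(Consumer<List<Object>> sink) { this.sink = sink; }
  void emit(List<Object> tuple) {
    if (sink != null) {
      sink.accept(tuple);
    }
  }
}

// Wrapper holding a "collection" of output ports keyed by Storm stream id.
class MultiPortBoltWrapper {
  private final MultiStreamBolt bolt;
  final Map<String, OutputPortSketch> outputs = new LinkedHashMap<>();

  MultiPortBoltWrapper(MultiStreamBolt bolt) {
    this.bolt = bolt;
    for (String streamId : bolt.declaredStreams()) {
      outputs.put(streamId, new OutputPortSketch()); // ports created at runtime
    }
  }

  // Analogous to an input port's process(): route each emit by stream id.
  void process(Object value) {
    bolt.execute(value, (streamId, tuple) -> {
      OutputPortSketch port = outputs.get(streamId);
      if (port == null) {
        throw new IllegalStateException("undeclared stream: " + streamId);
      }
      port.emit(tuple);
    });
  }
}

class MultiPortDemo {
  // Connects only the "odd" port, so "even" tuples are dropped.
  static List<List<Object>> collectOdd(int upTo) {
    MultiPortBoltWrapper wrapper = new MultiPortBoltWrapper(new ParityBolt());
    List<List<Object>> odd = new ArrayList<>();
    wrapper.outputs.get("odd").connect(odd::add);
    for (int i = 1; i <= upTo; i++) {
      wrapper.process(i);
    }
    return odd;
  }

  public static void main(String[] args) {
    System.out.println(collectOdd(5)); // prints [[1], [3], [5]]
  }
}
```

Keying the ports by stream id means downstream operators could subscribe to individual Storm streams without the wrapper knowing the bolt's streams at compile time.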
>
> Please let me know your thoughts.
>
> Thanks,
> Shubham
>
