Hello,

As part of integrating with other open source streaming platforms, the
ability to run a Storm topology would be a good addition to Apex.

The following JIRA is open to track the idea:
https://issues.apache.org/jira/browse/APEXMALHAR-1996

We could make progress in phases.

Phase 1:
Embedding Storm Spout/Bolt as operators in Apex.

The user would implement the populateDAG method and wrap each Storm
Spout/Bolt in an Apex operator.
Here is an example.
For a word count topology in Storm:
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new WordSpout(), 5);
    builder.setBolt("split", new BoltTokenizer(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new BoltCounter(), 8).fieldsGrouping("split", new Fields("word"));

a corresponding Apex application would look like this:

    SpoutWrapper input = dag.addOperator("input", new SpoutWrapper(new WordSpout(), "spout"));
    BoltWrapper tokens = dag.addOperator("tokenizer", new BoltWrapper(new BoltTokenizer(), "split"));
    BoltWrapper counter = dag.addOperator("counter", new BoltWrapper(new BoltCounter(), "count"));
    dag.addStream("input", input.output, tokens.input);
    dag.addStream("word-count", tokens.output, counter.input);

For stream groupings, we could provide concrete implementations of
StreamCodec.

In the above case, to express fieldsGrouping("split", new Fields("word"))
we could have:

    dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC, new StormTupleStreamCodec(new int[] { 0 }));
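
To make the idea behind the codec more concrete, here is a minimal sketch
of the partitioning part only. It assumes a tuple is a plain list of values
and that the codec is given the indexes of the grouping fields (index 0 for
"word" above). FieldsGroupingSketch is a hypothetical name and is not the
StormTupleStreamCodec from the prototype; a real codec would also have to
implement serialization.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the partitioning logic of a fields-grouping codec.
// It only shows how a partition key could be derived from selected fields.
final class FieldsGroupingSketch {
    private final int[] keyIndexes;

    FieldsGroupingSketch(int[] keyIndexes) {
        // Defensive copy so later changes to the caller's array have no effect.
        this.keyIndexes = keyIndexes.clone();
    }

    // Hash only the grouping fields, so tuples with equal values in those
    // fields always produce the same partition key.
    int getPartition(List<Object> tuple) {
        Object[] key = new Object[keyIndexes.length];
        for (int i = 0; i < keyIndexes.length; i++) {
            key[i] = tuple.get(keyIndexes[i]);
        }
        return Arrays.hashCode(key);
    }

    public static void main(String[] args) {
        FieldsGroupingSketch codec = new FieldsGroupingSketch(new int[] { 0 });
        int first = codec.getPartition(Arrays.<Object>asList("apex", 1));
        int second = codec.getPartition(Arrays.<Object>asList("apex", 7));
        System.out.println(first == second);
    }
}
```

Since only the field at index 0 goes into the hash, both tuples for the
word "apex" get the same key regardless of the second field, which is the
property a fields grouping needs.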

Phase 2: Ability to execute a whole Storm topology in Apex.
Here, the user could submit the Storm topology as-is, and we would provide
a utility / higher-level API to generate an Apex DAG from the Storm topology.
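
As a rough illustration of what such a utility might do, the sketch below
turns a simplified description of a topology (each component mapped to the
components it subscribes to) into a list of stream definitions, one per
subscription. TopologyToDagSketch, StreamSpec and toStreams are hypothetical
names, and the map is only a stand-in for the real Storm topology structure,
so please read this as a sketch under those assumptions and not as a
proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: component name -> names of the components
// whose output it subscribes to. Groupings and parallelism are ignored here.
final class TopologyToDagSketch {

    // One DAG stream: a generated name plus source and sink operator names.
    static final class StreamSpec {
        final String name;
        final String source;
        final String sink;

        StreamSpec(String source, String sink) {
            this.name = source + "->" + sink;
            this.source = source;
            this.sink = sink;
        }
    }

    // Produces one stream per (upstream, downstream) subscription, following
    // the iteration order of the given map.
    static List<StreamSpec> toStreams(Map<String, List<String>> subscriptions) {
        List<StreamSpec> streams = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : subscriptions.entrySet()) {
            for (String upstream : entry.getValue()) {
                streams.add(new StreamSpec(upstream, entry.getKey()));
            }
        }
        return streams;
    }
}
```

For the word count topology this would yield two streams, "spout->split"
and "split->count", which the utility could then pass to dag.addStream.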

To validate the first phase, with a quick implementation of SpoutWrapper,
BoltWrapper and StormTupleStreamCodec we were able to run the word count
application with partitioning.
You may have a look at the code here:
https://github.com/shubham-pathak22/incubator-apex-malhar/tree/storm-apex/demos/storm-demo/src/main/java/com/datatorrent/demos/storm


The word count application involves only a single stream between each pair
of operators and hence was fairly easy to implement.
A Storm spout/bolt can emit more than one stream. To do so, the user
declares multiple streams using the declareStream method of
OutputFieldsDeclarer and specifies the stream to emit to when calling the
emit method on OutputCollector.
Example: https://gist.github.com/Xorlev/8058947

To embed such bolts/spouts within an operator, we would require some
support to be added in Apex.
One such feature could be the ability to declare input/output ports
dynamically. There was a mail thread about supporting collection-based
ports; that would help in our case too.
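
To show why dynamic ports matter here, below is a minimal sketch of how a
wrapper might route emits on named streams to per-stream outputs. It
assumes each declared stream is backed by some sink, which in Apex would
be an output port; here a plain Consumer stands in for it.
StreamRoutingSketch and its methods are hypothetical names and do not exist
in Storm or Apex.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical collector that forwards each tuple to the sink registered
// for its stream id. The set of sinks is only known at runtime, which is
// what dynamically declared ports would have to support.
final class StreamRoutingSketch {
    private final Map<String, Consumer<List<Object>>> ports = new HashMap<>();

    // Would be called once per stream the wrapped spout/bolt declares.
    void declareStream(String streamId, Consumer<List<Object>> port) {
        ports.put(streamId, port);
    }

    // Mirrors an emit on a named stream: look up the sink and hand over
    // the tuple, rejecting streams that were never declared.
    void emit(String streamId, List<Object> tuple) {
        Consumer<List<Object>> port = ports.get(streamId);
        if (port == null) {
            throw new IllegalArgumentException("Undeclared stream: " + streamId);
        }
        port.accept(tuple);
    }
}
```

With statically declared ports the wrapper would need one field per stream
at compile time; a map like the one above only works if ports can be
created once the wrapped component has declared its streams.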

Please let me know your thoughts.

Thanks,
Shubham
