Hello Team,
Following up on the high-level API discussion in another thread, here is a
small proposed variation on our existing APIs.
Take a look at the current code and the proposed code.
Current Style
---------------------------
KafkaSinglePortStringInputOperator kafkaInput =
    dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
DeserializeJSON deserializeJSON =
    dag.addOperator("deserialize", new DeserializeJSON());
RedisJoin redisJoin = dag.addOperator("redisJoin", new RedisJoin());
CampaignProcessorWithApexWindow campaignProcessor =
    dag.addOperator("campaignProcessor", new CampaignProcessorWithApexWindow());
// Connect the ports of the operators
// (note: filterFields is not declared in this snippet)
dag.addStream("deserialize", kafkaInput.outputPort, deserializeJSON.input);
dag.addStream("redisJoin", filterFields.output, redisJoin.input);
dag.addStream("output", redisJoin.output, campaignProcessor.input);
-------------------------------------
Proposed Change (just create the operators, connect output ports to input
ports, and then add only the input operators to the DAG)
-------------------------------------------
KafkaSinglePortStringInputOperator kafkaInput =
    new KafkaSinglePortStringInputOperator();
DeserializeJSON deserializeJSON = new DeserializeJSON();
RedisJoin redisJoin = new RedisJoin();
CampaignProcessor campaignProcessor = new CampaignProcessor();
kafkaInput.outputPort.connect(deserializeJSON.input);
filterFields.output.connect(redisJoin.input);
redisJoin.output.connect(campaignProcessor.input);
dag.add(kafkaInput);
-----------------------------------------
Things to note: the operator name can be generated internally, or the user can
set a name property on the operator (the equivalent of the name passed to
dag.addOperator("name", ...)).
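
To make the idea concrete, here is a rough, self-contained sketch of how
connect() and dag.add() could work together. Everything in it (Op, InputPort,
OutputPort, Dag, and the sample Source/Parse/Sink operators) is a hypothetical
stand-in written for this mail, not the existing Apex classes. connect() only
records the link on the output port; dag.add() then walks everything reachable
from the input operator, registers each operator (generating a name when none
was set) and records one stream per connection.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FluentDagSketch {

    /** Hypothetical operator base class; not the real Apex Operator interface. */
    static class Op {
        String name; // optional: generated by the DAG when left null
        final List<OutputPort> outputs = new ArrayList<>();

        Op() {}
        Op(String name) { this.name = name; }

        InputPort newInput() { return new InputPort(this); }

        OutputPort newOutput() {
            OutputPort port = new OutputPort(this);
            outputs.add(port);
            return port;
        }
    }

    static class InputPort {
        final Op owner;
        InputPort(Op owner) { this.owner = owner; }
    }

    static class OutputPort {
        final Op owner;
        final List<InputPort> sinks = new ArrayList<>();
        OutputPort(Op owner) { this.owner = owner; }

        /** Only records the connection; the DAG discovers it later. */
        void connect(InputPort in) { sinks.add(in); }
    }

    static class Dag {
        final Map<String, Op> operators = new LinkedHashMap<>();
        final List<String> streams = new ArrayList<>();
        private int counter = 0;

        /** Registers the input operator and everything reachable from it. */
        void add(Op inputOperator) {
            Deque<Op> pending = new ArrayDeque<>();
            register(inputOperator, pending);
            while (!pending.isEmpty()) {
                Op op = pending.poll();
                for (OutputPort out : op.outputs) {
                    for (InputPort in : out.sinks) {
                        register(in.owner, pending); // ensures the downstream name exists
                        streams.add(op.name + " -> " + in.owner.name);
                    }
                }
            }
        }

        private void register(Op op, Deque<Op> pending) {
            if (operators.containsValue(op)) {
                return; // already visited
            }
            if (op.name == null) {
                op.name = op.getClass().getSimpleName() + "_" + (counter++);
            }
            if (operators.containsKey(op.name)) {
                throw new IllegalArgumentException("Duplicate operator name: " + op.name);
            }
            operators.put(op.name, op);
            pending.add(op);
        }
    }

    // Sample operators, for illustration only.
    static class Source extends Op {
        final OutputPort output = newOutput();
    }

    static class Parse extends Op {
        final InputPort input = newInput();
        final OutputPort output = newOutput();
    }

    static class Sink extends Op {
        final InputPort input = newInput();
        Sink(String name) { super(name); }
    }

    public static void main(String[] args) {
        Source source = new Source();
        Parse parse = new Parse();
        Sink sink = new Sink("campaignProcessor"); // user-specified name

        source.output.connect(parse.input);
        parse.output.connect(sink.input);

        Dag dag = new Dag();
        dag.add(source); // only the input operator is added explicitly

        System.out.println(dag.operators.keySet());
        System.out.println(dag.streams);
    }
}
```

In this sketch only the input operator is handed to the DAG; the other two are
picked up through the recorded connections, and the sink keeps the name given
by the user while the rest get generated names.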
Let me know your thoughts.
Thanks