dag.addStream("MapData3", ipversionMapCreator.output , esOutput.input).setLocality(Locality.CONTAINER_LOCAL); dag.addStream("MapData4", httpResponseCodeMapCreator.output , esOutput.input).setLocality(Locality.CONTAINER_LOCAL);
Your esOutput.input port is being connected to two upstream operators, which is not possible (reason: it breaks idempotency). Your esOutput operator needs to have two input ports to receive data from the two upstream operators, or you need to add an additional operator that unifies the two upstreams. Thanks, Ajay. Copy-pasting the issue for reference: public void populateDAG(DAG dag, Configuration conf) { KafkaSinglePortStringInputOperator kafkaReader = dag.addOperator("MessageReader", new KafkaSinglePortStringInputOperator()); IPVersionKeyValueCreator ipversionKeyValueCreator = dag.addOperator(" ipversionKeyValueCreator",new IPVersionKeyValueCreator()); SumKeyVal<String, Long> ipversionSumOperator = getSumKeyValOperator("ipversionSum",dag); IPVersionMapCreator ipversionMapCreator = dag.addOperator("ipversionMapCreator",new IPVersionMapCreator()); //Operators for HTTP Response Code Count HttpResponseCodeKeyValueCreator httpResponseCodeKeyValueCreator = dag.addOperator("httpResponseCodeKeyValueCreator",new HttpResponseCodeKeyValueCreator()); SumKeyVal<String, Long> httpResponseCodeSumOperator = getSumKeyValOperator("httpResponseCodeSum",dag); HttpResponseCodeMapCreator httpResponseCodeMapCreator = dag.addOperator("httpResponseCodeMapCreator",new HttpResponseCodeMapCreator()); ElasticSearchMapOutputOperator esOutput= dag.addOperator("elasticSearchOperator", new ElasticSearchMapOutputOperator()); try { esOutput.setStore(createStore()); } catch (IOException e) { e.printStackTrace(); } esOutput.setIndexName("test_apex_kafka_es"); esOutput.setType("test_apex_es"); System.out.println("Wrting to ES"); dag.addStream("kafkaData3", kafkaReader.outputPort , ipversionKeyValueCreator.input, httpResponseCodeKeyValueCreator.input).setLocality(Locality.CONTAINER_LOCAL); dag.addStream("JsonData3", ipversionKeyValueCreator.output , ipversionSumOperator.data).setLocality(Locality.CONTAINER_LOCAL); dag.addStream("ConsumptionData3", ipversionSumOperator.sum , 
ipversionMapCreator.input).setLocality(Locality.CONTAINER_LOCAL); dag.addStream("MapData3", ipversionMapCreator.output , esOutput.input).setLocality(Locality.CONTAINER_LOCAL); //Streams for HTTP Response Code Count //dag.addStream("kafkaData4", kafkaReader.outputPort , httpResponseCodeKeyValueCreator.input).setLocality(Locality.CONTAINER_LOCAL); dag.addStream("JsonData4", httpResponseCodeKeyValueCreator.output , httpResponseCodeSumOperator.data).setLocality(Locality.CONTAINER_LOCAL); dag.addStream("ConsumptionData4", httpResponseCodeSumOperator.sum , httpResponseCodeMapCreator.input).setLocality(Locality.CONTAINER_LOCAL); dag.addStream("MapData4", httpResponseCodeMapCreator.output , esOutput.input).setLocality(Locality.CONTAINER_LOCAL); } --------------============================================== ========================================================== java.lang.IllegalArgumentException: Port input already connected to stream LogicalPlan.StreamMeta[id=MapData3] at com.datatorrent.stram.plan.logical.LogicalPlan$StreamMeta.addSink(LogicalPlan.java:548) at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1429) at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1480) at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:125) at org.jio.media.Application.populateDAG(Application.java:99) On Sat, May 13, 2017 at 10:55 AM, rohit garg <rohit.gar...@gmail.com> wrote: > > Did any one got a chance to look at it .. > > -- > > > ---------------RohitGarg. > > > > >