[
https://issues.apache.org/jira/browse/APEXCORE-718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16007749#comment-16007749
]
ROHIT RAJKUMAR GARG commented on APEXCORE-718:
----------------------------------------------
Thanks [~tushargosavi]
I will use the email id provided .We even tried as per the solution mentioned
..Please find the code
public void populateDAG(DAG dag, Configuration conf)
{
KafkaSinglePortStringInputOperator kafkaReader =
dag.addOperator("MessageReader", new KafkaSinglePortStringInputOperator());
IPVersionKeyValueCreator ipversionKeyValueCreator =
dag.addOperator("ipversionKeyValueCreator",new
IPVersionKeyValueCreator());
SumKeyVal<String, Long> ipversionSumOperator =
getSumKeyValOperator("ipversionSum",dag);
IPVersionMapCreator ipversionMapCreator =
dag.addOperator("ipversionMapCreator",new IPVersionMapCreator());
//Operators for HTTP Response Code Count
HttpResponseCodeKeyValueCreator httpResponseCodeKeyValueCreator =
dag.addOperator("httpResponseCodeKeyValueCreator",new
HttpResponseCodeKeyValueCreator());
SumKeyVal<String, Long> httpResponseCodeSumOperator =
getSumKeyValOperator("httpResponseCodeSum",dag);
HttpResponseCodeMapCreator httpResponseCodeMapCreator =
dag.addOperator("httpResponseCodeMapCreator",new HttpResponseCodeMapCreator());
ElasticSearchMapOutputOperator esOutput=
dag.addOperator("elasticSearchOperator", new ElasticSearchMapOutputOperator());
try {
esOutput.setStore(createStore());
} catch (IOException e) {
e.printStackTrace();
}
esOutput.setIndexName("test_apex_kafka_es");
esOutput.setType("test_apex_es");
System.out.println("Wrting to ES");
dag.addStream("kafkaData3", kafkaReader.outputPort ,
ipversionKeyValueCreator.input,
httpResponseCodeKeyValueCreator.input).setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("JsonData3", ipversionKeyValueCreator.output ,
ipversionSumOperator.data).setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("ConsumptionData3", ipversionSumOperator.sum ,
ipversionMapCreator.input).setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("MapData3", ipversionMapCreator.output ,
esOutput.input).setLocality(Locality.CONTAINER_LOCAL);
//Streams for HTTP Response Code Count
//dag.addStream("kafkaData4", kafkaReader.outputPort ,
httpResponseCodeKeyValueCreator.input).setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("JsonData4", httpResponseCodeKeyValueCreator.output ,
httpResponseCodeSumOperator.data).setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("ConsumptionData4", httpResponseCodeSumOperator.sum ,
httpResponseCodeMapCreator.input).setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("MapData4", httpResponseCodeMapCreator.output ,
esOutput.input).setLocality(Locality.CONTAINER_LOCAL);
}
--------------========================================================================================================
java.lang.IllegalArgumentException: Port input already connected to stream
LogicalPlan.StreamMeta[id=MapData3]
at
com.datatorrent.stram.plan.logical.LogicalPlan$StreamMeta.addSink(LogicalPlan.java:548)
at
com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1429)
at
com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1480)
at
com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:125)
at org.jio.media.Application.populateDAG(Application.java:99)
at
com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.prepareDAG(LogicalPlanConfiguration.java:2263)
at
com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.createFromStreamingApplication(LogicalPlanConfiguration.java:2123)
at
org.apache.apex.engine.util.StreamingAppFactory.createApp(StreamingAppFactory.java:47)
at
com.datatorrent.stram.client.StramAppLauncher$1.createApp(StramAppLauncher.java:473)
at
com.datatorrent.stram.client.StramAppLauncher.runLocal(StramAppLauncher.java:518)
at
com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:2143)
at com.datatorrent.stram.cli.ApexCli.launchAppPackage(ApexCli.java:3561)
at com.datatorrent.stram.cli.ApexCli.access$7400(ApexCli.java:153)
at
com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:1980)
at com.datatorrent.stram.cli.ApexCli$3.run(ApexCli.java:1539)
> How to Use one Kafka input operator and Feed to multiple operator
> ------------------------------------------------------------------
>
> Key: APEXCORE-718
> URL: https://issues.apache.org/jira/browse/APEXCORE-718
> Project: Apache Apex Core
> Issue Type: Question
> Reporter: ROHIT RAJKUMAR GARG
>
> I want to read from a topic using the kafka input operator and then feed it
> to multiple operator . in short it will be a sort of branch
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)