[ https://issues.apache.org/jira/browse/APEXCORE-718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006224#comment-16006224 ]

ROHIT RAJKUMAR GARG commented on APEXCORE-718:
----------------------------------------------

  // Fan the Kafka input out to both key/value creators on a single stream.
  dag.addStream("kafkaData", kafkainput.outputPort,
      consumptionKeyValueCreator.input,
      requestCountKeyValueCreator.input).setLocality(Locality.CONTAINER_LOCAL);

  // Consumption branch.
  dag.addStream("JsonData", consumptionKeyValueCreator.output,
      consumptionOperator.data).setLocality(Locality.CONTAINER_LOCAL);

  // Request-count branch.
  dag.addStream("JsonData1", requestCountKeyValueCreator.output,
      countOperator.data).setLocality(Locality.CONTAINER_LOCAL);
  dag.addStream("ConsumptionData1", countOperator.sum,
      countMapOutput.input).setLocality(Locality.CONTAINER_LOCAL);
  
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

java.lang.IllegalArgumentException: Port input already connected to stream LogicalPlan.StreamMeta[id=MapData]
        at com.datatorrent.stram.plan.logical.LogicalPlan$StreamMeta.addSink(LogicalPlan.java:548)
        at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1429)
        at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1480)
        at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:125)
        at org.jio.media.Application.populateDAG(Application.java:68)
        at com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.prepareDAG(LogicalPlanConfiguration.java:2263)
        at com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.createFromStreamingApplication(LogicalPlanConfiguration.java:2123)
        at org.apache.apex.engine.util.StreamingAppFactory.createApp(StreamingAppFactory.java:47)
        at com.datatorrent.stram.client.StramAppLauncher$1.createApp(StramAppLauncher.java:473)
        at com.datatorrent.stram.client.StramAppLauncher.runLocal(StramAppLauncher.java:518)
        at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:2143)
        at com.datatorrent.stram.cli.ApexCli.launchAppPackage(ApexCli.java:3561)
        at com.datatorrent.stram.cli.ApexCli.access$7400(ApexCli.java:153)
        at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:1980)
        at com.datatorrent.stram.cli.ApexCli$3.run(ApexCli.java:1539)




> How to Use one Kafka input operator and Feed to multiple operator 
> ------------------------------------------------------------------
>
>                 Key: APEXCORE-718
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-718
>             Project: Apache Apex Core
>          Issue Type: Question
>            Reporter: ROHIT RAJKUMAR GARG
>
> I want to read from a topic using the Kafka input operator and then feed the
> data to multiple operators. In short, it will be a sort of branch.


