Hi, Thanks for the reply.

I tried to consume from two different topics in same app , I am getting
error (*java.lang.NoSuchMethodError:
kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;*)
. 

When I tried consuming from kafka 9 using this(KafkaSinglePortInputOperator)
operator, I was able to do it successfully , but when I am adding another
one more operator(KafkaSinglePortByteArrayInputOperator) to consume from .8 
in same dag I am getting the error.

For testing I am not merging kafka output to any operator, it is writing at
two different location in HDFS.

Looks like there is some version issue comming , which I am not able to
identify . Any help is highly appreciated.

My pom.xml looks like this=

<properties>
    
    <apex.version>3.4.0</apex.version>
    <malhar.version>3.6.0</malhar.version>
    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
    <hadoop.version>2.7.1.2.3.4.0-3485</hadoop.version>
        <hbase.version>1.1.2.2.3.4.0-3485</hbase.version>
        <kafka.version>0.9.0.1</kafka.version>
         <confluent.kafka.version>0.9.0.1-cp1</confluent.kafka.version>
        <kafka.avro.srlzr.version>2.0.1</kafka.avro.srlzr.version>
      
        <avro.version>1.7.7</avro.version>
        <json.version>1.1</json.version>
        <jodatime.version>2.9.1</jodatime.version>
        <kyroserializer.version>0.38</kyroserializer.version>
        <junit.version>4.10</junit.version>
  </properties>
  
  <repositories>
        <repository>
            <id>HDPReleases</id>
            <name>HDP Releases</name>
           
<url>http://repo.hortonworks.com/content/repositories/releases/</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>HDP Jetty Hadoop</id>
            <name>HDP Jetty Hadoop</name>
           
<url>http://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven</url>
        </repository>
    </repositories>
        <dependencies>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-library</artifactId>
            <version>${malhar.version}</version>
            
            
        </dependency>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>apex-common</artifactId>
            <version>${apex.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>apex-engine</artifactId>
            <version>${apex.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-contrib</artifactId>
            <version>${malhar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-kafka</artifactId>
            <version>${malhar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>

         <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${confluent.kafka.version}</version>
        </dependency>
          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${kafka.avro.srlzr.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>${json.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${jodatime.version}</version>
        </dependency>

        <dependency>
            <groupId>de.javakaffee</groupId>
            <artifactId>kryo-serializers</artifactId>
            <version>${kyroserializer.version}</version>
        </dependency>
    </dependencies>


My DAG looks like this=>

public void populateDAG(DAG dag, Configuration conf)
  {

    KafkaSinglePortInputOperator kafkaInTtce =
dag.addOperator("Kafka_Input_SSL",new KafkaSinglePortInputOperator());
   
kafkaInTtce.setInitialPartitionCount(Integer.parseInt(conf.get("kafka.partitioncount")));
    kafkaInTtce.setTopics(conf.get("kafka.ssl.topic"));
    kafkaInTtce.setInitialOffset(conf.get("kafka.offset"));
    kafkaInTtce.setClusters(conf.get("kafka.cluster"));
   
kafkaInTtce.setConsumerProps(getKafkaProperties(conf.get("kafka.cluster"),
conf));
    kafkaInTtce.setStrategy(conf.get("kafka.strategy"));

    AvroBytesConversionOperator avroConversion =
dag.addOperator("Avro_Convert", new
AvroBytesConversionOperator(conf.get("kafka.schema.registry")));
    ColumnsExtractOperator fieldExtract = dag.addOperator("Field_Extract",
new ColumnsExtractOperator());

    WriteToHdfs hdfs = dag.addOperator("To_HDFS", new
WriteToHdfs(conf.get("hdfs.filename")));
    hdfs.setMaxLength(268435456); // new file rotates after every 256mb
    
    dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInTtce.outputPort,
avroConversion.input);
    dag.addStream("jsonstring_stream", avroConversion.output,
fieldExtract.input);
    dag.addStream("valid_recs_into_hdfs_stream", fieldExtract.output,
hdfs.input);
    
    KafkaSinglePortByteArrayInputOperator kafkaInput = 
dag.addOperator("Kafka_Input_NonSSL", new
KafkaSinglePortByteArrayInputOperator());
    CopyofAvroBytesConversionOperator avroConversionEstore =
dag.addOperator("Avro_Convert_estore", new
CopyofAvroBytesConversionOperator("http://--------";));
    CopyOfColumnsExtractOperator fieldExtractEstore =
dag.addOperator("Field_Extract_Estore", new CopyOfColumnsExtractOperator());
    WriteToHdfs2 hdfs2 = dag.addOperator("To_HDFS2", new
WriteToHdfs2("DeviceTypeEstore"));
    hdfs2.setMaxLength(268435456);
    
    dag.addStream("Kafka_Avro_estore_Stream", kafkaInput.outputPort,
avroConversionEstore.input);
    dag.addStream("jsonstring_stream_estore", avroConversionEstore.output,
fieldExtractEstore.input);
    dag.addStream("valid_recs_into_hdfs_estorestream",
fieldExtractEstore.output, hdfs2.input);
  }


Error I am getting(.dt log)=>

2017-08-01 04:57:38,281 INFO  stram.StreamingAppMaster
(StreamingAppMaster.java:main(99)) - Initializing Application Master.
2017-08-01 04:57:38,388 INFO  stram.StreamingAppMasterService
(StreamingAppMasterService.java:serviceInit(537)) - Application master,
appId=507386, clustertimestamp=1500406884031, attemptId=2
2017-08-01 04:57:38,622 WARN  util.NativeCodeLoader
(NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
2017-08-01 04:57:39,441 WARN  shortcircuit.DomainSocketFactory
(DomainSocketFactory.java:<init>(117)) - The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.
2017-08-01 04:57:40,088 INFO  kafka.AbstractKafkaInputOperator
(AbstractKafkaInputOperator.java:initPartitioner(327)) - Initialize
Partitioner
2017-08-01 04:57:40,089 INFO  kafka.AbstractKafkaInputOperator
(AbstractKafkaInputOperator.java:initPartitioner(340)) - Actual Partitioner
is class org.apache.apex.malhar.kafka.OneToManyPartitioner
2017-08-01 04:57:40,121 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:initMetadataClients(234)) - Consumer
Properties :  #
#Tue Aug 01 04:57:40 CDT 2017
security.protocol=SSL
enable.auto.commit=false
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id=org.apache.apex.malhar.kafka.AbstractKafkaInputOperatorMETA_GROUP
ssl.keystore.password=
ssl.truststore.location=/home_dir/client.truststore.jks
bootstrap.servers=
ssl.truststore.password=
ssl.keystore.location=/home_dir/server.keystore.jks
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
 
2017-08-01 04:57:40,290 INFO  utils.AppInfoParser
(AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0
2017-08-01 04:57:40,291 INFO  utils.AppInfoParser
(AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a
2017-08-01 04:57:41,306 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(151)) - Partition change
detected: 
2017-08-01 04:57:41,307 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 0
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-21};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-22};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-23};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-24};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-18};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-19};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-20};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-14};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-25};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-26} 
2017-08-01 04:57:41,318 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 1
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-6};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-8};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-2};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-3};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-13};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-16};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-9};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-10};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-11};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-28} 
2017-08-01 04:57:41,322 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 2
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-5};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-7};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-17};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-1};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-4};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-29};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-15};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-0};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-27};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-12} 
2017-08-01 04:57:41,365 INFO  zkclient.ZkEventThread
(ZkEventThread.java:run(64)) - Starting ZkClient event thread.
2017-08-01 04:57:41,372 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-08-01 04:57:41,372 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:host.name=brdn1351.target.com
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client environment:java.version=1.8.0_73
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle
Corporation
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:java.home=/usr/java/jdk1.8.0_73/jre
connectString=10.66.137.94:2181,10.66.137.95:2181,10.66.137.96:2181,10.66.137.97:2181,10.66.137.98:2181
sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@6d64b553
2017-08-01 04:57:41,388 INFO  zkclient.ZkClient
(ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state
SyncConnected
2017-08-01 04:57:41,392 INFO  zookeeper.ClientCnxn
(ClientCnxn.java:logStartConnect(975)) - Opening socket connection to server
10.66.137.97/10.66.137.97:2181. Will not attempt to authenticate using SASL
(unknown error)
2017-08-01 04:57:41,393 INFO  zookeeper.ClientCnxn
(ClientCnxn.java:primeConnection(852)) - Socket connection established to
10.66.137.97/10.66.137.97:2181, initiating session
2017-08-01 04:57:41,445 INFO  zookeeper.ClientCnxn
(ClientCnxn.java:onConnected(1235)) - Session establishment complete on
server 10.66.137.97/10.66.137.97:2181, sessionid = 0x350dafbffc66af1,
negotiated timeout = 30000
2017-08-01 04:57:41,447 INFO  zkclient.ZkClient
(ZkClient.java:processStateChanged(711)) - zookeeper state changed
(SyncConnected)
2017-08-01 04:57:41,450 ERROR stram.StreamingAppMaster
(StreamingAppMaster.java:main(106)) - Exiting Application Master
java.lang.NoSuchMethodError:
kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
        at
com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers(KafkaMetadataUtil.java:117)
        at
com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers(KafkaConsumer.java:139)
        at
com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:506)
        at
com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
        at
com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
        at
com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
        at
com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
        at
com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
        at
com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
        at 
org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
        at
com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)





--
View this message in context: 
http://apache-apex-users-list.78494.x6.nabble.com/How-to-consume-from-two-different-topics-in-one-apex-application-tp1797p1801.html
Sent from the Apache Apex Users list mailing list archive at Nabble.com.

Reply via email to