Hi, thanks for the reply. I tried to consume from two different topics in the same app, and I am getting this error (*java.lang.NoSuchMethodError: kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;*).
When I consume from Kafka 0.9 using the KafkaSinglePortInputOperator, it works. But when I add a second operator (KafkaSinglePortByteArrayInputOperator) to consume from Kafka 0.8 in the same DAG, I get the error. For testing, I am not merging the two Kafka outputs into any operator; each branch writes to its own location in HDFS. It looks like a version conflict, but I am not able to identify it. Any help is highly appreciated.

My pom.xml looks like this:

    <properties>
      <apex.version>3.4.0</apex.version>
      <malhar.version>3.6.0</malhar.version>
      <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
      <hadoop.version>2.7.1.2.3.4.0-3485</hadoop.version>
      <hbase.version>1.1.2.2.3.4.0-3485</hbase.version>
      <kafka.version>0.9.0.1</kafka.version>
      <confluent.kafka.version>0.9.0.1-cp1</confluent.kafka.version>
      <kafka.avro.srlzr.version>2.0.1</kafka.avro.srlzr.version>
      <avro.version>1.7.7</avro.version>
      <json.version>1.1</json.version>
      <jodatime.version>2.9.1</jodatime.version>
      <kyroserializer.version>0.38</kyroserializer.version>
      <junit.version>4.10</junit.version>
    </properties>

    <repositories>
      <repository>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>http://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
      </repository>
      <repository>
        <id>HDP Jetty Hadoop</id>
        <name>HDP Jetty Hadoop</name>
        <url>http://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
      </repository>
      <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven</url>
      </repository>
    </repositories>

    <dependencies>
      <dependency>
        <groupId>org.apache.apex</groupId>
        <artifactId>malhar-library</artifactId>
        <version>${malhar.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.apex</groupId>
        <artifactId>apex-common</artifactId>
        <version>${apex.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.apex</groupId>
        <artifactId>apex-engine</artifactId>
        <version>${apex.version}</version>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.apex</groupId>
        <artifactId>malhar-contrib</artifactId>
        <version>${malhar.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.apex</groupId>
        <artifactId>malhar-kafka</artifactId>
        <version>${malhar.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${confluent.kafka.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.1.1</version>
      </dependency>
      <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${kafka.avro.srlzr.version}</version>
        <exclusions>
          <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
          </exclusion>
          <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
          </exclusion>
        </exclusions>
      </dependency>
      <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>${json.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>${jodatime.version}</version>
      </dependency>
      <dependency>
        <groupId>de.javakaffee</groupId>
        <artifactId>kryo-serializers</artifactId>
        <version>${kyroserializer.version}</version>
      </dependency>
    </dependencies>

My DAG looks like this:

    public void populateDAG(DAG dag, Configuration conf) {
      // Branch 1: Kafka 0.9 (SSL) -> Avro conversion -> field extract -> HDFS
      KafkaSinglePortInputOperator kafkaInTtce =
          dag.addOperator("Kafka_Input_SSL", new KafkaSinglePortInputOperator());
      kafkaInTtce.setInitialPartitionCount(Integer.parseInt(conf.get("kafka.partitioncount")));
      kafkaInTtce.setTopics(conf.get("kafka.ssl.topic"));
      kafkaInTtce.setInitialOffset(conf.get("kafka.offset"));
      kafkaInTtce.setClusters(conf.get("kafka.cluster"));
      kafkaInTtce.setConsumerProps(getKafkaProperties(conf.get("kafka.cluster"), conf));
      kafkaInTtce.setStrategy(conf.get("kafka.strategy"));

      AvroBytesConversionOperator avroConversion =
          dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(conf.get("kafka.schema.registry")));
      ColumnsExtractOperator fieldExtract =
          dag.addOperator("Field_Extract", new ColumnsExtractOperator());
      WriteToHdfs hdfs = dag.addOperator("To_HDFS", new WriteToHdfs(conf.get("hdfs.filename")));
      hdfs.setMaxLength(268435456); // rotate to a new file after every 256 MB

      dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInTtce.outputPort, avroConversion.input);
      dag.addStream("jsonstring_stream", avroConversion.output, fieldExtract.input);
      dag.addStream("valid_recs_into_hdfs_stream", fieldExtract.output, hdfs.input);

      // Branch 2: Kafka 0.8 (non-SSL) -> Avro conversion -> field extract -> HDFS
      KafkaSinglePortByteArrayInputOperator kafkaInput =
          dag.addOperator("Kafka_Input_NonSSL", new KafkaSinglePortByteArrayInputOperator());
      CopyofAvroBytesConversionOperator avroConversionEstore =
          dag.addOperator("Avro_Convert_estore", new CopyofAvroBytesConversionOperator("http://--------"));
      CopyOfColumnsExtractOperator fieldExtractEstore =
          dag.addOperator("Field_Extract_Estore", new CopyOfColumnsExtractOperator());
      WriteToHdfs2 hdfs2 = dag.addOperator("To_HDFS2", new WriteToHdfs2("DeviceTypeEstore"));
      hdfs2.setMaxLength(268435456);

      dag.addStream("Kafka_Avro_estore_Stream", kafkaInput.outputPort, avroConversionEstore.input);
      dag.addStream("jsonstring_stream_estore", avroConversionEstore.output, fieldExtractEstore.input);
      dag.addStream("valid_recs_into_hdfs_estorestream", fieldExtractEstore.output, hdfs2.input);
    }

The error I am getting (.dt log):

2017-08-01 04:57:38,281 INFO stram.StreamingAppMaster (StreamingAppMaster.java:main(99)) - Initializing Application Master.
2017-08-01 04:57:38,388 INFO stram.StreamingAppMasterService (StreamingAppMasterService.java:serviceInit(537)) - Application master, appId=507386, clustertimestamp=1500406884031, attemptId=2
2017-08-01 04:57:38,622 WARN util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-08-01 04:57:39,441 WARN shortcircuit.DomainSocketFactory (DomainSocketFactory.java:<init>(117)) - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2017-08-01 04:57:40,088 INFO kafka.AbstractKafkaInputOperator (AbstractKafkaInputOperator.java:initPartitioner(327)) - Initialize Partitioner
2017-08-01 04:57:40,089 INFO kafka.AbstractKafkaInputOperator (AbstractKafkaInputOperator.java:initPartitioner(340)) - Actual Partitioner is class org.apache.apex.malhar.kafka.OneToManyPartitioner
2017-08-01 04:57:40,121 INFO kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:initMetadataClients(234)) - Consumer Properties :
#
#Tue Aug 01 04:57:40 CDT 2017
security.protocol=SSL
enable.auto.commit=false
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id=org.apache.apex.malhar.kafka.AbstractKafkaInputOperatorMETA_GROUP
ssl.keystore.password=
ssl.truststore.location=/home_dir/client.truststore.jks
bootstrap.servers=
ssl.truststore.password=
ssl.keystore.location=/home_dir/server.keystore.jks
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
2017-08-01 04:57:40,290 INFO utils.AppInfoParser (AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0
2017-08-01 04:57:40,291 INFO utils.AppInfoParser (AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a
2017-08-01 04:57:41,306 INFO kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(151)) - Partition change detected:
2017-08-01 04:57:41,307 INFO kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 0 with assignment PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-21};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-22};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-23};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-24};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-18};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-19};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-20};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-14};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-25};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-26}
2017-08-01 04:57:41,318 INFO kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 1 with assignment PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-6};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-8};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-2};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-3};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-13};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-16};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-9};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-10};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-11};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-28}
2017-08-01 04:57:41,322 INFO kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 2 with assignment PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-5};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-7};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-17};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-1};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-4};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-29};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-15};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-0};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-27};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-12}
2017-08-01 04:57:41,365 INFO zkclient.ZkEventThread (ZkEventThread.java:run(64)) - Starting ZkClient event thread.
2017-08-01 04:57:41,372 INFO zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-08-01 04:57:41,372 INFO zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:host.name=brdn1351.target.com
2017-08-01 04:57:41,373 INFO zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.version=1.8.0_73
2017-08-01 04:57:41,373 INFO zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle Corporation
2017-08-01 04:57:41,373 INFO zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.home=/usr/java/jdk1.8.0_73/jre
connectString=10.66.137.94:2181,10.66.137.95:2181,10.66.137.96:2181,10.66.137.97:2181,10.66.137.98:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@6d64b553
2017-08-01 04:57:41,388 INFO zkclient.ZkClient (ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state SyncConnected
2017-08-01 04:57:41,392 INFO zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(975)) - Opening socket connection to server 10.66.137.97/10.66.137.97:2181. Will not attempt to authenticate using SASL (unknown error)
2017-08-01 04:57:41,393 INFO zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to 10.66.137.97/10.66.137.97:2181, initiating session
2017-08-01 04:57:41,445 INFO zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1235)) - Session establishment complete on server 10.66.137.97/10.66.137.97:2181, sessionid = 0x350dafbffc66af1, negotiated timeout = 30000
2017-08-01 04:57:41,447 INFO zkclient.ZkClient (ZkClient.java:processStateChanged(711)) - zookeeper state changed (SyncConnected)
2017-08-01 04:57:41,450 ERROR stram.StreamingAppMaster (StreamingAppMaster.java:main(106)) - Exiting Application Master
java.lang.NoSuchMethodError: kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
    at com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers(KafkaMetadataUtil.java:117)
    at com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers(KafkaConsumer.java:139)
    at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:506)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
    at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
    at com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
    at com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)

--
View this message in context: http://apache-apex-users-list.78494.x6.nabble.com/How-to-consume-from-two-different-topics-in-one-apex-application-tp1797p1801.html
Sent from the Apache Apex Users list mailing list archive at Nabble.com.
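
One way to narrow down a NoSuchMethodError like the one in the log is to check which jar kafka.utils.ZkUtils is actually loaded from, and which getAllBrokersInCluster signatures that copy exposes. The descriptor in the error, (Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;, says the caller expects a method that takes a ZkClient and returns a Scala Seq. The sketch below is only a diagnostic, not a fix. The class ClasspathProbe and its helpers are hypothetical names made up for illustration, and it assumes it is run with the same classpath as the application (for example, the jars packaged under lib/).

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of Apex, Malhar, or Kafka.
class ClasspathProbe {

    // Returns the jar or directory a class was loaded from.
    // Classes from the JDK itself have no code source.
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return src == null ? "(JDK runtime)" : String.valueOf(src.getLocation());
    }

    // Lists every public method with the given name, including parameter
    // and return types, so the signatures can be compared with the error.
    static List<String> signaturesOf(Class<?> cls, String methodName) {
        List<String> found = new ArrayList<>();
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(methodName)) {
                found.add(m.toString());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Defaults come from the stack trace; both can be overridden.
        String className = args.length > 0 ? args[0] : "kafka.utils.ZkUtils";
        String methodName = args.length > 1 ? args[1] : "getAllBrokersInCluster";
        try {
            // initialize=false: locate the class without running static initializers.
            Class<?> cls = Class.forName(className, false,
                    ClasspathProbe.class.getClassLoader());
            System.out.println(className + " loaded from " + locationOf(cls));
            List<String> signatures = signaturesOf(cls, methodName);
            if (signatures.isEmpty()) {
                System.out.println("no public method named " + methodName);
            }
            for (String s : signatures) {
                System.out.println("  " + s);
            }
        } catch (ClassNotFoundException | LinkageError e) {
            // Class is missing, or one of its dependencies could not be resolved.
            System.out.println(className + " could not be inspected: " + e);
        }
    }
}
```

If none of the printed signatures takes an org.I0Itec.zkclient.ZkClient argument, the copy of ZkUtils that was loaded is not the one the calling code (com.datatorrent.contrib.kafka.KafkaMetadataUtil in the stack trace) was compiled against. Since the pom declares both kafka_2.11 0.9.0.1-cp1 and kafka_2.10 0.8.1.1, the printed jar location would show which of the two is being picked up.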