I don't think you can use both the Kafka 0.8.x and 0.9.x clients within the same application: their dependencies overlap and will conflict. You can use the 0.8 client to talk to a 0.9 server, but since you want to use SSL that is not an option (security was only added in 0.9).
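If you want to confirm that overlap on your side, one option is to print where a class is actually loaded from at runtime. The following is only a rough sketch: ClassOriginCheck and describeOrigin are names I made up for illustration (they are not part of Apex or Kafka), and it relies only on standard JDK reflection calls.

```java
import java.security.CodeSource;

// Hypothetical helper, not part of Apex or Kafka: reports which jar or
// directory a class was loaded from, to help spot duplicate Kafka versions.
final class ClassOriginCheck {

    static String describeOrigin(String className) {
        try {
            // initialize=false: locate the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, ClassOriginCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // JDK classes loaded by the bootstrap loader have no code source.
                return className + " -> (bootstrap or unknown location)";
            }
            return className + " -> " + source.getLocation();
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // kafka.utils.ZkUtils is the class named in the NoSuchMethodError;
        // pass other class names as arguments to check them instead.
        String[] names = args.length > 0 ? args : new String[] {"kafka.utils.ZkUtils"};
        for (String name : names) {
            System.out.println(describeOrigin(name));
        }
    }
}
```

Run with the same classpath as the application package, this should show which Kafka jar supplies kafka.utils.ZkUtils. If it is not the jar the failing operator was built against, that would be consistent with the version conflict described above.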
I have not tried it, but you might be able to use the Maven Shade plugin to shade the Apex Kafka operator along with its Kafka client, so that both versions can coexist within one application.

Thomas

On Tue, Aug 1, 2017 at 5:42 AM, rishi <rishi.mis...@target.com> wrote:

> Hi, Thanks for the reply.
>
> I tried to consume from two different topics in the same app, and I am getting
> an error (*java.lang.NoSuchMethodError:
> kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;*).
>
> When I consumed from Kafka 0.9 using the KafkaSinglePortInputOperator
> operator, it worked, but when I add one more
> operator (KafkaSinglePortByteArrayInputOperator) to consume from 0.8
> in the same DAG, I get the error.
>
> For testing I am not merging the Kafka output into any operator; it is written to
> two different locations in HDFS.
>
> It looks like there is some version issue, which I am not able to
> identify. Any help is highly appreciated.
> > My pom.xml looks like this= > > <properties> > > <apex.version>3.4.0</apex.version> > <malhar.version>3.6.0</malhar.version> > <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> > <hadoop.version>2.7.1.2.3.4.0-3485</hadoop.version> > <hbase.version>1.1.2.2.3.4.0-3485</hbase.version> > <kafka.version>0.9.0.1</kafka.version> > <confluent.kafka.version>0.9.0.1-cp1</confluent.kafka.version> > <kafka.avro.srlzr.version>2.0.1</kafka.avro.srlzr.version> > > <avro.version>1.7.7</avro.version> > <json.version>1.1</json.version> > <jodatime.version>2.9.1</jodatime.version> > <kyroserializer.version>0.38</kyroserializer.version> > <junit.version>4.10</junit.version> > </properties> > > <repositories> > <repository> > <id>HDPReleases</id> > <name>HDP Releases</name> > > <url>http://repo.hortonworks.com/content/repositories/releases/</url> > <layout>default</layout> > </repository> > <repository> > <id>HDP Jetty Hadoop</id> > <name>HDP Jetty Hadoop</name> > > <url>http://repo.hortonworks.com/content/repositories/jetty-hadoop/</url> > <layout>default</layout> > </repository> > <repository> > <id>confluent</id> > <url>http://packages.confluent.io/maven</url> > </repository> > </repositories> > <dependencies> > <dependency> > <groupId>org.apache.apex</groupId> > <artifactId>malhar-library</artifactId> > <version>${malhar.version}</version> > > > </dependency> > <dependency> > <groupId>org.apache.apex</groupId> > <artifactId>apex-common</artifactId> > <version>${apex.version}</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>junit</groupId> > <artifactId>junit</artifactId> > <version>${junit.version}</version> > <scope>test</scope> > </dependency> > <dependency> > <groupId>org.apache.apex</groupId> > <artifactId>apex-engine</artifactId> > <version>${apex.version}</version> > <scope>test</scope> > </dependency> > > <dependency> > <groupId>org.apache.apex</groupId> > <artifactId>malhar-contrib</artifactId> > 
<version>${malhar.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.apex</groupId> > <artifactId>malhar-kafka</artifactId> > <version>${malhar.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.avro</groupId> > <artifactId>avro</artifactId> > <version>${avro.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka_2.11</artifactId> > <version>${confluent.kafka.version}</version> > </dependency> > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka_2.10</artifactId> > <version>0.8.1.1</version> > </dependency> > > <dependency> > <groupId>io.confluent</groupId> > <artifactId>kafka-avro-serializer</artifactId> > <version>${kafka.avro.srlzr.version}</version> > <exclusions> > <exclusion> > <groupId>log4j</groupId> > <artifactId>log4j</artifactId> > </exclusion> > <exclusion> > <groupId>org.slf4j</groupId> > <artifactId>slf4j-log4j12</artifactId> > </exclusion> > </exclusions> > </dependency> > > <dependency> > <groupId>com.googlecode.json-simple</groupId> > <artifactId>json-simple</artifactId> > <version>${json.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.hbase</groupId> > <artifactId>hbase-client</artifactId> > <version>${hbase.version}</version> > <scope>provided</scope> > </dependency> > > <dependency> > <groupId>joda-time</groupId> > <artifactId>joda-time</artifactId> > <version>${jodatime.version}</version> > </dependency> > > <dependency> > <groupId>de.javakaffee</groupId> > <artifactId>kryo-serializers</artifactId> > <version>${kyroserializer.version}</version> > </dependency> > </dependencies> > > > My DAG looks like this=> > > public void populateDAG(DAG dag, Configuration conf) > { > > KafkaSinglePortInputOperator kafkaInTtce = > dag.addOperator("Kafka_Input_SSL",new KafkaSinglePortInputOperator()); > > kafkaInTtce.setInitialPartitionCount(Integer.parseInt(conf.get(" > kafka.partitioncount"))); > 
kafkaInTtce.setTopics(conf.get("kafka.ssl.topic")); > kafkaInTtce.setInitialOffset(conf.get("kafka.offset")); > kafkaInTtce.setClusters(conf.get("kafka.cluster")); > > kafkaInTtce.setConsumerProps(getKafkaProperties(conf.get("kafka.cluster"), > conf)); > kafkaInTtce.setStrategy(conf.get("kafka.strategy")); > > AvroBytesConversionOperator avroConversion = > dag.addOperator("Avro_Convert", new > AvroBytesConversionOperator(conf.get("kafka.schema.registry"))); > ColumnsExtractOperator fieldExtract = dag.addOperator("Field_Extract", > new ColumnsExtractOperator()); > > WriteToHdfs hdfs = dag.addOperator("To_HDFS", new > WriteToHdfs(conf.get("hdfs.filename"))); > hdfs.setMaxLength(268435456); // new file rotates after every 256mb > > dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInTtce.outputPort, > avroConversion.input); > dag.addStream("jsonstring_stream", avroConversion.output, > fieldExtract.input); > dag.addStream("valid_recs_into_hdfs_stream", fieldExtract.output, > hdfs.input); > > KafkaSinglePortByteArrayInputOperator kafkaInput = > dag.addOperator("Kafka_Input_NonSSL", new > KafkaSinglePortByteArrayInputOperator()); > CopyofAvroBytesConversionOperator avroConversionEstore = > dag.addOperator("Avro_Convert_estore", new > CopyofAvroBytesConversionOperator("http://--------")); > CopyOfColumnsExtractOperator fieldExtractEstore = > dag.addOperator("Field_Extract_Estore", new CopyOfColumnsExtractOperator() > ); > WriteToHdfs2 hdfs2 = dag.addOperator("To_HDFS2", new > WriteToHdfs2("DeviceTypeEstore")); > hdfs2.setMaxLength(268435456); > > dag.addStream("Kafka_Avro_estore_Stream", kafkaInput.outputPort, > avroConversionEstore.input); > dag.addStream("jsonstring_stream_estore", avroConversionEstore.output, > fieldExtractEstore.input); > dag.addStream("valid_recs_into_hdfs_estorestream", > fieldExtractEstore.output, hdfs2.input); > } > > > Error I am getting(.dt log)=> > > 2017-08-01 04:57:38,281 INFO stram.StreamingAppMaster > (StreamingAppMaster.java:main(99)) - 
Initializing Application Master. > 2017-08-01 04:57:38,388 INFO stram.StreamingAppMasterService > (StreamingAppMasterService.java:serviceInit(537)) - Application master, > appId=507386, clustertimestamp=1500406884031, attemptId=2 > 2017-08-01 04:57:38,622 WARN util.NativeCodeLoader > (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop > library > for your platform... using builtin-java classes where applicable > 2017-08-01 04:57:39,441 WARN shortcircuit.DomainSocketFactory > (DomainSocketFactory.java:<init>(117)) - The short-circuit local reads > feature cannot be used because libhadoop cannot be loaded. > 2017-08-01 04:57:40,088 INFO kafka.AbstractKafkaInputOperator > (AbstractKafkaInputOperator.java:initPartitioner(327)) - Initialize > Partitioner > 2017-08-01 04:57:40,089 INFO kafka.AbstractKafkaInputOperator > (AbstractKafkaInputOperator.java:initPartitioner(340)) - Actual > Partitioner > is class org.apache.apex.malhar.kafka.OneToManyPartitioner > 2017-08-01 04:57:40,121 INFO kafka.AbstractKafkaPartitioner > (AbstractKafkaPartitioner.java:initMetadataClients(234)) - Consumer > Properties : # > #Tue Aug 01 04:57:40 CDT 2017 > security.protocol=SSL > enable.auto.commit=false > value.deserializer=org.apache.kafka.common.serialization. > ByteArrayDeserializer > group.id=org.apache.apex.malhar.kafka.AbstractKafkaInputOperatorMETA_GROUP > ssl.keystore.password= > ssl.truststore.location=/home_dir/client.truststore.jks > bootstrap.servers= > ssl.truststore.password= > ssl.keystore.location=/home_dir/server.keystore.jks > key.deserializer=org.apache.kafka.common.serialization. 
> ByteArrayDeserializer > > 2017-08-01 04:57:40,290 INFO utils.AppInfoParser > (AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0 > 2017-08-01 04:57:40,291 INFO utils.AppInfoParser > (AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a > 2017-08-01 04:57:41,306 INFO kafka.AbstractKafkaPartitioner > (AbstractKafkaPartitioner.java:definePartitions(151)) - Partition change > detected: > 2017-08-01 04:57:41,307 INFO kafka.AbstractKafkaPartitioner > (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 0 > with assignment PartitionMeta{cluster='10.66.137.93:9093', > topicPartition=firefly-apps-superapp-21};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-22};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-23};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-24};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-18};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-19};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-20};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-14};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-25};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-26} > 2017-08-01 04:57:41,318 INFO kafka.AbstractKafkaPartitioner > (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 1 > with assignment PartitionMeta{cluster='10.66.137.93:9093', > topicPartition=firefly-apps-superapp-6};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-8};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-2};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-3};PartitionMeta{cluster=' > 
10.66.137.93:9093', > topicPartition=firefly-apps-superapp-13};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-16};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-9};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-10};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-11};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-28} > 2017-08-01 04:57:41,322 INFO kafka.AbstractKafkaPartitioner > (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 2 > with assignment PartitionMeta{cluster='10.66.137.93:9093', > topicPartition=firefly-apps-superapp-5};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-7};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-17};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-1};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-4};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-29};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-15};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-0};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-27};PartitionMeta{cluster=' > 10.66.137.93:9093', > topicPartition=firefly-apps-superapp-12} > 2017-08-01 04:57:41,365 INFO zkclient.ZkEventThread > (ZkEventThread.java:run(64)) - Starting ZkClient event thread. 
> 2017-08-01 04:57:41,372 INFO zookeeper.ZooKeeper > (Environment.java:logEnv(100)) - Client > environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT > 2017-08-01 04:57:41,372 INFO zookeeper.ZooKeeper > (Environment.java:logEnv(100)) - Client > environment:host.name=brdn1351.target.com > 2017-08-01 04:57:41,373 INFO zookeeper.ZooKeeper > (Environment.java:logEnv(100)) - Client environment:java.version=1.8.0_73 > 2017-08-01 04:57:41,373 INFO zookeeper.ZooKeeper > (Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle > Corporation > 2017-08-01 04:57:41,373 INFO zookeeper.ZooKeeper > (Environment.java:logEnv(100)) - Client > environment:java.home=/usr/java/jdk1.8.0_73/jre > connectString=10.66.137.94:2181,10.66.137.95:2181,10.66.137.96:2181, > 10.66.137.97:2181,10.66.137.98:2181 > sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@6d64b553 > 2017-08-01 04:57:41,388 INFO zkclient.ZkClient > (ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state > SyncConnected > 2017-08-01 04:57:41,392 INFO zookeeper.ClientCnxn > (ClientCnxn.java:logStartConnect(975)) - Opening socket connection to > server > 10.66.137.97/10.66.137.97:2181. 
Will not attempt to authenticate using > SASL > (unknown error) > 2017-08-01 04:57:41,393 INFO zookeeper.ClientCnxn > (ClientCnxn.java:primeConnection(852)) - Socket connection established to > 10.66.137.97/10.66.137.97:2181, initiating session > 2017-08-01 04:57:41,445 INFO zookeeper.ClientCnxn > (ClientCnxn.java:onConnected(1235)) - Session establishment complete on > server 10.66.137.97/10.66.137.97:2181, sessionid = 0x350dafbffc66af1, > negotiated timeout = 30000 > 2017-08-01 04:57:41,447 INFO zkclient.ZkClient > (ZkClient.java:processStateChanged(711)) - zookeeper state changed > (SyncConnected) > 2017-08-01 04:57:41,450 ERROR stram.StreamingAppMaster > (StreamingAppMaster.java:main(106)) - Exiting Application Master > java.lang.NoSuchMethodError: > kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;) > Lscala/collection/Seq; > at > com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers( > KafkaMetadataUtil.java:117) > at > com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers( > KafkaConsumer.java:139) > at > com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions( > AbstractKafkaInputOperator.java:506) > at > com.datatorrent.stram.plan.physical.PhysicalPlan. 
> initPartitioning(PhysicalPlan.java:752)
> at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
> at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
> at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
> at com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
> at com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
> at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)
>
> --
> View this message in context: http://apache-apex-users-list.78494.x6.nabble.com/How-to-consume-from-two-different-topics-in-one-apex-application-tp1797p1801.html
> Sent from the Apache Apex Users list mailing list archive at Nabble.com.