Hi All,

I recently started migrating code from kafka08 to kafka010.
In 08, the *topics* argument of createStream takes care of the number of partitions to consume for each topic:

    def createStream(
        ssc: StreamingContext,
        zkQuorum: String,
        groupId: String,
        topics: Map[String, Int],
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream[(String, String)]

How do I pass this configuration with kafka010? Here is my sample kafka010 code; I cannot find any way or API to set this parameter:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    // Consumer settings passed through to the underlying Kafka consumer
    val kafkaParams = Map[String, Object](
      "group.id" -> groupId,
      "bootstrap.servers" -> bootstrapServer,
      "value.deserializer" -> classOf[StringDeserializer],
      "key.deserializer" -> classOf[StringDeserializer])

    // Direct stream subscribed to every topic in topicArr
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicArr.toSet, kafkaParams))

Regards,
Sandeep Katta

--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/