Hi All,

Recently I started migrating the code from kafka08 to kafka010.

In 08, the *topics* argument of createStream sets the number of partitions to
consume for each topic:

  def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)]


How do I pass this configuration in kafka010?

Below is my sample code for kafka010. I cannot find any API to set this parameter:

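  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

  // Consumer configuration; groupId and bootstrapServer are defined elsewhere
  val kafkaParams = Map[String, Object](
    "group.id" -> groupId,
    "bootstrap.servers" -> bootstrapServer,
    "value.deserializer" -> classOf[StringDeserializer],
    "key.deserializer" -> classOf[StringDeserializer])

  // Direct stream subscribed to every topic in topicArr
  val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topicArr.toSet, kafkaParams))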

Regards
Sandeep Katta


