Short answer is it isn't necessary.

Long answer is that you aren't just changing from 0.8 to 0.10; you're
changing from the receiver-based implementation to the direct stream.
Read these (and see the sketch after the links):

https://github.com/koeninger/kafka-exactly-once
http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
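
To make the difference concrete: the 0.10 direct stream creates one Spark
partition per Kafka topic partition, so the per-topic count from the 0.8
Map[String, Int] has no equivalent and nothing needs to be configured. A
rough sketch of the 0.10 side, reusing the names from your snippet (ssc,
groupId, bootstrapServer, topicArr):

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010._

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> bootstrapServer,
    "group.id" -> groupId,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer])

  // One Spark partition is created for every Kafka partition of the
  // subscribed topics, so parallelism follows the topic layout in Kafka.
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topicArr.toSet, kafkaParams))

  // If you need more downstream parallelism than there are Kafka partitions,
  // repartition after consuming (at the cost of a shuffle), e.g.
  // stream.map(r => (r.key, r.value)).repartition(8)

If you want more parallelism than the topic provides, either add partitions
to the topic or repartition the DStream as above; there's nothing to set on
the Spark side.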

On Thu, Aug 2, 2018 at 2:02 AM, sandeep_katta
<sandeep0102.opensou...@gmail.com> wrote:
> Hi All,
>
> Recently I started migrating the code from kafka08 to kafka010.
>
> In 0.8, the *topics* argument specifies the number of partitions to consume
> for each topic.
>
>   def createStream(
>       ssc: StreamingContext,
>       zkQuorum: String,
>       groupId: String,
>       topics: Map[String, Int],
>       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
>     ): ReceiverInputDStream[(String, String)]
>
>
> How do I pass this configuration with kafka010?
>
> Sample code for kafka010; I can't find any way or API to set this parameter:
>
>   val kafkaParams = Map[String, Object](
>     "group.id" -> groupId,
>     "bootstrap.servers" -> bootstrapServer,
>     "value.deserializer" -> classOf[StringDeserializer],
>     "key.deserializer" -> classOf[StringDeserializer])
>   val messages = KafkaUtils.createDirectStream[String, String](
>     ssc,
>     LocationStrategies.PreferConsistent,
>     ConsumerStrategies.Subscribe[String, String](topicArr.toSet, kafkaParams))
>
> Regards
> Sandeep Katta
>
>
>
