Hi experts!

I have a SchemaRDD of messages to be pushed to Kafka, and I am using the
following piece of code to do that:

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

rdd.foreachPartition(itr => {
  // Build the producer configuration on the executor, once per partition.
  val props = new Properties()
  props.put("metadata.broker.list", brokersList)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("compression.codec", codec.toString)
  props.put("producer.type", "sync")
  props.put("batch.num.messages", BatchSize.toString)
  props.put("message.send.max.retries", maxRetries.toString)
  props.put("request.required.acks", "-1")

  val producer = new Producer[String, String](new ProducerConfig(props))

  itr.foreach(row => {
    // Strip the surrounding brackets from the Row's string form.
    val msg = row.toString.drop(1).dropRight(1)
    // Each partition's iterator is consumed by a single task thread,
    // so no explicit synchronization is needed around send().
    producer.send(new KeyedMessage[String, String](Topic, msg))
  })

  producer.close()
})



The problem with this code is that it creates a separate Kafka producer for
each partition, whereas I want a single producer shared across all
partitions. Is there any way to achieve this?
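
The closest I could come up with is one producer per executor JVM rather
than per partition, by hiding the producer behind a lazy val in a singleton
object (the Producer itself is not serializable, and partitions run on
different JVMs anyway, so a truly cluster-wide single producer does not seem
possible). A rough sketch of what I mean; KafkaProducerHolder and the
hard-coded broker list are placeholder names/values I made up, not tested
code:

// Hypothetical holder object; the lazy val is initialized at most once per
// executor JVM, the first time any task on that executor touches it.
object KafkaProducerHolder {
  import java.util.Properties
  import kafka.producer.{Producer, ProducerConfig}

  lazy val producer: Producer[String, String] = {
    val props = new Properties()
    // Placeholder config; in reality these would have to be constants
    // visible on the executors, not driver-side variables.
    props.put("metadata.broker.list", "broker1:9092,broker2:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("request.required.acks", "-1")
    new Producer[String, String](new ProducerConfig(props))
  }
}

rdd.foreachPartition(itr => {
  itr.foreach(row => {
    val msg = row.toString.drop(1).dropRight(1)
    // All partitions processed by the same executor share this producer.
    KafkaProducerHolder.producer.send(new KeyedMessage[String, String](Topic, msg))
  })
})

But that still gives one producer per executor, not one for the whole job,
so I would like to know whether there is a better approach (short of
collecting the RDD to the driver and sending everything from there).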



