In KafkaWordCount, the String is sent back and producer.send() is called. I guess if you don't find a viable solution in your current design, you can consider the above.
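About the first option in the quoted message (creating the producer inside `foreachPartition`): the producer is never serialized there. If that variant still fails, one common cause is that the closure captures something else that is not serializable. For example, referring to a field such as `outputTopic` of an enclosing class inside the lambda captures the whole enclosing instance. The sketch below illustrates this and assumes Scala 2.12 or later and plain Java serialization, which Spark uses for task closures. `StreamingJob` and `ClosureCheck` are hypothetical names, and whether `outputTopic` is really a field in the original job is an assumption.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical driver-side class that is NOT Serializable, standing in for
// whatever object defines `outputTopic` in the quoted code.
class StreamingJob(val outputTopic: String) {
  // Using the field inside the lambda captures `this`, i.e. the whole job.
  def badWriter: String => String = record => s"$outputTopic:$record"

  // Copying the field into a local val first means only the String is captured.
  def goodWriter: String => String = {
    val topic = outputTopic
    record => s"$topic:$record"
  }
}

object ClosureCheck {
  // Java-serialize an object, roughly as Spark does with task closures.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new StreamingJob("words-out")
    println(ClosureCheck.serializes(job.badWriter))  // false
    println(ClosureCheck.serializes(job.goodWriter)) // true
  }
}
```

In the quoted code, the same step would mean copying `outputTopic` (and any broker settings) into local vals before calling `rdd.foreachPartition`.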
On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <agall...@concord.io> wrote:

> Hello,
>
> I understand that you cannot serialize a KafkaProducer.
>
> So I've tried the following (as suggested here:
> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html):
>
> - Make the class Serializable: not possible.
>
> - Declare the instance only within the lambda function passed in map, via (as suggested by the docs):
>
> ```scala
> kafkaOut.foreachRDD(rdd => {
>   rdd.foreachPartition(partition => {
>     val producer = new KafkaProducer(..)
>     partition.foreach { record =>
>       producer.send(new ProducerRecord(outputTopic, record._1, record._2))
>     }
>     producer.close()
>   })
> }) // foreachRDD
> ```
>
> - Make the NotSerializable object static and create it once per machine, via:
>
> ```scala
> object KafkaSink {
>   @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null
>   def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
>     if (instance == null) {
>       synchronized {
>         println("Creating new kafka producer")
>         val props = new java.util.Properties()
>         .......
>         instance = sc.broadcast(new KafkaProducer[String, String](props))
>         sys.addShutdownHook {
>           instance.value.close()
>         }
>       }
>     }
>     instance
>   }
> }
> ```
>
> - Call rdd.foreachPartition and create the NotSerializable object in there: same as above.
>
> - Mark the instance @transient: same thing, just make it a class variable, via:
>
> ```scala
> @transient var producer: KafkaProducer[String, String] = null
> def getInstance() = {
>   if (producer == null) {
>     producer = new KafkaProducer()
>   }
>   producer
> }
> ```
>
> However, I get serialization problems with all of these options.
>
> Thanks for your help.
>
> - Alex
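The broadcast variant above calls `sc.broadcast(new KafkaProducer(...))`, so Spark has to serialize the producer itself, and that fails because `KafkaProducer` is not Serializable. A commonly used workaround is to broadcast a small Serializable wrapper instead. The wrapper holds only a function that creates the producer and keeps the producer in a `@transient lazy val`, so the producer is built on first use on the executor. The sketch below makes some assumptions so that it runs without Kafka or Spark. `FakeProducer` is a hypothetical, deliberately non-Serializable stand-in for `KafkaProducer`. `ProducerSink` and `Demo.roundTrip` are hypothetical names. `roundTrip` uses plain Java serialization, which is roughly what Spark does by default when shipping broadcasts.

```scala
import java.io._
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for KafkaProducer: deliberately NOT Serializable.
class FakeProducer(val topic: String) {
  val sent = ArrayBuffer.empty[String]
  def send(key: String, value: String): Unit = sent += s"$topic:$key=$value"
  def close(): Unit = ()
}

// Serializable wrapper: only the factory function is serialized; the producer
// is created lazily on first use, i.e. after deserialization on the executor.
class ProducerSink(createProducer: () => FakeProducer) extends Serializable {
  @transient lazy val producer: FakeProducer = createProducer()
  def send(key: String, value: String): Unit = producer.send(key, value)
}

object ProducerSink {
  // The lambda captures only the String, so it is serializable.
  def apply(topic: String): ProducerSink = new ProducerSink(() => new FakeProducer(topic))
}

object Demo {
  // Serialize and deserialize an object with Java serialization.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(ProducerSink("out")) // succeeds: no producer created yet
    shipped.send("a", "1")                       // producer is built here, on first use
    println(shipped.producer.sent.mkString(",")) // out:a=1
  }
}
```

In a real job, the factory would build a `KafkaProducer[String, String]` from the same `props`. The wrapper would be broadcast once from the driver with `sc.broadcast(...)`, and each partition would call `send` on the broadcast's `.value`. Closing the producer (for example from a shutdown hook, as in the quoted `KafkaSink`) is left out of this sketch.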