Hello, I understand that you cannot serialize a Kafka producer (`KafkaProducer` is not `Serializable`).
So, as suggested here (https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html), I've tried the following:

- Make the class Serializable: not possible, since `KafkaProducer` itself is not serializable.

- Declare the instance only within the lambda function passed in `map` (in my case, the function passed to `foreachPartition`), via:

```scala
// as suggested by the docs
kafkaOut.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    val producer = new KafkaProducer[String, String](...) // producer config elided
    partition.foreach { record =>
      producer.send(new ProducerRecord(outputTopic, record._1, record._2))
    }
    producer.close()
  })
}) // foreachRDD
```

- Make the NotSerializable object a static and create it once per machine, via:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.kafka.clients.producer.KafkaProducer

object KafkaSink {
  @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null

  def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
    if (instance == null) {
      synchronized {
        println("Creating new kafka producer")
        val props = new java.util.Properties()
        .......
        instance = sc.broadcast(new KafkaProducer[String, String](props))
        sys.addShutdownHook {
          instance.value.close()
        }
      }
    }
    instance
  }
}
```

- Call `rdd.foreachPartition` and create the NotSerializable object in there: same code as in the second bullet above.

- Mark the instance `@transient`: same thing, just make it a class variable, via:

```scala
@transient var producer: KafkaProducer[String, String] = null

def getInstance(): KafkaProducer[String, String] = {
  if (producer == null) {
    producer = new KafkaProducer[String, String](...) // producer config elided
  }
  producer
}
```

However, I get serialization problems with all of these options. (A sketch of how I wire the broadcast variant into the stream is below.)
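For completeness, here is a stripped-down sketch of how I wire the broadcast variant (the `KafkaSink` object above) into the stream. The broker list, the topic name, and the shape of `kafkaOut` are placeholders standing in for my actual setup:

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// Placeholders for my real configuration.
val brokers = "localhost:9092"
val outputTopic = "output"

// kafkaOut is a DStream[(String, String)] built earlier in the job.
kafkaOut.foreachRDD { rdd =>
  // getInstance runs on the driver; the broadcast value is then
  // dereferenced inside foreachPartition on the executors.
  val producerBc = KafkaSink.getInstance(brokers, rdd.sparkContext)
  rdd.foreachPartition { partition =>
    partition.foreach { record =>
      producerBc.value.send(new ProducerRecord(outputTopic, record._1, record._2))
    }
  }
}
```

Thanks for your help.

- Alex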