In KafkaWordCount, the String is sent back and producer.send() is called.

I guess if you don't find a viable solution in your current design, you can
consider the above.
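Here is a minimal sketch of that driver-side approach: bring the results back to the driver and call producer.send() there. It assumes `kafkaOut` is a `DStream[(String, String)]` as in the quoted code, that each batch is small enough to `collect()`, and that DStream checkpointing is not enabled. With checkpointing, the foreachRDD function itself gets serialized along with the producer it captures. `DriverSideSend`, `createProducer`, `brokers` and `outputTopic` are hypothetical placeholder names.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.streaming.dstream.DStream

object DriverSideSend {
  // Hypothetical helper: a String/String producer for the given brokers.
  def createProducer(brokers: String): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    new KafkaProducer[String, String](props)
  }

  def sendFromDriver(kafkaOut: DStream[(String, String)], brokers: String, outputTopic: String): Unit = {
    // Created once on the driver and never shipped to executors.
    val producer = createProducer(brokers)
    sys.addShutdownHook(producer.close())
    kafkaOut.foreachRDD { rdd =>
      // This function body runs on the driver; collect() pulls the batch
      // back from the executors, so it only suits small result sets.
      rdd.collect().foreach { case (key, value) =>
        producer.send(new ProducerRecord[String, String](outputTopic, key, value))
      }
    }
  }
}
```

The trade-off is that all output funnels through the driver. That is fine for word-count-sized results but not for large batches.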

On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <>

> Hello,
> I understand that you cannot serialize a KafkaProducer.
> So I've tried the following (as suggested here):
> - Make the class Serializable: not possible.
> - Declare the instance only within the lambda function passed in map
> (as suggested by the docs), via:
> ```scala
> kafkaOut.foreachRDD(rdd => {
>   rdd.foreachPartition(partition => {
>     val producer = new KafkaProducer(..)
>     partition.foreach { record =>
>       producer.send(new ProducerRecord(outputTopic, record._1, record._2))
>     }
>     producer.close()
>   })
> }) // foreachRDD
> ```
> - Make the NotSerializable object a static and create it once per
> machine, via:
> ```scala
> object KafkaSink {
>   @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null
>   def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
>     if (instance == null) {
>       synchronized {
>         println("Creating new kafka producer")
>         val props = new java.util.Properties()
>         .......
>         instance = sc.broadcast(new KafkaProducer[String, String](props))
>         sys.addShutdownHook {
>           instance.value.close()
>         }
>       }
>     }
>     instance
>   }
> }
> ```
> - Call rdd.foreachPartition and create the NotSerializable object in
> there: same as above.
> - Mark the instance @transient.
> Same thing, just make it a class variable, via:
> ```scala
> @transient var producer: KafkaProducer[String, String] = null
> def getInstance() = {
>   if (producer == null) {
>     producer = new KafkaProducer[String, String](props) // props: producer config, defined elsewhere
>   }
>   producer
> }
> ```
> However, I get serialization problems with all of these options.
> Thanks for your help.
> - Alex
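On the broadcast attempt: `sc.broadcast` serializes the value it is given, so broadcasting a `KafkaProducer` requires the producer itself to be serializable. That cannot work. A common alternative is to keep one producer per executor JVM in a plain Scala `object` and look it up inside `foreachPartition`. The closure then captures only Strings.

The sketch below assumes String keys and values. `ProducerHolder`, `PerExecutorSend`, `brokers` and `outputTopic` are hypothetical names.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.streaming.dstream.DStream

// One producer per JVM. A Scala `object` is not serialized into closures;
// each executor JVM initializes its own copy on first use.
object ProducerHolder {
  private var producer: KafkaProducer[String, String] = _

  // `brokers` is only read on the first call in a given JVM.
  def getOrCreate(brokers: String): KafkaProducer[String, String] = synchronized {
    if (producer == null) {
      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
      val created = new KafkaProducer[String, String](props)
      // Flush buffered records and release resources when the JVM exits.
      sys.addShutdownHook(created.close())
      producer = created
    }
    producer
  }
}

object PerExecutorSend {
  def send(kafkaOut: DStream[(String, String)], brokers: String, outputTopic: String): Unit = {
    kafkaOut.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Only `brokers` and `outputTopic` (plain Strings) are captured here.
        val producer = ProducerHolder.getOrCreate(brokers)
        partition.foreach { case (key, value) =>
          producer.send(new ProducerRecord[String, String](outputTopic, key, value))
        }
      }
    }
  }
}
```

Unlike the per-partition `new KafkaProducer(..)` / `close()` version, this reuses one connection pool across batches.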

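A variant that keeps the broadcast and `@transient` ideas is to wrap the producer in a small Serializable class. The class ships only a factory function, and the producer is created lazily through a `@transient lazy val` on whichever JVM first calls `send`. This is only a sketch under stated assumptions: the class and method names are hypothetical, String keys and values are assumed, and it relies on Scala function literals being serializable.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Only `createProducer` is serialized. The producer field is transient and
// is built on first use, so a deserialized copy builds its own producer.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  @transient private lazy val producer: KafkaProducer[String, String] = createProducer()

  def send(topic: String, key: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, key, value))
}

object KafkaSink {
  def apply(brokers: String): KafkaSink = {
    // This function literal captures only `brokers`, so it can be serialized.
    val create = () => {
      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
      val producer = new KafkaProducer[String, String](props)
      sys.addShutdownHook(producer.close())
      producer
    }
    new KafkaSink(create)
  }
}
```

Usage would be something like `val sink = ssc.sparkContext.broadcast(KafkaSink(brokers))` on the driver, then `sink.value.send(outputTopic, k, v)` inside `foreachPartition`. Broadcasting is safe here because the driver never touches `producer`, and the field is transient in any case.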
