Hello,

I understand that you cannot serialize a KafkaProducer.

So I've tried:

(as suggested here: https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)

 - Make the class Serializable - not possible

 - Declare the instance only within the lambda function passed in map.

via (as suggested by the docs):


```scala

   kafkaOut.foreachRDD { rdd =>
     rdd.foreachPartition { partition =>
       // the producer is created inside the closure, so it never has to be serialized
       val producer = new KafkaProducer(..)   // producer config elided
       partition.foreach { record =>
         producer.send(new ProducerRecord(outputTopic, record._1, record._2))
       }
       producer.close()
     }
   }


```

 - Make the NotSerializable object static and create it once per machine.

via:


```scala


import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object KafkaSink {
  @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null

  def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) { // re-check once inside the lock
          println("Creating new kafka producer")
          val props = new java.util.Properties()
          .......
          instance = sc.broadcast(new KafkaProducer[String, String](props))
          sys.addShutdownHook {
            instance.value.close()
          }
        }
      }
    }
    instance
  }
}


```
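
For completeness, this is roughly how I wire that broadcast in (simplified sketch; `brokers` and `ssc` stand in for my actual broker list and StreamingContext):

```scala
// Sketch of how the broadcast wrapper above is meant to be used:
// getInstance runs on the driver, and the executors read the producer
// back out of the broadcast inside foreachPartition.
val producerBroadcast = KafkaSink.getInstance(brokers, ssc.sparkContext)

kafkaOut.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val producer = producerBroadcast.value
    partition.foreach { record =>
      producer.send(new ProducerRecord(outputTopic, record._1, record._2))
    }
  }
}
```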



 - Call rdd.foreachPartition and create the NotSerializable object in there, like this:

Same as above.


- Mark the instance @transient

Same thing, just make it a class variable via:


```scala
@transient var producer: KafkaProducer[String, String] = null

def getInstance(): KafkaProducer[String, String] = {
  if (producer == null) {
    // producer configuration (props) elided, as above
    producer = new KafkaProducer[String, String](props)
  }
  producer
}

```
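
and then I call it from inside the partition loop, roughly like this (sketch; same send loop as before, only the producer lookup changes):

```scala
rdd.foreachPartition { partition =>
  // only difference from the first variant: the producer comes from the
  // @transient class field via getInstance() instead of being built here
  val producer = getInstance()
  partition.foreach { record =>
    producer.send(new ProducerRecord(outputTopic, record._1, record._2))
  }
}
```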


However, I get serialization problems with all of these options.


Thanks for your help.

- Alex
