In KafkaWordCount, the String is sent back and producer.send() is called.

I guess if you don't find a viable solution in your current design, you can
consider the above.
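
For the "once per machine" attempt quoted below, note that a Broadcast value has to be serialized to reach the executors, so broadcasting the producer brings the serialization problem back. A minimal sketch of one alternative, assuming String keys and values and that kafkaOut is a DStream[(String, String)]: keep the producer in a plain singleton that each executor JVM creates lazily, so only the broker list and topic name (both Strings) travel with the closure. The names ProducerHolder, KafkaSinkSketch and writeToKafka are made up for illustration, and I have not run this against your job.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical holder: the producer lives in a JVM-level singleton and is
// never captured by a closure or wrapped in a Broadcast.
object ProducerHolder {
  @volatile private var instance: KafkaProducer[String, String] = null

  def getOrCreate(brokers: String): KafkaProducer[String, String] = {
    if (instance == null) {
      synchronized {
        // Second check so that only one thread per JVM creates the producer.
        if (instance == null) {
          val props = new Properties()
          props.put("bootstrap.servers", brokers)
          // Assumption: keys and values are plain Strings.
          props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer")
          val created = new KafkaProducer[String, String](props)
          // Close the producer when this JVM shuts down.
          sys.addShutdownHook { created.close() }
          instance = created
        }
      }
    }
    instance
  }
}

// Hypothetical wrapper showing how the holder would be used from a stream.
object KafkaSinkSketch {
  def writeToKafka(kafkaOut: DStream[(String, String)],
                   brokers: String,
                   outputTopic: String): Unit = {
    kafkaOut.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Runs on the executor: looks up (or creates) the local producer.
        val producer = ProducerHolder.getOrCreate(brokers)
        partition.foreach { case (key, value) =>
          producer.send(new ProducerRecord[String, String](outputTopic, key, value))
        }
      }
    }
  }
}
```

The producer is deliberately not closed at the end of each partition, since it is shared by every task that runs in the same executor JVM.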

On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <agall...@concord.io>
wrote:

> Hello,
>
> I understand that you cannot serialize Kafka Producer.
>
> So I've tried:
>
> (as suggested here
> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html
> )
>
>  - Make the class Serializable - not possible
>
>  - Declare the instance only within the lambda function passed in map.
>
> via:
>
> // as suggested by the docs
>
>
> ```scala
>
>    kafkaOut.foreachRDD(rdd => {
>      rdd.foreachPartition(partition => {
>           val producer = new KafkaProducer(..)
>           partition.foreach { record =>
>               producer.send(new ProducerRecord(outputTopic, record._1, record._2))
>            }
>           producer.close()
>        })
>      }) // foreachRDD
>
>
> ```
>
> - Make the NotSerializable object static and create it once per
> machine.
>
> via:
>
>
> ```scala
>
>
> object KafkaSink {
>   @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null
>   def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
>     if (instance == null) {
>       synchronized {
>         println("Creating new kafka producer")
>         val props = new java.util.Properties()
>         .......
>         instance = sc.broadcast(new KafkaProducer[String, String](props))
>         sys.addShutdownHook {
>           instance.value.close()
>         }
>       }
>     }
>     instance
>   }
> }
>
>
> ```
>
>
>
>  - Call rdd.foreachPartition and create the NotSerializable object in
> there like this:
>
> Same as above.
>
>
> - Mark the instance @transient
>
> Same thing, just make it a class variable via:
>
>
> ```
> @transient var producer: KafkaProducer[String,String] = null
> def getInstance() = {
>    if( producer == null ) {
>        producer = new KafkaProducer()
>    }
>    producer
> }
>
> ```
>
>
> However, I get serialization problems with all of these options.
>
>
> Thanks for your help.
>
> - Alex
>
>
