Thanks Ted.

The KafkaWordCount producer does not operate on a DStream[T]:

```scala
import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Kafka producer connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt)
          .map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }
}
```
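
For clarity on what breaks for me: send() above runs in a plain driver-side loop, so nothing has to be serialized. The trouble starts as soon as a producer is referenced inside a Spark closure. A minimal sketch of the failing shape (not working code; `wordCounts` and `outputTopic` are stand-ins for my stream and destination topic):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// created once, on the driver
val producer = new KafkaProducer[String, String](props)

wordCounts.foreachRDD { rdd =>
  rdd.foreach { record =>
    // the closure captures `producer`, Spark tries to serialize it,
    // and fails because KafkaProducer is not Serializable
    producer.send(new ProducerRecord[String, String](outputTopic, record._1, record._2))
  }
}
```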


Also, doing:


```scala
object KafkaSink {
  def send(brokers: String, sc: SparkContext, topic: String, key: String, value: String) =
    getInstance(brokers, sc).value.send(new ProducerRecord(topic, key, value))
}

// called from inside the streaming transformation:
KafkaSink.send(brokers, sparkContext, outputTopic, record._1, record._2)
```


Doesn't work either; the result is:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
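
For what it's worth, the shape I think I'm after is a wrapper that is itself Serializable and only creates the producer lazily, once it is running on an executor. A rough sketch (the names `LazyKafkaSink` / `createProducer` are placeholders of mine, not code I have working):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch only: a serializable wrapper around a lazily created, non-serializable producer.
class LazyKafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // evaluated on the executor, after the wrapper has been deserialized
  lazy val producer: KafkaProducer[String, String] = createProducer()

  def send(topic: String, key: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, key, value))
}

object LazyKafkaSink {
  def apply(brokers: String): LazyKafkaSink = {
    val createProducer = () => {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      sys.addShutdownHook { producer.close() }  // close when the executor JVM shuts down
      producer
    }
    new LazyKafkaSink(createProducer)
  }
}
```

Only the small wrapper would travel with the closure, e.g. `rdd.foreachPartition(_.foreach(r => sink.send(outputTopic, r._1, r._2)))`, and each executor would build its own producer.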


Thanks!



On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> In KafkaWordCount, the String is sent back and producer.send() is called.
>
> I guess if you don't find a viable solution in your current design, you can consider the above.
>
> On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <agall...@concord.io> wrote:
>>
>> Hello,
>>
>> I understand that you cannot serialize Kafka Producer.
>>
>> So I've tried:
>>
>> (as suggested here: https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)

>>
>>  - Make the class Serializable - not possible
>>
>>  - Declare the instance only within the lambda function passed in map.
>>
>> via:
>>
>> // as suggested by the docs
>>
>>
>> ```scala
>> kafkaOut.foreachRDD(rdd => {
>>   rdd.foreachPartition(partition => {
>>     val producer = new KafkaProducer(..)
>>     partition.foreach { record =>
>>       producer.send(new ProducerRecord(outputTopic, record._1, record._2))
>>     }
>>     producer.close()
>>   })
>> }) // foreachRDD
>> ```
>>
>>  - Make the NotSerializable object a static and create it once per machine.
>>
>> via:
>>
>>
>> ```scala
>>
>>
>> object KafkaSink {
>>   @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null
>>
>>   def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
>>     if (instance == null) {
>>       synchronized {
>>         println("Creating new kafka producer")
>>         val props = new java.util.Properties()
>>         .......
>>         instance = sc.broadcast(new KafkaProducer[String, String](props))
>>         sys.addShutdownHook {
>>           instance.value.close()
>>         }
>>       }
>>     }
>>     instance
>>   }
>> }
>>
>>
>> ```
>>
>>
>>
>>  - Call rdd.foreachPartition and create the NotSerializable object in there, like this:
>>
>> Same as above.
>>
>>
>> - Mark the instance @transient
>>
>> Same thing, just make it a class variable via:
>>
>>
>> ```scala
>> @transient var producer: KafkaProducer[String, String] = null
>>
>> def getInstance() = {
>>   if (producer == null) {
>>     producer = new KafkaProducer()
>>   }
>>   producer
>> }
>> ```
>>
>>
>> However, I get serialization problems with all of these options.
>>
>>
>> Thanks for your help.
>>
>> - Alex
>>
>



--
Alexander Gallego
Co-Founder & CTO
