Thanks Ted. KafkaWordCount (the producer) does not operate on a DStream[T]; it just generates messages in a loop:
```scala
import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Producer connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt)
          .map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}
```

Also, the following doesn't work either:

```scala
object KafkaSink {
  def send(brokers: String, sc: SparkContext, topic: String, key: String, value: String) =
    getInstance(brokers, sc).value.send(new ProducerRecord(topic, key, value))
}

KafkaSink.send(brokers, sparkContext, outputTopic, record._1, record._2)
```

The result is:

```
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
```
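One direction I'm considering, on the assumption that sc.broadcast has to serialize the KafkaProducer eagerly (which would explain why broadcasting it blows up the same way), is to broadcast a small serializable wrapper that carries only a factory function and builds the producer lazily on each executor. A rough, untested sketch — KafkaSink here is my own wrapper class, not anything from the Spark or Kafka APIs:

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Serializable wrapper: only the factory closure is shipped to executors.
// The producer itself is created lazily, on first use, once per executor JVM.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()

  def send(topic: String, key: String, value: String): Unit =
    producer.send(new ProducerRecord(topic, key, value))
}

object KafkaSink {
  def apply(brokers: String): KafkaSink = {
    val createProducer = () => {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      // Close the producer when the executor JVM shuts down.
      sys.addShutdownHook { producer.close() }
      producer
    }
    new KafkaSink(createProducer)
  }
}
```

The wrapper would then be broadcast once and used inside the closures, roughly:

```scala
val kafkaSink = sc.broadcast(KafkaSink(brokers))

kafkaOut.foreachRDD { rdd =>
  rdd.foreach { record =>
    kafkaSink.value.send(outputTopic, record._1, record._2)
  }
}
```

(assuming brokers and outputTopic are local vals, so the closure doesn't capture a non-serializable enclosing class).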
Thanks!

On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> In KafkaWordCount, the String is sent back and producer.send() is called.
>
> I guess if you don't find a viable solution in your current design, you can consider the above.
>
> On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <agall...@concord.io> wrote:
>>
>> Hello,
>>
>> I understand that you cannot serialize a KafkaProducer.
>>
>> So I've tried (as suggested here:
>> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html):
>>
>> - Make the class Serializable - not possible.
>>
>> - Declare the instance only within the lambda function passed in map, via:
>>
>> ```scala
>> // as suggested by the docs
>> kafkaOut.foreachRDD(rdd => {
>>   rdd.foreachPartition(partition => {
>>     val producer = new KafkaProducer(..)
>>     partition.foreach { record =>
>>       producer.send(new ProducerRecord(outputTopic, record._1, record._2))
>>     }
>>     producer.close()
>>   })
>> }) // foreachRDD
>> ```
>>
>> - Make the NotSerializable object a static and create it once per machine, via:
>>
>> ```scala
>> object KafkaSink {
>>   @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null
>>
>>   def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
>>     if (instance == null) {
>>       synchronized {
>>         println("Creating new kafka producer")
>>         val props = new java.util.Properties()
>>         .......
>>         instance = sc.broadcast(new KafkaProducer[String, String](props))
>>         sys.addShutdownHook {
>>           instance.value.close()
>>         }
>>       }
>>     }
>>     instance
>>   }
>> }
>> ```
>>
>> - Call rdd.foreachPartition and create the NotSerializable object in there like this:
>>
>> Same as above.
>>
>> - Mark the instance @transient
>>
>> Same thing, just make it a class variable, via:
>>
>> ```scala
>> @transient var producer: KafkaProducer[String, String] = null
>>
>> def getInstance() = {
>>   if (producer == null) {
>>     producer = new KafkaProducer()
>>   }
>>   producer
>> }
>> ```
>>
>> However, I get serialization problems with all of these options.
>>
>> Thanks for your help.
>>
>> - Alex

--
Alexander Gallego
Co-Founder & CTO