Thanks, Bryan. This is basically what I have as well; I posted pretty much the same gist in the first email:
```scala
.foreachRDD(rdd => {
  rdd.foreachPartition(part => {
    val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
    part.foreach(item => producer.send(item))
    producer.close()
  })
})
```

I had a bug with implicits from Spark. I'm not really sure why, but I had built a SparkContext in a different configuration file, and something was passing the wrong SparkContext into the closure. Effectively, everything worked once I merged my functions into one file.

Thanks again.
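PS: one thing I will probably still change: the snippet above creates and closes a producer for every partition of every batch, which is heavyweight. Since a Scala `object` is initialized once per executor JVM, a lazily created singleton producer avoids that. Rough, untested sketch (the `ProducerSingleton` name is mine, and it assumes the newer `org.apache.kafka.clients.producer` API rather than the old `kafka.producer` one):

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

object ProducerSingleton {
  @volatile private var producer: KafkaProducer[String, String] = null

  // Built at most once per executor JVM; task closures only capture the broker string.
  def get(brokers: String): KafkaProducer[String, String] = {
    if (producer == null) {
      synchronized {
        if (producer == null) {
          val props = new Properties()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          producer = new KafkaProducer[String, String](props)
          sys.addShutdownHook { producer.close() } // close when the executor JVM exits
        }
      }
    }
    producer
  }
}
```

Then inside `foreachPartition` it's just `ProducerSingleton.get(brokers).send(...)`, with no per-partition create/close.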
On Thu, Apr 21, 2016 at 2:58 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:

> Here is what we're doing:
>
> ```scala
> import java.util.Properties
>
> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> import net.liftweb.json.Extraction._
> import net.liftweb.json._
> import org.apache.spark.streaming.dstream.DStream
>
> class KafkaWriter(brokers: Array[String], topic: String, numPartitions: Int) {
>   def write[T](data: DStream[T]): Unit = {
>     KafkaWriter.write(data, topic, brokers, numPartitions)
>   }
> }
>
> object KafkaWriter {
>   def write[T](data: DStream[T], topic: String, brokers: Array[String], numPartitions: Int): Unit = {
>     val dataToWrite =
>       if (numPartitions > 0) {
>         data.repartition(numPartitions)
>       } else {
>         data
>       }
>
>     dataToWrite
>       .map(x => new KeyedMessage[String, String](topic, KafkaWriter.toJson(x)))
>       .foreachRDD(rdd => {
>         rdd.foreachPartition(part => {
>           val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
>           part.foreach(item => producer.send(item))
>           producer.close()
>         })
>       })
>   }
>
>   def apply(brokers: Option[Array[String]], topic: String, numPartitions: Int): KafkaWriter = {
>     val brokersToUse =
>       brokers match {
>         case Some(x) => x
>         case None => throw new IllegalArgumentException("Must specify brokers!")
>       }
>
>     new KafkaWriter(brokersToUse, topic, numPartitions)
>   }
>
>   def toJson[T](data: T): String = {
>     implicit val formats = DefaultFormats ++ net.liftweb.json.ext.JodaTimeSerializers.all
>     compactRender(decompose(data))
>   }
>
>   def createProducer(brokers: Array[String]): Producer[String, String] = {
>     val properties = new Properties()
>     properties.put("metadata.broker.list", brokers.mkString(","))
>     properties.put("serializer.class", "kafka.serializer.StringEncoder")
>
>     val kafkaConfig = new ProducerConfig(properties)
>     new Producer[String, String](kafkaConfig)
>   }
> }
> ```
>
> Then just call:
>
> ```scala
> val kafkaWriter: KafkaWriter = KafkaWriter(
>   KafkaStreamFactory.getBrokersFromConfig(config),
>   config.getString(Parameters.topicName),
>   numPartitions = kafkaWritePartitions)
> detectionWriter.write(dataToWriteToKafka)
> ```
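> By the way, `createProducer` above uses the old `kafka.producer` Scala API (hence `metadata.broker.list` and `serializer.class`). If you are on the newer `org.apache.kafka.clients.producer` API, my guess at the equivalent factory is something like this (untested sketch; `KeyedMessage` would also become `ProducerRecord`):
>
> ```scala
> import org.apache.kafka.clients.producer.KafkaProducer
>
> def createProducer(brokers: Array[String]): KafkaProducer[String, String] = {
>   val properties = new Properties()
>   // bootstrap.servers replaces metadata.broker.list in the new API
>   properties.put("bootstrap.servers", brokers.mkString(","))
>   properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>   properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>   new KafkaProducer[String, String](properties)
> }
> ```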
> Hope that helps!
>
> Bryan Jeffrey
>
> On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego <agall...@concord.io> wrote:
>
>> Thanks Ted.
>>
>> KafkaWordCount (the producer) does not operate on a DStream[T]:
>>
>> ```scala
>> object KafkaWordCountProducer {
>>
>>   def main(args: Array[String]) {
>>     if (args.length < 4) {
>>       System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
>>         "<messagesPerSec> <wordsPerMessage>")
>>       System.exit(1)
>>     }
>>
>>     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>>
>>     // Kafka producer properties
>>     val props = new HashMap[String, Object]()
>>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>       "org.apache.kafka.common.serialization.StringSerializer")
>>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>       "org.apache.kafka.common.serialization.StringSerializer")
>>
>>     val producer = new KafkaProducer[String, String](props)
>>
>>     // Send some messages
>>     while (true) {
>>       (1 to messagesPerSec.toInt).foreach { messageNum =>
>>         val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
>>           .mkString(" ")
>>
>>         val message = new ProducerRecord[String, String](topic, null, str)
>>         producer.send(message)
>>       }
>>
>>       Thread.sleep(1000)
>>     }
>>   }
>> }
>> ```
>>
>> Also, doing:
>>
>> ```scala
>> object KafkaSink {
>>   def send(brokers: String, sc: SparkContext, topic: String, key: String, value: String) =
>>     getInstance(brokers, sc).value.send(new ProducerRecord(topic, key, value))
>> }
>>
>> KafkaSink.send(brokers, sparkContext, outputTopic, record._1, record._2)
>> ```
>>
>> doesn't work either; the result is:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>>
>> Thanks!
>>
>> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> >
>> > In KafkaWordCount, the String is sent back and producer.send() is called.
>> >
>> > I guess if you can't find a viable solution in your current design, you can consider the above.
>> >
>> > On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <agall...@concord.io> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I understand that you cannot serialize a Kafka producer.
>> >>
>> >> So I've tried (as suggested here:
>> >> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html):
>> >>
>> >> - Make the class Serializable - not possible.
>> >>
>> >> - Declare the instance only within the lambda function passed in map, via:
>> >>
>> >> ```scala
>> >> // as suggested by the docs
>> >> kafkaOut.foreachRDD(rdd => {
>> >>   rdd.foreachPartition(partition => {
>> >>     val producer = new KafkaProducer(..)
>> >>     partition.foreach { record =>
>> >>       producer.send(new ProducerRecord(outputTopic, record._1, record._2))
>> >>     }
>> >>     producer.close()
>> >>   })
>> >> }) // foreachRDD
>> >> ```
>> >>
>> >> - Make the NotSerializable object static and create it once per machine, via:
>> >>
>> >> ```scala
>> >> object KafkaSink {
>> >>   @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null
>> >>
>> >>   def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
>> >>     if (instance == null) {
>> >>       synchronized {
>> >>         println("Creating new kafka producer")
>> >>         val props = new java.util.Properties()
>> >>         .......
>> >>         instance = sc.broadcast(new KafkaProducer[String, String](props))
>> >>         sys.addShutdownHook {
>> >>           instance.value.close()
>> >>         }
>> >>       }
>> >>     }
>> >>     instance
>> >>   }
>> >> }
>> >> ```
>> >>
>> >> - Call rdd.forEachPartition and create the NotSerializable object in there, like this:
>> >>
>> >> Same as above.
>> >>
>> >> - Mark the instance @transient. Same thing, just make it a class variable, via:
>> >>
>> >> ```scala
>> >> @transient var producer: KafkaProducer[String, String] = null
>> >>
>> >> def getInstance() = {
>> >>   if (producer == null) {
>> >>     producer = new KafkaProducer(..)
>> >>   }
>> >>   producer
>> >> }
>> >> ```
>> >>
>> >> However, I get serialization problems with all of these options.
>> >>
>> >> Thanks for your help.
>> >>
>> >> - Alex
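>> >> PS: would wrapping the producer in a small serializable holder that builds it lazily on the executor be the right direction? Untested sketch (`KafkaSinkHolder` and its shape are mine):
>> >>
>> >> ```scala
>> >> class KafkaSinkHolder(createProducer: () => KafkaProducer[String, String])
>> >>     extends Serializable {
>> >>   // Only the factory closure is serialized; the producer itself is created
>> >>   // lazily, once per executor JVM, on first use after deserialization.
>> >>   @transient lazy val producer = createProducer()
>> >>
>> >>   def send(topic: String, key: String, value: String): Unit =
>> >>     producer.send(new ProducerRecord[String, String](topic, key, value))
>> >> }
>> >> ```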