Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-22 Thread Alexander Gallego
Thanks Bryan. This is basically what I have as well; I posted pretty much the same gist in my first email: .foreachRDD(rdd => { rdd.foreachPartition(part => { val producer: Producer[String, String] = KafkaWriter.createProducer(brokers) part.foreach(item =>
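
For readers skimming the thread, the per-partition pattern quoted above can be sketched without Spark or Kafka on the classpath. Everything here is an assumption made for illustration: `StubProducer` stands in for whatever `KafkaWriter.createProducer` returns (its real signature is not shown in the thread), and plain iterators stand in for RDD partitions. The point is only that the producer is created, used, and closed inside the function that handles one partition.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the producer returned by KafkaWriter.createProducer;
// the real class and its signature are not shown in the thread.
final class StubProducer(val brokers: String) {
  private var open = true
  private val buffer = ListBuffer.empty[String]

  def send(item: String): Unit = {
    require(open, "producer is closed")
    buffer += item
  }
  def close(): Unit = { open = false }
  def isOpen: Boolean = open
  def sent: List[String] = buffer.toList
}

object PartitionWriter {
  // Plays the role of the function passed to rdd.foreachPartition: one producer
  // per partition, created where the data is and closed once the partition is drained.
  def writePartition(brokers: String, part: Iterator[String]): StubProducer = {
    val producer = new StubProducer(brokers)
    try part.foreach(producer.send)
    finally producer.close()
    producer
  }

  def main(args: Array[String]): Unit = {
    // Two in-memory "partitions" stand in for the partitions of an RDD.
    val partitions = Seq(Iterator("a", "b"), Iterator("c"))
    val producers = partitions.map(p => writePartition("localhost:9092", p))
    producers.foreach(p => println(s"sent=${p.sent} open=${p.isOpen}"))
  }
}
```

With a real producer, the `close()` in the `finally` block is what keeps connections from leaking when a partition fails halfway through.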

Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Bryan Jeffrey
Here is what we're doing: import java.util.Properties import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import net.liftweb.json.Extraction._ import net.liftweb.json._ import org.apache.spark.streaming.dstream.DStream class KafkaWriter(brokers: Array[String], topic: String,

Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Todd Nist
Have you looked at these:
http://allegro.tech/2015/08/spark-kafka-integration.html
http://mkuthan.github.io/blog/2016/01/29/spark-kafka-integration2/
Full example here: https://github.com/mkuthan/example-spark-kafka
HTH. -Todd
On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego

Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Alexander Gallego
Thanks Ted. KafkaWordCount (producer) does not operate on a DStream[T]

```scala
object KafkaWordCountProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val
```

Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Ted Yu
In KafkaWordCount, the String is sent back and producer.send() is called. I guess if you don't find a viable solution in your current design, you can consider the above. On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego wrote: > Hello, > > I understand that you cannot

Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Alexander Gallego
Hello,

I understand that you cannot serialize Kafka Producer. So I've tried (as suggested here: https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html):
- Make the class Serializable - not possible
- Declare the instance only within the lambda
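
To make the difference between the options above concrete, here is a minimal sketch that uses plain Java serialization as a rough approximation of what Spark does to a closure before shipping it to executors. `FakeProducer`, `ClosureCheck`, and both function names are hypothetical and exist only for this illustration; no Spark or Kafka API is involved. A function that captures a producer built outside of it drags the producer into serialization, while a function that builds the producer inside only captures the broker string.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a Kafka producer: deliberately NOT Serializable.
class FakeProducer(val brokers: String) {
  def send(msg: String): Unit = ()
}

object ClosureCheck {
  // Returns true if Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures a producer built outside the function, so the producer
  // would have to be serialized along with the function.
  def capturing(brokers: String): Iterator[String] => Unit = {
    val producer = new FakeProducer(brokers)
    part => part.foreach(producer.send)
  }

  // Builds the producer inside the function, so only the `brokers`
  // string is captured.
  def creatingInside(brokers: String): Iterator[String] => Unit =
    part => {
      val producer = new FakeProducer(brokers)
      part.foreach(producer.send)
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(capturing("localhost:9092")))      // expected: false
    println(isSerializable(creatingInside("localhost:9092"))) // expected: true
  }
}
```

The same check is a cheap way to find out which captured value is responsible when Spark reports "Task not serializable".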