Thanks brian.

This is basically what I have as well, i just posted the same gist pretty
much on the first email:

      .foreachRDD(rdd => {
      rdd.foreachPartition(part => {
        val producer: Producer[String, String] = KafkaWriter.createProducer(
brokers)
        part.foreach(item => producer.send(item))
        producer.close()
      })


I had a bug w/ implicits from spark. Not really sure why, but I had built a
spark context on a different configuration file and I'm not sure what was
passing the wrong spark context.

Effectively, everything worked when I tested merging my funcs into one file.

Thanks again.

On Thu, Apr 21, 2016 at 2:58 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Here is what we're doing:
>
>
> import java.util.Properties
>
> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> import net.liftweb.json.Extraction._
> import net.liftweb.json._
> import org.apache.spark.streaming.dstream.DStream
>
> class KafkaWriter(brokers: Array[String], topic: String, numPartitions:
> Int) {
>   def write[T](data: DStream[T]): Unit = {
>     KafkaWriter.write(data, topic, brokers, numPartitions)
>   }
> }
>
> object KafkaWriter {
>   def write[T](data: DStream[T], topic: String, brokers: Array[String],
> numPartitions: Int): Unit = {
>     val dataToWrite =
>       if (numPartitions > 0) {
>         data.repartition(numPartitions)
>       } else {
>         data
>       }
>
>     dataToWrite
>       .map(x => new KeyedMessage[String, String](topic,
> KafkaWriter.toJson(x)))
>       .foreachRDD(rdd => {
>       rdd.foreachPartition(part => {
>         val producer: Producer[String, String] =
> KafkaWriter.createProducer(brokers)
>         part.foreach(item => producer.send(item))
>         producer.close()
>       })
>     })
>   }
>
>   def apply(brokers: Option[Array[String]], topic: String, numPartitions:
> Int): KafkaWriter = {
>     val brokersToUse =
>       brokers match {
>         case Some(x) => x
>         case None => throw new IllegalArgumentException("Must specify
> brokers!")
>       }
>
>     new KafkaWriter(brokersToUse, topic, numPartitions)
>   }
>
>   def toJson[T](data: T): String = {
>     implicit val formats = DefaultFormats ++
> net.liftweb.json.ext.JodaTimeSerializers.all
>     compactRender(decompose(data))
>   }
>
>   def createProducer(brokers: Array[String]): Producer[String, String] = {
>     val properties = new Properties()
>     properties.put("metadata.broker.list", brokers.mkString(","))
>     properties.put("serializer.class", "kafka.serializer.StringEncoder")
>
>     val kafkaConfig = new ProducerConfig(properties)
>     new Producer[String, String](kafkaConfig)
>   }
> }
>
>
> Then just call:
>
>     val kafkaWriter: KafkaWriter =
> KafkaWriter(KafkaStreamFactory.getBrokersFromConfig(config),
> config.getString(Parameters.topicName), numPartitions =
> kafkaWritePartitions)
>     detectionWriter.write(dataToWriteToKafka)
>
>
> Hope that helps!
>
> Bryan Jeffrey
>
> On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego <agall...@concord.io>
> wrote:
>
>> Thanks Ted.
>>
>>  KafkaWordCount (producer) does not operate on a DStream[T]
>>
>> ```scala
>>
>>
>> object KafkaWordCountProducer {
>>
>>   def main(args: Array[String]) {
>>     if (args.length < 4) {
>>       System.err.println("Usage: KafkaWordCountProducer
>> <metadataBrokerList> <topic> " +
>>         "<messagesPerSec> <wordsPerMessage>")
>>       System.exit(1)
>>     }
>>
>>     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>>
>>     // Zookeeper connection properties
>>     val props = new HashMap[String, Object]()
>>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>       "org.apache.kafka.common.serialization.StringSerializer")
>>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>       "org.apache.kafka.common.serialization.StringSerializer")
>>
>>     val producer = new KafkaProducer[String, String](props)
>>
>>     // Send some messages
>>     while(true) {
>>       (1 to messagesPerSec.toInt).foreach { messageNum =>
>>         val str = (1 to wordsPerMessage.toInt).map(x =>
>> scala.util.Random.nextInt(10).toString)
>>           .mkString(" ")
>>
>>         val message = new ProducerRecord[String, String](topic, null, str)
>>         producer.send(message)
>>       }
>>
>>       Thread.sleep(1000)
>>     }
>>   }
>>
>> }
>>
>> ```
>>
>>
>> Also, doing:
>>
>>
>> ```
>> object KafkaSink {
>>      def send(brokers: String, sc: SparkContext, topic: String, key:
>> String, value: String) =
>>             getInstance(brokers, sc).value.send(new ProducerRecord(topic,
>> key, value))
>> }
>>
>> KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)
>>
>> ```
>>
>>
>> Doesn't work either, the result is:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Task not
>> serializable
>>
>>
>> Thanks!
>>
>>
>>
>>
>> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> >
>> > In KafkaWordCount , the String is sent back and producer.send() is
>> called.
>> >
>> > I guess if you don't find via solution in your current design, you can
>> consider the above.
>> >
>> > On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <
>> agall...@concord.io> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I understand that you cannot serialize Kafka Producer.
>> >>
>> >> So I've tried:
>> >>
>> >> (as suggested here
>> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)
>>
>> >>
>> >>  - Make the class Serializable - not possible
>> >>
>> >>  - Declare the instance only within the lambda function passed in map.
>> >>
>> >> via:
>> >>
>> >> // as suggested by the docs
>> >>
>> >>
>> >> ```scala
>> >>
>> >>    kafkaOut.foreachRDD(rdd => {
>> >>      rdd.foreachPartition(partition => {
>> >>           val producer = new KafkaProducer(..)
>> >>           partition.foreach { record =>
>> >>               producer.send(new ProducerRecord(outputTopic, record._1,
>> record._2)
>> >>            }
>> >>           producer.close()
>> >>        })
>> >>      }) // foreachRDD
>> >>
>> >>
>> >> ```
>> >>
>> >> - Make the NotSerializable object as a static and create it once per
>> machine.
>> >>
>> >> via:
>> >>
>> >>
>> >> ```scala
>> >>
>> >>
>> >> object KafkaSink {
>> >>   @volatile private var instance: Broadcast[KafkaProducer[String,
>> String]] = null
>> >>   def getInstance(brokers: String, sc: SparkContext):
>> Broadcast[KafkaProducer[String, String]] = {
>> >>     if (instance == null) {
>> >>       synchronized {
>> >>         println("Creating new kafka producer")
>> >>         val props = new java.util.Properties()
>> >>         .......
>> >>         instance = sc.broadcast(new KafkaProducer[String,
>> String](props))
>> >>         sys.addShutdownHook {
>> >>           instance.value.close()
>> >>         }
>> >>       }
>> >>     }
>> >>     instance
>> >>   }
>> >> }
>> >>
>> >>
>> >> ```
>> >>
>> >>
>> >>
>> >>  - Call rdd.forEachPartition and create the NotSerializable object in
>> there like this:
>> >>
>> >> Same as above.
>> >>
>> >>
>> >> - Mark the instance @transient
>> >>
>> >> Same thing, just make it a class variable via:
>> >>
>> >>
>> >> ```
>> >> @transient var producer: KakfaProducer[String,String] = null
>> >> def getInstance() = {
>> >>    if( producer == null ) {
>> >>        producer = new KafkaProducer()
>> >>    }
>> >>    producer
>> >> }
>> >>
>> >> ```
>> >>
>> >>
>> >> However, I get serialization problems with all of these options.
>> >>
>> >>
>> >> Thanks for your help.
>> >>
>> >> - Alex
>> >>
>> >
>>
>>
>>
>> --
>>
>>
>>
>>
>>
>> Alexander Gallego
>> Co-Founder & CTO
>>
>
>


-- 





Alexander Gallego
Co-Founder & CTO

Reply via email to