Here is what we're doing:

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import net.liftweb.json.Extraction._
import net.liftweb.json._
import org.apache.spark.streaming.dstream.DStream

class KafkaWriter(brokers: Array[String], topic: String, numPartitions: Int) {
  def write[T](data: DStream[T]): Unit = {
    KafkaWriter.write(data, topic, brokers, numPartitions)
  }
}

object KafkaWriter {
  def write[T](data: DStream[T], topic: String, brokers: Array[String], numPartitions: Int): Unit = {
    val dataToWrite =
      if (numPartitions > 0) {
        data.repartition(numPartitions)
      } else {
        data
      }

    dataToWrite
      .map(x => new KeyedMessage[String, String](topic, KafkaWriter.toJson(x)))
      .foreachRDD(rdd => {
        rdd.foreachPartition(part => {
          val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
          part.foreach(item => producer.send(item))
          producer.close()
        })
      })
  }

  def apply(brokers: Option[Array[String]], topic: String, numPartitions: Int): KafkaWriter = {
    val brokersToUse =
      brokers match {
        case Some(x) => x
        case None => throw new IllegalArgumentException("Must specify brokers!")
      }

    new KafkaWriter(brokersToUse, topic, numPartitions)
  }

  def toJson[T](data: T): String = {
    implicit val formats = DefaultFormats ++ net.liftweb.json.ext.JodaTimeSerializers.all
    compactRender(decompose(data))
  }

  def createProducer(brokers: Array[String]): Producer[String, String] = {
    val properties = new Properties()
    properties.put("metadata.broker.list", brokers.mkString(","))
    properties.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(properties)
    new Producer[String, String](kafkaConfig)
  }
}


Then just call:

    val kafkaWriter: KafkaWriter = KafkaWriter(
      KafkaStreamFactory.getBrokersFromConfig(config),
      config.getString(Parameters.topicName),
      numPartitions = kafkaWritePartitions)
    kafkaWriter.write(dataToWriteToKafka)
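
If creating and closing a producer for every partition turns out to be expensive, a common variation is to cache a single producer per executor JVM and reuse it across batches. Here is a minimal sketch of that idea, reusing the same 0.8-style producer settings as above (the ProducerHolder object and its name are illustrative, not part of the code above):

import java.util.Properties

import kafka.producer.{Producer, ProducerConfig}

// Illustrative helper: lazily creates one producer per executor JVM and
// reuses it, instead of opening and closing one per partition.
object ProducerHolder {
  @volatile private var producer: Producer[String, String] = null

  def get(brokers: Array[String]): Producer[String, String] = {
    if (producer == null) {
      synchronized {
        if (producer == null) {
          val properties = new Properties()
          properties.put("metadata.broker.list", brokers.mkString(","))
          properties.put("serializer.class", "kafka.serializer.StringEncoder")
          producer = new Producer[String, String](new ProducerConfig(properties))
          // Close the shared producer when the executor JVM shuts down.
          sys.addShutdownHook { producer.close() }
        }
      }
    }
    producer
  }
}

Inside foreachPartition you would then call ProducerHolder.get(brokers) instead of KafkaWriter.createProducer(brokers) and drop the per-partition close().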


Hope that helps!

Bryan Jeffrey

On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego <agall...@concord.io> wrote:

> Thanks Ted.
>
> KafkaWordCount (the producer) does not operate on a DStream[T].
>
> ```scala
>
>
> import java.util.HashMap
>
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
>
> object KafkaWordCountProducer {
>
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
>         "<messagesPerSec> <wordsPerMessage>")
>       System.exit(1)
>     }
>
>     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>
>     // Kafka producer connection properties
>     val props = new HashMap[String, Object]()
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>
>     val producer = new KafkaProducer[String, String](props)
>
>     // Send some messages
>     while(true) {
>       (1 to messagesPerSec.toInt).foreach { messageNum =>
>         val str = (1 to wordsPerMessage.toInt)
>           .map(x => scala.util.Random.nextInt(10).toString)
>           .mkString(" ")
>
>         val message = new ProducerRecord[String, String](topic, null, str)
>         producer.send(message)
>       }
>
>       Thread.sleep(1000)
>     }
>   }
>
> }
>
> ```
>
>
> Also, doing:
>
>
> ```
> object KafkaSink {
>   def send(brokers: String, sc: SparkContext, topic: String, key: String, value: String) =
>     getInstance(brokers, sc).value.send(new ProducerRecord(topic, key, value))
> }
>
> KafkaSink.send(brokers, sparkContext, outputTopic, record._1, record._2)
>
> ```
>
>
> This doesn't work either; the result is:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>
>
> Thanks!
>
>
>
>
> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > In KafkaWordCount, the String is sent back and producer.send() is called.
> >
> > I guess if you don't find a viable solution in your current design, you can consider the above.
> >
> > On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <agall...@concord.io> wrote:
> >>
> >> Hello,
> >>
> >> I understand that you cannot serialize Kafka Producer.
> >>
> >> So I've tried:
> >>
> >> (as suggested here:
> >> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)
> >>
> >>  - Make the class Serializable - not possible
> >>
> >>  - Declare the instance only within the lambda function passed in map.
> >>
> >> via:
> >>
> >> // as suggested by the docs
> >>
> >>
> >> ```scala
> >>
> >>    kafkaOut.foreachRDD(rdd => {
> >>      rdd.foreachPartition(partition => {
> >>           val producer = new KafkaProducer(..)
> >>           partition.foreach { record =>
> >>             producer.send(new ProducerRecord(outputTopic, record._1, record._2))
> >>           }
> >>           producer.close()
> >>        })
> >>      }) // foreachRDD
> >>
> >>
> >> ```
> >>
> >>  - Make the NotSerializable object static and create it once per machine.
> >>
> >> via:
> >>
> >>
> >> ```scala
> >>
> >>
> >> object KafkaSink {
> >>   @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null
> >>   def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
> >>     if (instance == null) {
> >>       synchronized {
> >>         println("Creating new kafka producer")
> >>         val props = new java.util.Properties()
> >>         .......
> >>         instance = sc.broadcast(new KafkaProducer[String, String](props))
> >>         sys.addShutdownHook {
> >>           instance.value.close()
> >>         }
> >>       }
> >>     }
> >>     instance
> >>   }
> >> }
> >>
> >>
> >> ```
> >>
> >>
> >>
> >>  - Call rdd.foreachPartition and create the NotSerializable object in there, like this:
> >>
> >> Same as above.
> >>
> >>
> >> - Mark the instance @transient
> >>
> >> Same thing, just make it a class variable via:
> >>
> >>
> >> ```
> >> @transient var producer: KafkaProducer[String, String] = null
> >> def getInstance() = {
> >>    if( producer == null ) {
> >>        producer = new KafkaProducer()
> >>    }
> >>    producer
> >> }
> >>
> >> ```
> >>
> >>
> >> However, I get serialization problems with all of these options.
> >>
> >>
> >> Thanks for your help.
> >>
> >> - Alex
> >>
> >
>
>
>
> --
> Alexander Gallego
> Co-Founder & CTO
