Thanks Brian.
This is basically what I have as well; I posted pretty much the same gist in the first email:
```scala
.foreachRDD(rdd => {
  rdd.foreachPartition(part => {
    val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
    // The original message was cut off here; presumably each item is sent
    // and the producer is closed, e.g.:
    part.foreach(item => producer.send(new KeyedMessage[String, String](topic, item)))
    producer.close()
  })
})
```
Here is what we're doing:
```scala
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import net.liftweb.json.Extraction._
import net.liftweb.json._
import org.apache.spark.streaming.dstream.DStream

// The class definition was truncated in the original message:
class KafkaWriter(brokers: Array[String], topic: String /* , ... */) {
  // ...
}
```
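The `createProducer` factory used in the first snippet isn't shown in the thread; a minimal sketch against the legacy `kafka.producer` API (the property names are standard for that API, but this is an assumed reconstruction, not the original code):

```scala
import java.util.Properties

import kafka.producer.{Producer, ProducerConfig}

object KafkaWriter {
  // Assumed helper: builds a string-keyed producer from a broker list.
  // It is called inside foreachPartition, so the producer is constructed
  // on the executor and never needs to be serialized.
  def createProducer(brokers: Array[String]): Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", brokers.mkString(","))
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    new Producer[String, String](new ProducerConfig(props))
  }
}
```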
Have you looked at these:
http://allegro.tech/2015/08/spark-kafka-integration.html
http://mkuthan.github.io/blog/2016/01/29/spark-kafka-integration2/
Full example here:
https://github.com/mkuthan/example-spark-kafka
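The core idea in those posts is a serializable wrapper that creates the producer lazily on each executor instead of shipping it from the driver. A minimal sketch of that pattern, adapted here to the same legacy `kafka.producer` API used above (the posts themselves target the newer `KafkaProducer` client; all names below are illustrative):

```scala
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Only the factory function is serialized; the producer itself is created
// lazily, once per executor JVM, the first time send() is called.
class KafkaSink(createProducer: () => Producer[String, String]) extends Serializable {
  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new KeyedMessage[String, String](topic, value))
}

object KafkaSink {
  def apply(brokers: String): KafkaSink = new KafkaSink(() => {
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    new Producer[String, String](new ProducerConfig(props))
  })
}
```

You can then broadcast the sink once, e.g. `val sink = ssc.sparkContext.broadcast(KafkaSink(brokers))`, and call `sink.value.send(topic, item)` from inside any closure.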
HTH.
-Todd
On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego wrote:
Thanks Ted.
KafkaWordCount (producer) does not operate on a DStream[T]:
```scala
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
    // ... (rest of the example elided in the original message)
  }
}
```
In KafkaWordCount, the message String is built and producer.send() is called directly.
I guess if you don't find a viable solution in your current design, you can
consider the above.
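For reference, the sending part of that example looks roughly like this (paraphrased from the Spark examples, which use the newer `org.apache.kafka.clients.producer` API; check the source in your Spark version for the exact code):

```scala
// Build random word strings and push them to Kafka in a loop;
// producer here is an org.apache.kafka.clients.producer.KafkaProducer
while (true) {
  (1 to messagesPerSec.toInt).foreach { _ =>
    val str = (1 to wordsPerMessage.toInt)
      .map(_ => scala.util.Random.nextInt(10).toString)
      .mkString(" ")
    producer.send(new ProducerRecord[String, String](topic, null, str))
  }
  Thread.sleep(1000)
}
```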
On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego wrote:
> Hello,
>
> I understand that you cannot serialize a Kafka Producer.
Hello,
I understand that you cannot serialize a Kafka Producer.
So I've tried the following (as suggested here:
https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html):
- Make the class Serializable: not possible
- Declare the instance only within the lambda
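For context, the failing shape is roughly this (an illustrative sketch, not my actual code): the producer is created on the driver, and referencing it inside the closure forces Spark to serialize it, which raises `org.apache.spark.SparkException: Task not serializable`.

```scala
// Anti-pattern (assumes props: Properties, topic: String, and
// dstream: DStream[String] are in scope):
// the producer is instantiated on the driver...
val producer = new Producer[String, String](new ProducerConfig(props))

dstream.foreachRDD { rdd =>
  // ...but this closure is serialized and shipped to the executors, and it
  // captures `producer` => SparkException: Task not serializable
  rdd.foreach(record => producer.send(new KeyedMessage[String, String](topic, record)))
}
```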