I couldn’t get this working yet. If anyone has successfully used the foreach sink 
for Kafka with Structured Streaming, please share. Thanks.

From: Revin Chalil [mailto:rcha...@expedia.com]
Sent: Sunday, May 14, 2017 9:32 AM
To: Tathagata Das <tathagata.das1...@gmail.com>; mich...@databricks.com
Cc: Peyman Mohajerian <mohaj...@gmail.com>; Senthil Kumar 
<senthilec...@gmail.com>; User <user@spark.apache.org>; 
senthilec...@apache.org; Ofir Manor <ofir.ma...@equalum.io>; Hemanth Gudela 
<hemanth.gud...@qvantel.com>; lucas.g...@gmail.com; Koert Kuipers 
<ko...@tresata.com>; silvio.fior...@granturing.com
Subject: RE: Spark SQL DataFrame to Kafka Topic

Hi TD / Michael,


I am trying to use the foreach sink to write to Kafka and followed this 
post<https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html> 
from the Databricks blog by Sunil 
Sitaula<https://databricks.com/blog/author/sunil-sitaula>. I get the error below 
with DF.writeStream.foreach(writer).outputMode("update").start() when using a 
simple DataFrame:

Type mismatch, expected: foreachWriter[Row], actual: KafkaSink

Cannot resolve reference foreach with such signature



Below is the snippet:

val data = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KafkaBroker)
  .option("subscribe", InTopic)
  .load()
  // the Kafka source exposes a binary "value" column; take it as Array[Byte]
  .select($"value".as[Array[Byte]])
  .flatMap(d => {
    // decode the Avro payload into events and wrap each payload as a KafkaMessage
    val events = AvroHelper.readEvents(d)
    events.map((event: HdfsEvent) => {
      val payload = EventPayloadParser.read(event.getPayload)
      new KafkaMessage(payload)
    })
  })



case class KafkaMessage(
  payload: String)



This is where I use the foreach sink:

val writer = new KafkaSink("kafka-topic", KafkaBroker)
val query = data.writeStream.foreach(writer).outputMode("update").start()



In this case, it shows –

Type mismatch, expected: foreachWriter[Main.KafkaMessage], actual: Main.KafkaSink

Cannot resolve reference foreach with such signature
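
Since data ends up as a Dataset[KafkaMessage], I am guessing the writer has to 
extend ForeachWriter[KafkaMessage] (not ForeachWriter[Row]) for foreach to accept 
it. Something like the sketch below is what I have in mind, adapted from the blog 
post but untested; the producer settings are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

class KafkaSink(topic: String, servers: String) extends ForeachWriter[KafkaMessage] {
  // created in open() on the executor, so it is never serialized with the writer
  var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true
  }

  override def process(message: KafkaMessage): Unit = {
    producer.send(new ProducerRecord[String, String](topic, message.payload))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) producer.close()
  }
}

Does that look like the right direction, or am I missing something about how 
foreach resolves the writer's type?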



Any help is much appreciated. Thank you.


From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, January 13, 2017 3:31 PM
To: Koert Kuipers <ko...@tresata.com<mailto:ko...@tresata.com>>
Cc: Peyman Mohajerian <mohaj...@gmail.com<mailto:mohaj...@gmail.com>>; Senthil 
Kumar <senthilec...@gmail.com<mailto:senthilec...@gmail.com>>; User 
<user@spark.apache.org<mailto:user@spark.apache.org>>; 
senthilec...@apache.org<mailto:senthilec...@apache.org>
Subject: Re: Spark SQL DataFrame to Kafka Topic

Structured Streaming has a foreach sink, where you can essentially do what you 
want with your data. It's easy to create a Kafka producer and write the data 
out to Kafka.
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
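
The shape of it looks roughly like this (sketch only; streamingDF stands in for 
your streaming DataFrame, and the producer/record calls are placeholders you 
would fill in):

import org.apache.spark.sql.{ForeachWriter, Row}

streamingDF.writeStream
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = {
      // open your connection, e.g. create a KafkaProducer
      true
    }
    def process(record: Row): Unit = {
      // send the record, e.g. producer.send(new ProducerRecord(...))
    }
    def close(errorOrNull: Throwable): Unit = {
      // close the producer
    }
  })
  .start()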

On Fri, Jan 13, 2017 at 8:28 AM, Koert Kuipers 
<ko...@tresata.com<mailto:ko...@tresata.com>> wrote:
How do you do this with Structured Streaming? I see no mention of writing to 
Kafka.

On Fri, Jan 13, 2017 at 10:30 AM, Peyman Mohajerian 
<mohaj...@gmail.com<mailto:mohaj...@gmail.com>> wrote:
Yes, it is called Structured Streaming: 
https://docs.databricks.com/_static/notebooks/structured-streaming-kafka.html
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
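
And if you just want to push an existing (non-streaming) DataFrame out, one way 
is to serialize the rows and open a producer per partition; a rough, untested 
sketch (the broker address and topic below are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// df is the DataFrame you already have
df.toJSON.foreachPartition { (rows: Iterator[String]) =>
  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  try {
    rows.foreach(json => producer.send(new ProducerRecord[String, String]("beacons", json)))
  } finally {
    producer.close()
  }
}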

On Fri, Jan 13, 2017 at 3:32 AM, Senthil Kumar 
<senthilec...@gmail.com<mailto:senthilec...@gmail.com>> wrote:
Hi Team ,

     Sorry if this question was already asked in this forum.

Can we ingest data into an Apache Kafka topic from a Spark SQL DataFrame?

Here is my code, which reads a Parquet file:


// read the Parquet files into a DataFrame and register it as a temp table
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.parquet("..../temp/*.parquet")

df.registerTempTable("beacons")



I want to directly ingest the df DataFrame into Kafka. Is there any way to achieve 
this?



Cheers,

Senthil


