Hi TD / Michael,
I am trying to use the foreach sink to write to Kafka, following this post on the Databricks blog by Sunil Sitaula: https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

When using a simple DF, I get the error below with DF.writeStream.foreach(writer).outputMode("update").start():

    Type mismatch, expected: ForeachWriter[Row], actual: KafkaSink
    Cannot resolve reference foreach with such signature

Below is the snippet:

    import session.implicits._ // needed for $"value" and .as[Array[Byte]]

    val data = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KafkaBroker)
      .option("subscribe", InTopic)
      .load()
      .select($"value".as[Array[Byte]])
      .flatMap(d => {
        // Decode the Avro bytes into events, then turn each event into a KafkaMessage
        val events = AvroHelper.readEvents(d)
        events.map((event: HdfsEvent) => {
          val payload = EventPayloadParser.read(event.getPayload)
          KafkaMessage(payload)
        })
      })

    case class KafkaMessage(payload: String)

This is where I use the foreach:

    val writer = new KafkaSink("kafka-topic", KafkaBroker)
    val query = data.writeStream.foreach(writer).outputMode("update").start()

In this case, it shows:

    Type mismatch, expected: ForeachWriter[Main.KafkaMessage], actual: Main.KafkaSink
    Cannot resolve reference foreach with such signature

Any help is much appreciated. Thank you.

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, January 13, 2017 3:31 PM
To: Koert Kuipers <ko...@tresata.com>
Cc: Peyman Mohajerian <mohaj...@gmail.com>; Senthil Kumar <senthilec...@gmail.com>; User <user@spark.apache.org>; senthilec...@apache.org
Subject: Re: Spark SQL DataFrame to Kafka Topic

Structured Streaming has a foreach sink, where you can essentially do what you want with your data. It's easy to create a Kafka producer and write the data out to Kafka.
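A minimal sketch of such a writer, aimed at the type mismatch in the question at the top of this thread. `writeStream.foreach` takes a `ForeachWriter[T]` whose `T` is the element type of the Dataset being written: a plain DataFrame needs a `ForeachWriter[Row]`, and `data` above (a `Dataset[KafkaMessage]`) needs a `ForeachWriter[KafkaMessage]`. The error suggests the `KafkaSink` in use is typed for some other element type. The class name `KafkaMessageSink` is hypothetical, and the sketch assumes the `kafka-clients` library is on the classpath; the producer is created in `open()` on the executor because `KafkaProducer` is not serializable.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.ForeachWriter

// Same shape as the case class in the question.
case class KafkaMessage(payload: String)

// Hypothetical writer: the type parameter matches the Dataset's element type,
// so data.writeStream.foreach(new KafkaMessageSink(...)) type-checks.
class KafkaMessageSink(topic: String, servers: String) extends ForeachWriter[KafkaMessage] {

  // Not serialized with the writer; created per partition in open().
  @transient private var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    producer = new KafkaProducer[String, String](props)
    true // true = go ahead and process the rows of this partition
  }

  override def process(message: KafkaMessage): Unit = {
    // No key: the payload string becomes the record value.
    producer.send(new ProducerRecord[String, String](topic, message.payload))
  }

  override def close(errorOrNull: Throwable): Unit = {
    // close() waits for pending sends to complete; skip if open() never ran.
    if (producer != null) producer.close()
  }
}
```

With this, the call from the question would become `data.writeStream.foreach(new KafkaMessageSink("kafka-topic", KafkaBroker)).outputMode("update").start()`. Opening one producer per partition per trigger is the simplest choice; a cached, per-executor producer would cut connection overhead but is not shown here.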
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

On Fri, Jan 13, 2017 at 8:28 AM, Koert Kuipers <ko...@tresata.com> wrote:

How do you do this with Structured Streaming? I see no mention of writing to Kafka.

On Fri, Jan 13, 2017 at 10:30 AM, Peyman Mohajerian <mohaj...@gmail.com> wrote:

Yes, it is called Structured Streaming:
https://docs.databricks.com/_static/notebooks/structured-streaming-kafka.html
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

On Fri, Jan 13, 2017 at 3:32 AM, Senthil Kumar <senthilec...@gmail.com> wrote:

Hi Team,

Sorry if this question has already been asked in this forum. Can we ingest data into an Apache Kafka topic from a Spark SQL DataFrame? Here is my code, which reads a Parquet file:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val df = sqlContext.read.parquet("..../temp/*.parquet")
    df.registerTempTable("beacons")

I want to ingest the df DataFrame directly into Kafka. Is there any way to achieve this?

Cheers,
Senthil
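For Senthil's batch case (a DataFrame read from Parquet, no streaming), one hedged option in the same spirit as TD's suggestion is to create a Kafka producer inside `foreachPartition`. This is a sketch, not an official connector: `DataFrameToKafka` and `write` are hypothetical names, it assumes `kafka-clients` is on the classpath, and it assumes one JSON string per row (via `Dataset.toJSON`) is an acceptable message format.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.DataFrame

// Hypothetical helper: sends every row of df to `topic` as a JSON string.
object DataFrameToKafka {
  def write(df: DataFrame, topic: String, servers: String): Unit = {
    // Going through .rdd picks the single Scala foreachPartition overload.
    df.toJSON.rdd.foreachPartition { rows =>
      // One producer per partition, created on the executor.
      val props = new Properties()
      props.put("bootstrap.servers", servers)
      props.put("key.serializer", classOf[StringSerializer].getName)
      props.put("value.serializer", classOf[StringSerializer].getName)
      val producer = new KafkaProducer[String, String](props)
      try {
        rows.foreach(json => producer.send(new ProducerRecord[String, String](topic, json)))
      } finally {
        producer.close() // waits for the sends above to complete
      }
    }
  }
}
```

Usage would look like `DataFrameToKafka.write(df, "beacons", "localhost:9092")`, where the topic and broker address are placeholders. On Spark 2.2 and later, the `spark-sql-kafka-0-10` package also provides a built-in Kafka sink for batch writes (`df.write.format("kafka")` with a `value` column and a `topic` option), which makes a hand-written producer unnecessary.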