RE: Spark SQL DataFrame to Kafka Topic
Thanks Michael, that worked, appreciate your help.

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Monday, May 15, 2017 11:45 AM
To: Revin Chalil
Cc: User
Subject: Re: Spark SQL DataFrame to Kafka Topic
Re: Spark SQL DataFrame to Kafka Topic
The foreach sink from that blog post requires a DataFrame with two columns in the form of a Tuple2, (String, String), whereas your DataFrame has only a single column, `payload`. You could change KafkaSink to extend ForeachWriter[KafkaMessage] and then it would work. I'd also suggest you try the native Kafka sink <https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html> that is part of Spark 2.2 <http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html>.
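To make the suggested change concrete, here is a minimal sketch of a KafkaSink that extends ForeachWriter[KafkaMessage]. This is an illustration of the fix, not Revin's actual class; it assumes the kafka-clients library is on the classpath and writes only the payload as the message value:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.sql.ForeachWriter

    class KafkaSink(topic: String, servers: String) extends ForeachWriter[KafkaMessage] {

      // The producer is created in open() on the executor, because ForeachWriter
      // instances are serialized and shipped to executors; a pre-built producer
      // would not survive serialization.
      var producer: KafkaProducer[String, String] = _

      def open(partitionId: Long, version: Long): Boolean = {
        val props = new Properties()
        props.put("bootstrap.servers", servers)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        producer = new KafkaProducer[String, String](props)
        true
      }

      def process(message: KafkaMessage): Unit =
        producer.send(new ProducerRecord[String, String](topic, message.payload))

      def close(errorOrException: Throwable): Unit =
        if (producer != null) producer.close()
    }

It is then used exactly as in the original snippet:

    val writer = new KafkaSink("kafka-topic", KafkaBroker)
    val query = data.writeStream.foreach(writer).outputMode("update").start()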
RE: Spark SQL DataFrame to Kafka Topic
I couldn't get this working yet. If anyone has successfully used the foreach sink for Kafka with Structured Streaming, please share. Thanks.
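For anyone in the same position: the alternative Michael suggests above is the built-in Kafka sink that ships with Spark 2.2. A minimal sketch, assuming Spark 2.2+ with the spark-sql-kafka-0-10 artifact on the classpath, and reusing the names from the snippet below (the checkpoint path is a placeholder):

    // assumes: import spark.implicits._ for the $ column syntax
    val query = data
      .select($"payload".as("value"))  // the Kafka sink expects a string or binary `value` column
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KafkaBroker)
      .option("topic", "kafka-topic")
      .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")  // required for streaming writes
      .start()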
RE: Spark SQL DataFrame to Kafka Topic
Hi TD / Michael,

I am trying to use the foreach sink to write to Kafka, following this post <https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html> from the Databricks blog by Sunil Sitaula <https://databricks.com/blog/author/sunil-sitaula>. With a simple DF, DF.writeStream.foreach(writer).outputMode("update").start() gives:

    Type mismatch, expected: ForeachWriter[Row], actual: KafkaSink
    Cannot resolve reference foreach with such signature

Below is the snippet:

    val data = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KafkaBroker)
      .option("subscribe", InTopic)
      .load()
      .select($"value".as[Array[Byte]])
      .flatMap(d => {
        val events = AvroHelper.readEvents(d)
        events.map((event: HdfsEvent) => {
          val payload = EventPayloadParser.read(event.getPayload)
          new KafkaMessage(payload)
        })
      })

    case class KafkaMessage(payload: String)

This is where I use the foreach:

    val writer = new KafkaSink("kafka-topic", KafkaBroker)
    val query = data.writeStream.foreach(writer).outputMode("update").start()

In this case, it shows:

    Type mismatch, expected: ForeachWriter[Main.KafkaMessage], actual: Main.KafkaSink
    Cannot resolve reference foreach with such signature

Any help is much appreciated. Thank you.
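For context on the two type errors: the blog post's KafkaSink is written against (String, String) pairs, i.e. roughly this shape (a sketch inferred from the error messages, not the blog's exact code):

    import org.apache.spark.sql.ForeachWriter

    class KafkaSink(topic: String, servers: String) extends ForeachWriter[(String, String)] {
      def open(partitionId: Long, version: Long): Boolean = true
      def process(value: (String, String)): Unit = { /* send key/value to Kafka */ }
      def close(errorOrException: Throwable): Unit = {}
    }

An untyped DataFrame needs a ForeachWriter[Row] (hence the first error), and since data above is a Dataset[KafkaMessage], foreach needs a ForeachWriter[KafkaMessage] (hence the second); Michael's reply earlier in the thread shows the fix.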
Re: Spark SQL DataFrame to Kafka Topic
Is there a plan to have this in PySpark in some later release?

--
Best Regards,
Ayan Guha
Re: Spark SQL DataFrame to Kafka Topic
I implemented a sink using foreach; it was indeed straightforward. Thanks.
Re: Spark SQL DataFrame to Kafka Topic
Structured Streaming has a foreach sink, where you can essentially do whatever you want with your data. It's easy to create a Kafka producer and write the data out to Kafka.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
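The contract behind the foreach sink is a small class with three callbacks; a bare skeleton for reference (MySink is a placeholder name, and the Kafka-specific body is elided here; the fuller KafkaSink sketch earlier in the thread fills it in):

    import org.apache.spark.sql.{ForeachWriter, Row}

    class MySink extends ForeachWriter[Row] {
      // Return true to process this partition; open a connection or producer here.
      def open(partitionId: Long, version: Long): Boolean = true
      // Called once per row.
      def process(row: Row): Unit = ()
      // Tear down resources and inspect any failure.
      def close(errorOrException: Throwable): Unit = ()
    }

    // Wiring: df.writeStream.foreach(new MySink).start()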
Re: Spark SQL DataFrame to Kafka Topic
How do you do this with Structured Streaming? I see no mention of writing to Kafka.
Re: Spark SQL DataFrame to Kafka Topic
Yes, it is called Structured Streaming:

https://docs.databricks.com/_static/notebooks/structured-streaming-kafka.html
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

On Fri, Jan 13, 2017 at 3:32 AM, Senthil Kumar wrote:

> Hi Team,
>
> Sorry if this question has already been asked in this forum.
>
> Can we ingest data to an Apache Kafka topic from a Spark SQL DataFrame?
>
> Here is my code, which reads a Parquet file:
>
>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>     val df = sqlContext.read.parquet("/temp/*.parquet")
>     df.registerTempTable("beacons")
>
> I want to directly ingest the df DataFrame to Kafka. Is there any way to achieve this?
>
> Cheers,
> Senthil
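For completeness, the Kafka data source that later shipped with Spark 2.2 also supports batch writes, which answers the original question directly. A minimal sketch, assuming Spark 2.2+ with spark-sql-kafka-0-10 and a SparkSession named spark rather than the older SQLContext (the broker address and topic are placeholders; each row is packed into a JSON string as the message value):

    val df = spark.read.parquet("/temp/*.parquet")

    df.selectExpr("to_json(struct(*)) AS value")   // the sink expects a `value` column
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("topic", "beacons")
      .save()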