RE: Spark SQL DataFrame to Kafka Topic

2017-05-16 Thread Revin Chalil
Thanks Michael, that worked, appreciate your help.


Re: Spark SQL DataFrame to Kafka Topic

2017-05-15 Thread Michael Armbrust
The foreach sink from that blog post requires that you have a DataFrame
with two columns in the form of a Tuple2, (String, String), whereas your
DataFrame has only a single column, `payload`. You could change the
KafkaSink to extend ForeachWriter[KafkaMessage] and then it would work.
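
For reference, a minimal sketch of such a KafkaSink (assuming the KafkaMessage case class from your snippet; note the producer is created in open() rather than in the constructor, because the writer is serialized and shipped to the executors):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

class KafkaSink(topic: String, servers: String) extends ForeachWriter[KafkaMessage] {
  // Created on the executor in open(); KafkaProducer is not serializable.
  var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true  // returning false would skip this partition
  }

  override def process(value: KafkaMessage): Unit = {
    producer.send(new ProducerRecord[String, String](topic, value.payload))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) producer.close()
  }
}

With that change, data.writeStream.foreach(writer).outputMode("update").start() compiles, because data is a Dataset[KafkaMessage].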

I'd also suggest you just try the native Kafka sink
<https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html>
that is part of Spark 2.2
<http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html>.
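
With that sink you don't need a custom ForeachWriter at all; a rough sketch, assuming the data Dataset and KafkaBroker from your snippet (the checkpoint path is a placeholder):

val query = data
  .selectExpr("payload AS value")  // the Kafka sink expects a string or binary 'value' column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KafkaBroker)
  .option("topic", "kafka-topic")
  .option("checkpointLocation", "/path/to/checkpoint")  // placeholder; the Kafka sink requires one
  .start()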


RE: Spark SQL DataFrame to Kafka Topic

2017-05-15 Thread Revin Chalil
I couldn't get this working yet. If anyone has successfully used the foreach
sink for Kafka with Structured Streaming, please share. Thanks.


RE: Spark SQL DataFrame to Kafka Topic

2017-05-14 Thread Revin Chalil
Hi TD / Michael,


I am trying to use the foreach sink to write to Kafka, following this post
<https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html>
from the Databricks blog by Sunil Sitaula
<https://databricks.com/blog/author/sunil-sitaula>. With a simple DF, I get
the below from DF.writeStream.foreach(writer).outputMode("update").start():

Type mismatch, expected: ForeachWriter[Row], actual: KafkaSink

Cannot resolve reference foreach with such signature



Below is the snippet:

val data = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KafkaBroker)
  .option("subscribe", InTopic)
  .load()
  .select($"value".as[Array[Byte]])  // raw Kafka message bytes
  .flatMap(d => {
    // one Kafka record can decode to several Avro events
    val events = AvroHelper.readEvents(d)
    events.map((event: HdfsEvent) => {
      val payload = EventPayloadParser.read(event.getPayload)
      KafkaMessage(payload)
    })
  })



case class KafkaMessage(payload: String)



This is where I use the foreach sink:

val writer = new KafkaSink("kafka-topic", KafkaBroker)
val query = data.writeStream.foreach(writer).outputMode("update").start()



In this case, it shows:

Type mismatch, expected: ForeachWriter[Main.KafkaMessage], actual: Main.KafkaSink

Cannot resolve reference foreach with such signature



Any help is much appreciated. Thank you.



Re: Spark SQL DataFrame to Kafka Topic

2017-01-24 Thread ayan guha
Is there a plan to have this in PySpark in some later release?

--
Best Regards,
Ayan Guha


Re: Spark SQL DataFrame to Kafka Topic

2017-01-24 Thread Koert Kuipers
I implemented a sink using foreach; it was indeed straightforward. Thanks.


Re: Spark SQL DataFrame to Kafka Topic

2017-01-13 Thread Tathagata Das
Structured Streaming has a foreach sink, where you can essentially do what
you want with your data. It's easy to create a Kafka producer and write the
data out to Kafka.
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
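
As a rough skeleton (names and the println body are placeholders for your own producer logic), a ForeachWriter implements an open/process/close lifecycle:

import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = {
    // set up the connection for this partition; return false to skip it
    true
  }
  def process(row: Row): Unit = {
    // send one row to Kafka, e.g. producer.send(new ProducerRecord(topic, row.toString))
    println(row)
  }
  def close(errorOrNull: Throwable): Unit = {
    // tear down the connection
  }
}

You then attach it with streamingDF.writeStream.foreach(writer).start(), where streamingDF is your streaming DataFrame.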



Re: Spark SQL DataFrame to Kafka Topic

2017-01-13 Thread Koert Kuipers
How do you do this with Structured Streaming? I see no mention of writing
to Kafka.



Re: Spark SQL DataFrame to Kafka Topic

2017-01-13 Thread Peyman Mohajerian
Yes, it is called Structured Streaming:
https://docs.databricks.com/_static/notebooks/structured-streaming-kafka.html
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
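
For a static DataFrame like the df in the question below, the Kafka sink that later shipped in Spark 2.2 also supports batch writes; a rough sketch, with the brokers and topic as placeholders, serializing each row to a JSON 'value':

df.selectExpr("to_json(struct(*)) AS value")  // pack the whole row into a JSON string
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder brokers
  .option("topic", "beacons")                       // placeholder topic name
  .save()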

On Fri, Jan 13, 2017 at 3:32 AM, Senthil Kumar wrote:

> Hi Team,
>
> Sorry if this question was already asked in this forum.
>
> Can we ingest data to an Apache Kafka topic from a Spark SQL DataFrame?
>
> Here is my code, which reads a Parquet file:
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
>
> val df = sqlContext.read.parquet("/temp/*.parquet")
>
> df.registerTempTable("beacons")
>
> I want to directly ingest the df DataFrame to Kafka! Is there any way to
> achieve this?
>
> Cheers,
>
> Senthil