Re: [Structured Streaming] Using File Sink to store to hive table.
Got it, thanks!

2017-02-11 0:56 GMT-08:00 Sam Elamin:
> [...]

--
Sincerely yours,
Egor Pakhomov
Re: [Structured Streaming] Using File Sink to store to hive table.
Here's a link to the thread:

http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-Dropping-Duplicates-td20884.html

On Sat, 11 Feb 2017 at 08:47, Sam Elamin wrote:
> [...]
Re: [Structured Streaming] Using File Sink to store to hive table.
Hey Egor,

You can use a ForeachWriter, or you can write a custom sink. I personally went with a custom sink, since I get a DataFrame per batch:

https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala

You can have a look at how I implemented something similar to the file sink that, in the event of a failure, skips batches already written.

Also have a look at Michael's reply to me a few days ago on exactly the same topic. The email subject was "Structured Streaming. Dropping Duplicates".

Regards,

Sam

On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski wrote:
> [...]

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
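The skip-batches-already-written idea Sam describes can be sketched independently of any particular sink: remember the highest batch id already committed and ignore any replayed batch at or below it. This is a toy, in-memory model for illustration only; a real sink would persist this state (the way the file sink keeps its `_spark_metadata` log) so it survives driver restarts.

```scala
// Minimal bookkeeping sketch for an idempotent streaming sink.
// A batch id is accepted exactly once; replays are skipped.
class CommittedBatchTracker(initial: Long = -1L) {
  private var latestCommitted: Long = initial

  // Returns true only for a batch id newer than anything committed so far.
  def tryCommit(batchId: Long): Boolean = {
    if (batchId <= latestCommitted) {
      false // replayed batch: already written, skip it
    } else {
      latestCommitted = batchId
      true
    }
  }
}
```

Inside a custom sink's `addBatch(batchId, data)`, the write would only run when `tryCommit(batchId)` returns true.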
Re: [Structured Streaming] Using File Sink to store to hive table.
"Something like that." I've never tried it out myself, so I'm only guessing after a brief look at the API.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov wrote:
> [...]
Re: [Structured Streaming] Using File Sink to store to hive table.
Jacek, so I create a cache in the ForeachWriter, write to it in each process() call, and flush it on close()? Something like that?

2017-02-09 12:42 GMT-08:00 Jacek Laskowski:
> [...]

--
Sincerely yours,
Egor Pakhomov
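That buffer-and-flush pattern can be sketched over Spark 2.x's `org.apache.spark.sql.ForeachWriter`. The flush target is left as a placeholder, and the batch size is an arbitrary choice; this is a sketch of the pattern, not a production writer.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{ForeachWriter, Row}

// Buffer rows per partition and flush in chunks, instead of issuing
// one write to the target store per record in process().
class BufferingWriter(batchSize: Int) extends ForeachWriter[Row] {
  private val buffer = new ArrayBuffer[Row]()

  override def open(partitionId: Long, version: Long): Boolean = {
    buffer.clear()
    // Returning false here would skip a (partitionId, version) pair
    // that was already written, e.g. after a task retry.
    true
  }

  override def process(value: Row): Unit = {
    buffer += value
    if (buffer.size >= batchSize) flush()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) flush() // flush the remainder only on success
  }

  private def flush(): Unit = {
    // Write buffer.toSeq to the target store here (placeholder).
    buffer.clear()
  }
}
```

Usage would look like `events.writeStream.foreach(new BufferingWriter(1000)).start()`. Note that each executor task gets its own deserialized copy of the writer, so the buffer is naturally per partition.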
Re: [Structured Streaming] Using File Sink to store to hive table.
Hi,

Yes, that's ForeachWriter.

Yes, it works element by element. You're looking for mapPartitions; ForeachWriter has a partitionId that you could use to implement something similar.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov wrote:
> [...]
Re: [Structured Streaming] Using File Sink to store to hive table.
Jacek, you mean http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter ? I do not understand how to use it, since it passes every value separately, not every partition. And adding to the table value by value would not work.

2017-02-07 12:10 GMT-08:00 Jacek Laskowski:
> [...]

--
Sincerely yours,
Egor Pakhomov
Re: [Structured Streaming] Using File Sink to store to hive table.
Hi,

Have you considered the foreach sink?

Jacek

On 6 Feb 2017 8:39 p.m., "Egor Pahomov" wrote:
> [...]
Re: [Structured Streaming] Using File Sink to store to hive table.
I presume you may be able to implement a custom sink and use df.saveAsTable. The problem is that you will have to handle idempotence / garbage collection yourself, in case your job fails while writing, etc.

On Mon, Feb 6, 2017 at 5:53 PM, Egor Pahomov wrote:
> [...]
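A rough sketch of that approach, using Spark 2.x's `Sink` and `StreamSinkProvider` interfaces. Note these live in `org.apache.spark.sql.execution.streaming` / `org.apache.spark.sql.sources`, i.e. they are internal APIs that may change between releases; the table name, provider class, and the in-memory batch tracking are placeholders (a real implementation would persist the committed batch id, exactly the idempotence problem mentioned above).

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Appends each micro-batch to a metastore-backed table, skipping replays.
class HiveAppendSink(table: String) extends Sink {
  @volatile private var latestBatchId = -1L // persist this to survive restarts

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) return // replayed batch: already written

    // The incoming DataFrame is tied to the streaming plan; re-wrapping it
    // (the built-in console sink does something similar) makes it safe
    // to run a plain batch write on it.
    val batch = data.sparkSession.createDataFrame(data.rdd, data.schema)
    batch.write.mode("append").saveAsTable(table) // goes through the metastore
    latestBatchId = batchId
  }
}

class HiveAppendSinkProvider extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink =
    new HiveAppendSink(parameters.getOrElse("table", "events"))
}
```

It would then be wired up with something like `df.writeStream.format("com.example.HiveAppendSinkProvider").option("table", "events")` (the fully qualified provider name is whatever package you put the class in).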
Re: [Structured Streaming] Using File Sink to store to hive table.
I have a stream of files on HDFS with JSON events. I need to convert them to Parquet in real time, process some fields, and store the result in a simple Hive table so people can query it. People might even want to query it with Impala, so it's important that it be a real Hive-metastore-based table. How can I do that?

2017-02-06 14:25 GMT-08:00 Burak Yavuz:
> [...]

--
Sincerely yours,
Egor Pakhomov
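The JSON-to-Parquet half of that pipeline is what the file sink covers out of the box. A sketch, assuming Spark 2.x; the paths, the static sample used for schema inference, and the "timestamp" column are all hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.to_date

val spark = SparkSession.builder()
  .appName("json-events-to-parquet")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// File streams require an explicit schema, so infer it once from a
// static sample of the same events rather than from the stream itself.
val schema = spark.read.json("/data/events/sample").schema

val events = spark.readStream
  .schema(schema)
  .json("/data/events/incoming")

val query = events
  .withColumn("dt", to_date($"timestamp")) // derive a partition column
  .writeStream
  .format("parquet")
  .option("path", "/warehouse/events_pq")
  .option("checkpointLocation", "/checkpoints/events_pq")
  .partitionBy("dt")
  .start()
```

This produces partitioned Parquet directories; the metastore registration raised in the question is the part the file sink does not do for you.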
Re: [Structured Streaming] Using File Sink to store to hive table.
Hi Egor,

Structured Streaming handles all of its metadata itself: which files are actually valid, and so on. You may use the "create table" syntax in SQL to treat it like a Hive table, but it will handle all partitioning information in its own metadata log. Is there a specific reason that you want to store the information in the Hive Metastore?

Best,
Burak

On Mon, Feb 6, 2017 at 11:39 AM, Egor Pahomov wrote:
> [...]
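One pragmatic workaround, if external engines like Impala must see the data, is to define an external table over the file sink's output directory and register partitions as they appear. A sketch, with hypothetical table name, columns, and location; note the caveat that Hive and Impala ignore the sink's own `_spark_metadata` commit log, so they may briefly see files from in-flight or failed batches:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Point an external, partitioned table at the directory the
// streaming file sink writes to.
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS events_pq (user_id STRING, payload STRING)
  PARTITIONED BY (dt STRING)
  STORED AS PARQUET
  LOCATION '/warehouse/events_pq'
""")

// New partition directories are invisible to the metastore until
// registered, so refresh the partition list periodically
// (or issue ALTER TABLE ... ADD PARTITION per new directory).
spark.sql("MSCK REPAIR TABLE events_pq")
```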
[Structured Streaming] Using File Sink to store to hive table.
Hi, I'm thinking of using Structured Streaming instead of the old streaming, but I need to be able to save results to a Hive table. The documentation for the file sink (http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks) says: "Supports writes to partitioned tables." But being able to write to partitioned directories is not enough to write to the table: someone needs to write to the Hive metastore. How can I use Structured Streaming and write to a Hive table?

--
Sincerely yours,
Egor Pakhomov