Re: Restart streaming query spark 2.1 structured streaming
And also is query.stop() is graceful stop operation?what happens to already received data will it be processed ? On Tue, Aug 15, 2017 at 7:21 PM purna pradeep wrote: > Ok thanks > > Few more > > 1.when I looked into the documentation it says onQueryprogress is not > threadsafe ,So Is this method would be the right place to refresh cache?and > no need to restart query if I choose listener ? > > The methods are not thread-safe as they may be called from different > threads. > > > > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala > > > > 2.if I use streamingquerylistner onqueryprogress my understanding is > method will be executed only when the query is in progress so if I refresh > data frame here without restarting query will it impact application ? > > 3.should I use unpersist (Boolean) blocking method or async method > unpersist() as the data size is big. > > I feel your solution is better as it stops query --> refresh cache --> > starts query if I compromise on little downtime even cached dataframe is > huge .I'm not sure how listener behaves as it's asynchronous, correct me if > I'm wrong. > > On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das > wrote: > >> Both works. The asynchronous method with listener will have less of down >> time, just that the first trigger/batch after the asynchronous >> unpersist+persist will probably take longer as it has to reload the data. >> >> >> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep >> wrote: >> >>> Thanks tathagata das actually I'm planning to something like this >>> >>> activeQuery.stop() >>> >>> //unpersist and persist cached data frame >>> >>> df.unpersist() >>> >>> //read the updated data //data size of df is around 100gb >>> >>> df.persist() >>> >>> activeQuery = startQuery() >>> >>> >>> the cached data frame size around 100gb ,so the question is this the >>> right place to refresh this huge cached data frame ? >>> >>> I'm also trying to refresh cached data frame in onqueryprogress() method >>> in a class which extends StreamingQuerylistner >>> >>> Would like to know which is the best place to refresh cached data frame >>> and why >>> >>> Thanks again for the below response >>> >>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das < >>> tathagata.das1...@gmail.com> wrote: >>> You can do something like this. def startQuery(): StreamingQuery = { // create your streaming dataframes // start the query with the same checkpoint directory} // handle to the active queryvar activeQuery: StreamingQuery = null while(!stopped) { if (activeQuery = null) { // if query not active, start query activeQuery = startQuery() } else if (shouldRestartQuery()) { // check your condition and restart query activeQuery.stop() activeQuery = startQuery() } activeQuery.awaitTermination(100) // wait for 100 ms. // if there is any error it will throw exception and quit the loop // otherwise it will keep checking the condition every 100ms} On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep >>> > wrote: > Thanks Michael > > I guess my question is little confusing ..let me try again > > > I would like to restart streaming query programmatically while my > streaming application is running based on a condition and why I want to do > this > > I want to refresh a cached data frame based on a condition and the > best way to do this restart streaming query suggested by Tdas below for > similar problem > > > http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e > > I do understand that checkpoint if helps in recovery and failures but > I would like to know "how to restart streaming query programmatically > without stopping my streaming application" > > In place of query.awaittermination should I need to have an logic to > restart query? Please suggest > > > On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust < > mich...@databricks.com> wrote: > >> See >> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing >> >> Though I think that this currently doesn't work with the console sink. >> >> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep < >> purna2prad...@gmail.com> wrote: >> >>> Hi, >>> I'm trying to restart a streaming query to refresh cached data frame Where and how should I restart streaming query >>> >>> >>> val sparkSes = SparkSession >>> >>> .builder >>> >>> .config("spark.master", "local") >>> >>> .appName("StreamingCahcePoc") >
Re: Restart streaming query spark 2.1 structured streaming
Ok thanks Few more 1.when I looked into the documentation it says onQueryprogress is not threadsafe ,So Is this method would be the right place to refresh cache?and no need to restart query if I choose listener ? The methods are not thread-safe as they may be called from different threads. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala 2.if I use streamingquerylistner onqueryprogress my understanding is method will be executed only when the query is in progress so if I refresh data frame here without restarting query will it impact application ? 3.should I use unpersist (Boolean) blocking method or async method unpersist() as the data size is big. I feel your solution is better as it stops query --> refresh cache --> starts query if I compromise on little downtime even cached dataframe is huge .I'm not sure how listener behaves as it's asynchronous, correct me if I'm wrong. On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das wrote: > Both works. The asynchronous method with listener will have less of down > time, just that the first trigger/batch after the asynchronous > unpersist+persist will probably take longer as it has to reload the data. > > > On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep > wrote: > >> Thanks tathagata das actually I'm planning to something like this >> >> activeQuery.stop() >> >> //unpersist and persist cached data frame >> >> df.unpersist() >> >> //read the updated data //data size of df is around 100gb >> >> df.persist() >> >> activeQuery = startQuery() >> >> >> the cached data frame size around 100gb ,so the question is this the >> right place to refresh this huge cached data frame ? >> >> I'm also trying to refresh cached data frame in onqueryprogress() method >> in a class which extends StreamingQuerylistner >> >> Would like to know which is the best place to refresh cached data frame >> and why >> >> Thanks again for the below response >> >> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das < >> tathagata.das1...@gmail.com> wrote: >> >>> You can do something like this. >>> >>> >>> def startQuery(): StreamingQuery = { >>>// create your streaming dataframes >>>// start the query with the same checkpoint directory} >>> >>> // handle to the active queryvar activeQuery: StreamingQuery = null >>> while(!stopped) { >>> >>>if (activeQuery = null) { // if query not active, start query >>> activeQuery = startQuery() >>> >>>} else if (shouldRestartQuery()) { // check your condition and >>> restart query >>> activeQuery.stop() >>> activeQuery = startQuery() >>>} >>> >>>activeQuery.awaitTermination(100) // wait for 100 ms. >>>// if there is any error it will throw exception and quit the loop >>>// otherwise it will keep checking the condition every 100ms} >>> >>> >>> >>> >>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep >>> wrote: >>> Thanks Michael I guess my question is little confusing ..let me try again I would like to restart streaming query programmatically while my streaming application is running based on a condition and why I want to do this I want to refresh a cached data frame based on a condition and the best way to do this restart streaming query suggested by Tdas below for similar problem http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e I do understand that checkpoint if helps in recovery and failures but I would like to know "how to restart streaming query programmatically without stopping my streaming application" In place of query.awaittermination should I need to have an logic to restart query? Please suggest On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust < mich...@databricks.com> wrote: > See > https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing > > Though I think that this currently doesn't work with the console sink. > > On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep < > purna2prad...@gmail.com> wrote: > >> Hi, >> >>> >>> I'm trying to restart a streaming query to refresh cached data frame >>> >>> Where and how should I restart streaming query >>> >> >> >> val sparkSes = SparkSession >> >> .builder >> >> .config("spark.master", "local") >> >> .appName("StreamingCahcePoc") >> >> .getOrCreate() >> >> >> >> import sparkSes.implicits._ >> >> >> >> val dataDF = sparkSes.readStream >> >> .schema(streamSchema) >> >> .csv("testData") >> >> >> >> >> >>val query = counts.writeStream >> >> .outputMo
Re: Restart streaming query spark 2.1 structured streaming
Both works. The asynchronous method with listener will have less of down time, just that the first trigger/batch after the asynchronous unpersist+persist will probably take longer as it has to reload the data. On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep wrote: > Thanks tathagata das actually I'm planning to something like this > > activeQuery.stop() > > //unpersist and persist cached data frame > > df.unpersist() > > //read the updated data //data size of df is around 100gb > > df.persist() > > activeQuery = startQuery() > > > the cached data frame size around 100gb ,so the question is this the right > place to refresh this huge cached data frame ? > > I'm also trying to refresh cached data frame in onqueryprogress() method > in a class which extends StreamingQuerylistner > > Would like to know which is the best place to refresh cached data frame > and why > > Thanks again for the below response > > On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das > wrote: > >> You can do something like this. >> >> >> def startQuery(): StreamingQuery = { >>// create your streaming dataframes >>// start the query with the same checkpoint directory} >> >> // handle to the active queryvar activeQuery: StreamingQuery = null >> while(!stopped) { >> >>if (activeQuery = null) { // if query not active, start query >> activeQuery = startQuery() >> >>} else if (shouldRestartQuery()) { // check your condition and >> restart query >> activeQuery.stop() >> activeQuery = startQuery() >>} >> >>activeQuery.awaitTermination(100) // wait for 100 ms. >>// if there is any error it will throw exception and quit the loop >>// otherwise it will keep checking the condition every 100ms} >> >> >> >> >> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep >> wrote: >> >>> Thanks Michael >>> >>> I guess my question is little confusing ..let me try again >>> >>> >>> I would like to restart streaming query programmatically while my >>> streaming application is running based on a condition and why I want to do >>> this >>> >>> I want to refresh a cached data frame based on a condition and the best >>> way to do this restart streaming query suggested by Tdas below for similar >>> problem >>> >>> http://mail-archives.apache.org/mod_mbox/spark-user/ >>> 201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+ >>> fwmn4jtm1x1nd...@mail.gmail.com%3e >>> >>> I do understand that checkpoint if helps in recovery and failures but I >>> would like to know "how to restart streaming query programmatically without >>> stopping my streaming application" >>> >>> In place of query.awaittermination should I need to have an logic to >>> restart query? Please suggest >>> >>> >>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust >>> wrote: >>> See https://spark.apache.org/docs/latest/structured- streaming-programming-guide.html#recovering-from-failures- with-checkpointing Though I think that this currently doesn't work with the console sink. On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep >>> > wrote: > Hi, > >> >> I'm trying to restart a streaming query to refresh cached data frame >> >> Where and how should I restart streaming query >> > > > val sparkSes = SparkSession > > .builder > > .config("spark.master", "local") > > .appName("StreamingCahcePoc") > > .getOrCreate() > > > > import sparkSes.implicits._ > > > > val dataDF = sparkSes.readStream > > .schema(streamSchema) > > .csv("testData") > > > > > >val query = counts.writeStream > > .outputMode("complete") > > .format("console") > > .start() > > > query.awaittermination() > > > >> >> >> >>
Re: Restart streaming query spark 2.1 structured streaming
Thanks tathagata das actually I'm planning to something like this activeQuery.stop() //unpersist and persist cached data frame df.unpersist() //read the updated data //data size of df is around 100gb df.persist() activeQuery = startQuery() the cached data frame size around 100gb ,so the question is this the right place to refresh this huge cached data frame ? I'm also trying to refresh cached data frame in onqueryprogress() method in a class which extends StreamingQuerylistner Would like to know which is the best place to refresh cached data frame and why Thanks again for the below response On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das wrote: > You can do something like this. > > > def startQuery(): StreamingQuery = { >// create your streaming dataframes >// start the query with the same checkpoint directory} > > // handle to the active queryvar activeQuery: StreamingQuery = null > while(!stopped) { > >if (activeQuery = null) { // if query not active, start query > activeQuery = startQuery() > >} else if (shouldRestartQuery()) { // check your condition and > restart query > activeQuery.stop() > activeQuery = startQuery() >} > >activeQuery.awaitTermination(100) // wait for 100 ms. >// if there is any error it will throw exception and quit the loop >// otherwise it will keep checking the condition every 100ms} > > > > > On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep > wrote: > >> Thanks Michael >> >> I guess my question is little confusing ..let me try again >> >> >> I would like to restart streaming query programmatically while my >> streaming application is running based on a condition and why I want to do >> this >> >> I want to refresh a cached data frame based on a condition and the best >> way to do this restart streaming query suggested by Tdas below for similar >> problem >> >> >> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e >> >> I do understand that checkpoint if helps in recovery and failures but I >> would like to know "how to restart streaming query programmatically without >> stopping my streaming application" >> >> In place of query.awaittermination should I need to have an logic to >> restart query? Please suggest >> >> >> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust >> wrote: >> >>> See >>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing >>> >>> Though I think that this currently doesn't work with the console sink. >>> >>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep >>> wrote: >>> Hi, > > I'm trying to restart a streaming query to refresh cached data frame > > Where and how should I restart streaming query > val sparkSes = SparkSession .builder .config("spark.master", "local") .appName("StreamingCahcePoc") .getOrCreate() import sparkSes.implicits._ val dataDF = sparkSes.readStream .schema(streamSchema) .csv("testData") val query = counts.writeStream .outputMode("complete") .format("console") .start() query.awaittermination() > > > >>> >
Re: Restart streaming query spark 2.1 structured streaming
You can do something like this. def startQuery(): StreamingQuery = { // create your streaming dataframes // start the query with the same checkpoint directory} // handle to the active queryvar activeQuery: StreamingQuery = null while(!stopped) { if (activeQuery = null) { // if query not active, start query activeQuery = startQuery() } else if (shouldRestartQuery()) { // check your condition and restart query activeQuery.stop() activeQuery = startQuery() } activeQuery.awaitTermination(100) // wait for 100 ms. // if there is any error it will throw exception and quit the loop // otherwise it will keep checking the condition every 100ms} On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep wrote: > Thanks Michael > > I guess my question is little confusing ..let me try again > > > I would like to restart streaming query programmatically while my > streaming application is running based on a condition and why I want to do > this > > I want to refresh a cached data frame based on a condition and the best > way to do this restart streaming query suggested by Tdas below for similar > problem > > http://mail-archives.apache.org/mod_mbox/spark-user/ > 201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+ > fwmn4jtm1x1nd...@mail.gmail.com%3e > > I do understand that checkpoint if helps in recovery and failures but I > would like to know "how to restart streaming query programmatically without > stopping my streaming application" > > In place of query.awaittermination should I need to have an logic to > restart query? Please suggest > > > On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust > wrote: > >> See https://spark.apache.org/docs/latest/structured- >> streaming-programming-guide.html#recovering-from-failures- >> with-checkpointing >> >> Though I think that this currently doesn't work with the console sink. >> >> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep >> wrote: >> >>> Hi, >>> I'm trying to restart a streaming query to refresh cached data frame Where and how should I restart streaming query >>> >>> >>> val sparkSes = SparkSession >>> >>> .builder >>> >>> .config("spark.master", "local") >>> >>> .appName("StreamingCahcePoc") >>> >>> .getOrCreate() >>> >>> >>> >>> import sparkSes.implicits._ >>> >>> >>> >>> val dataDF = sparkSes.readStream >>> >>> .schema(streamSchema) >>> >>> .csv("testData") >>> >>> >>> >>> >>> >>>val query = counts.writeStream >>> >>> .outputMode("complete") >>> >>> .format("console") >>> >>> .start() >>> >>> >>> query.awaittermination() >>> >>> >>> >>
Re: Restart streaming query spark 2.1 structured streaming
Thanks Michael I guess my question is little confusing ..let me try again I would like to restart streaming query programmatically while my streaming application is running based on a condition and why I want to do this I want to refresh a cached data frame based on a condition and the best way to do this restart streaming query suggested by Tdas below for similar problem http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e I do understand that checkpoint if helps in recovery and failures but I would like to know "how to restart streaming query programmatically without stopping my streaming application" In place of query.awaittermination should I need to have an logic to restart query? Please suggest On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust wrote: > See > https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing > > Though I think that this currently doesn't work with the console sink. > > On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep > wrote: > >> Hi, >> >>> >>> I'm trying to restart a streaming query to refresh cached data frame >>> >>> Where and how should I restart streaming query >>> >> >> >> val sparkSes = SparkSession >> >> .builder >> >> .config("spark.master", "local") >> >> .appName("StreamingCahcePoc") >> >> .getOrCreate() >> >> >> >> import sparkSes.implicits._ >> >> >> >> val dataDF = sparkSes.readStream >> >> .schema(streamSchema) >> >> .csv("testData") >> >> >> >> >> >>val query = counts.writeStream >> >> .outputMode("complete") >> >> .format("console") >> >> .start() >> >> >> query.awaittermination() >> >> >> >>> >>> >>> >
Re: Restart streaming query spark 2.1 structured streaming
See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing Though I think that this currently doesn't work with the console sink. On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep wrote: > Hi, > >> >> I'm trying to restart a streaming query to refresh cached data frame >> >> Where and how should I restart streaming query >> > > > val sparkSes = SparkSession > > .builder > > .config("spark.master", "local") > > .appName("StreamingCahcePoc") > > .getOrCreate() > > > > import sparkSes.implicits._ > > > > val dataDF = sparkSes.readStream > > .schema(streamSchema) > > .csv("testData") > > > > > >val query = counts.writeStream > > .outputMode("complete") > > .format("console") > > .start() > > > query.awaittermination() > > > >> >> >>
Restart streaming query spark 2.1 structured streaming
Hi, > > I'm trying to restart a streaming query to refresh cached data frame > > Where and how should I restart streaming query > val sparkSes = SparkSession .builder .config("spark.master", "local") .appName("StreamingCahcePoc") .getOrCreate() import sparkSes.implicits._ val dataDF = sparkSes.readStream .schema(streamSchema) .csv("testData") val query = counts.writeStream .outputMode("complete") .format("console") .start() query.awaittermination() > > >