Both work. The asynchronous method with a listener will have less downtime; it's just that the first trigger/batch after the asynchronous unpersist+persist will probably take longer, as it has to reload the data.
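Roughly, the synchronous option could look like the following. This is an untested sketch that combines the restart loop from the earlier mail below with the unpersist/persist step; the paths, the join key "id", the parquet sink, and shouldRefresh() are placeholders rather than code from this thread:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object RestartAndRefresh {

  val spark = SparkSession.builder
    .master("local[*]")
    .appName("RestartAndRefresh")
    .getOrCreate()

  // the large (~100 GB) static dataset that gets cached
  def loadStatic(): DataFrame =
    spark.read.parquet("/data/static").persist()

  var staticDF: DataFrame = loadStatic()

  def startQuery(): StreamingQuery = {
    val streamDF = spark.readStream
      .schema(staticDF.schema)                         // placeholder schema
      .csv("/data/incoming")
    streamDF.join(staticDF, Seq("id"))                 // enrich the stream with the cached data
      .writeStream
      .outputMode("append")
      .option("checkpointLocation", "/tmp/checkpoint") // same checkpoint dir across restarts
      .format("parquet")
      .option("path", "/data/out")
      .start()
  }

  def shouldRefresh(): Boolean = false                 // your refresh condition goes here

  def main(args: Array[String]): Unit = {
    var activeQuery = startQuery()
    while (true) {
      if (shouldRefresh()) {
        activeQuery.stop()                             // stop the query before swapping the cache
        staticDF.unpersist()
        staticDF = loadStatic()                        // re-read and re-cache the updated data
        activeQuery = startQuery()                     // restart; recovers from the checkpoint
      }
      activeQuery.awaitTermination(100)                // poll every 100 ms; rethrows query failures
    }
  }
}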
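And a correspondingly rough sketch of the asynchronous option: refresh the cache from a StreamingQueryListener's onQueryProgress() so the query itself keeps running. Again untested; the interval-based condition and the staticDF reference are placeholders, not from this thread:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Refreshes the cached DataFrame roughly every `intervalMs` milliseconds,
// checked on each micro-batch's progress event, without stopping the query.
class CacheRefreshListener(staticDF: DataFrame, intervalMs: Long) extends StreamingQueryListener {

  @volatile private var lastRefresh = System.currentTimeMillis()

  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    if (System.currentTimeMillis() - lastRefresh > intervalMs) {
      staticDF.unpersist()   // drop the stale cached copy
      staticDF.persist()     // mark it for re-caching; the next trigger reloads the data,
                             // which is why that first batch after the refresh is slower
      lastRefresh = System.currentTimeMillis()
    }
  }
}

// Register the listener before starting the query, e.g.:
//   spark.streams.addListener(new CacheRefreshListener(staticDF, 60 * 60 * 1000L))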
On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com> wrote:

> Thanks Tathagata Das. Actually I'm planning to do something like this:
>
> activeQuery.stop()
>
> // unpersist and persist cached data frame
> df.unpersist()
>
> // read the updated data; data size of df is around 100 GB
> df.persist()
>
> activeQuery = startQuery()
>
> The cached data frame is around 100 GB, so the question is: is this the right
> place to refresh this huge cached data frame?
>
> I'm also trying to refresh the cached data frame in the onQueryProgress() method
> of a class that extends StreamingQueryListener.
>
> Would like to know which is the best place to refresh the cached data frame,
> and why.
>
> Thanks again for the below response.
>
> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> You can do something like this.
>>
>> def startQuery(): StreamingQuery = {
>>   // create your streaming dataframes
>>   // start the query with the same checkpoint directory
>> }
>>
>> // handle to the active query
>> var activeQuery: StreamingQuery = null
>>
>> while (!stopped) {
>>
>>   if (activeQuery == null) { // if query not active, start query
>>     activeQuery = startQuery()
>>
>>   } else if (shouldRestartQuery()) { // check your condition and restart query
>>     activeQuery.stop()
>>     activeQuery = startQuery()
>>   }
>>
>>   activeQuery.awaitTermination(100) // wait for 100 ms
>>   // if there is any error it will throw an exception and quit the loop
>>   // otherwise it will keep checking the condition every 100 ms
>> }
>>
>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com> wrote:
>>
>>> Thanks Michael
>>>
>>> I guess my question is a little confusing, so let me try again.
>>>
>>> I would like to restart a streaming query programmatically, based on a condition,
>>> while my streaming application is running. Why I want to do this:
>>>
>>> I want to refresh a cached data frame based on a condition, and the best
>>> way to do this is to restart the streaming query, as suggested by TDas below for a
>>> similar problem:
>>>
>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>
>>> I do understand that checkpointing helps with recovery and failures, but I
>>> would like to know how to restart a streaming query programmatically without
>>> stopping my streaming application.
>>>
>>> In place of query.awaitTermination(), should I have logic to restart the
>>> query? Please suggest.
>>>
>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com> wrote:
>>>
>>>> See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>>
>>>> Though I think that this currently doesn't work with the console sink.
>>>>
>>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to restart a streaming query to refresh a cached data frame.
>>>>>
>>>>> Where and how should I restart the streaming query?
>>>>>
>>>>> val sparkSes = SparkSession
>>>>>   .builder
>>>>>   .config("spark.master", "local")
>>>>>   .appName("StreamingCahcePoc")
>>>>>   .getOrCreate()
>>>>>
>>>>> import sparkSes.implicits._
>>>>>
>>>>> val dataDF = sparkSes.readStream
>>>>>   .schema(streamSchema)
>>>>>   .csv("testData")
>>>>>
>>>>> val query = counts.writeStream
>>>>>   .outputMode("complete")
>>>>>   .format("console")
>>>>>   .start()
>>>>>
>>>>> query.awaitTermination()