Also, is query.stop() a graceful stop operation? What happens to data that has already been received: will it be processed?
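For reference, the listener-based alternative weighed in this thread could look roughly like the sketch below. This is a hypothetical illustration, not code from the thread: the class name `CacheRefreshListener`, the `markStale()` trigger, and the `reload` callback are invented for the example; only `StreamingQueryListener`, its three callbacks, `unpersist(blocking)`, and `persist()` are real Spark APIs.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical listener that refreshes a cached DataFrame from inside
// onQueryProgress. Note: progress events are delivered asynchronously on a
// listener-bus thread, not the query's execution thread, so the refresh can
// race with a running batch.
class CacheRefreshListener(cachedDf: DataFrame, reload: () => DataFrame)
    extends StreamingQueryListener {

  @volatile private var needsRefresh = false

  // Called by application code when the underlying data is known to be stale.
  def markStale(): Unit = { needsRefresh = true }

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    if (needsRefresh) {
      cachedDf.unpersist(blocking = false) // async unpersist to avoid stalling this thread
      reload().persist()                   // re-read and re-cache the updated data
      needsRefresh = false
    }
  }
}

// Registration (spark is an active SparkSession):
//   spark.streams.addListener(new CacheRefreshListener(df, () => reloadDf()))
```

As the thread notes, this keeps the query running, but the first batch after the unpersist+persist will be slower while the data reloads.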
On Tue, Aug 15, 2017 at 7:21 PM purna pradeep <purna2prad...@gmail.com> wrote:

> Ok, thanks.
>
> A few more questions:
>
> 1. The documentation says onQueryProgress is not thread-safe, so is that
> method the right place to refresh the cache? And is there no need to
> restart the query if I choose the listener approach?
>
> "The methods are not thread-safe as they may be called from different
> threads."
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
>
> 2. If I use StreamingQueryListener.onQueryProgress, my understanding is
> that the method will be executed only while the query is in progress, so
> if I refresh the data frame there without restarting the query, will it
> impact the application?
>
> 3. Should I use the blocking unpersist(Boolean) method or the async
> unpersist(), given that the data size is big?
>
> I feel your solution is better, as it stops the query --> refreshes the
> cache --> starts the query, even if I compromise on a little downtime and
> the cached dataframe is huge. I'm not sure how the listener behaves since
> it's asynchronous; correct me if I'm wrong.
>
> On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> Both work. The asynchronous method with the listener will have less
>> downtime, just that the first trigger/batch after the asynchronous
>> unpersist+persist will probably take longer, as it has to reload the
>> data.
>>
>> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Thanks Tathagata Das. Actually, I'm planning to do something like this:
>>>
>>> activeQuery.stop()
>>>
>>> // unpersist and persist the cached data frame
>>> df.unpersist()
>>>
>>> // read the updated data; the data size of df is around 100 GB
>>> df.persist()
>>>
>>> activeQuery = startQuery()
>>>
>>> The cached data frame is around 100 GB, so the question is: is this the
>>> right place to refresh this huge cached data frame?
>>>
>>> I'm also trying to refresh the cached data frame in the
>>> onQueryProgress() method of a class that extends StreamingQueryListener.
>>>
>>> I would like to know which is the best place to refresh the cached data
>>> frame, and why.
>>>
>>> Thanks again for the response below.
>>>
>>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> You can do something like this.
>>>>
>>>> def startQuery(): StreamingQuery = {
>>>>   // create your streaming dataframes
>>>>   // start the query with the same checkpoint directory
>>>> }
>>>>
>>>> // handle to the active query
>>>> var activeQuery: StreamingQuery = null
>>>>
>>>> while (!stopped) {
>>>>   if (activeQuery == null) {          // if query not active, start query
>>>>     activeQuery = startQuery()
>>>>   } else if (shouldRestartQuery()) {  // check your condition and restart query
>>>>     activeQuery.stop()
>>>>     activeQuery = startQuery()
>>>>   }
>>>>   activeQuery.awaitTermination(100)   // wait for 100 ms
>>>>   // if there is any error it will throw an exception and quit the loop;
>>>>   // otherwise it will keep checking the condition every 100 ms
>>>> }
>>>>
>>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Michael
>>>>>
>>>>> I guess my question was a little confusing; let me try again.
>>>>>
>>>>> I would like to restart a streaming query programmatically, based on
>>>>> a condition, while my streaming application is running. The reason I
>>>>> want to do this: I want to refresh a cached data frame based on a
>>>>> condition, and the best way to do this is to restart the streaming
>>>>> query, as suggested by TDas below for a similar problem.
>>>>>
>>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>>>
>>>>> I do understand that checkpointing helps with recovery and failures,
>>>>> but I would like to know how to restart a streaming query
>>>>> programmatically without stopping my streaming application.
>>>>>
>>>>> In place of query.awaitTermination, should I have logic to restart
>>>>> the query? Please suggest.
>>>>>
>>>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> See
>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>>>>
>>>>>> Though I think that this currently doesn't work with the console
>>>>>> sink.
>>>>>>
>>>>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <
>>>>>> purna2prad...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm trying to restart a streaming query to refresh a cached data
>>>>>>> frame. Where and how should I restart the streaming query?
>>>>>>>
>>>>>>> val sparkSes = SparkSession
>>>>>>>   .builder
>>>>>>>   .config("spark.master", "local")
>>>>>>>   .appName("StreamingCahcePoc")
>>>>>>>   .getOrCreate()
>>>>>>>
>>>>>>> import sparkSes.implicits._
>>>>>>>
>>>>>>> val dataDF = sparkSes.readStream
>>>>>>>   .schema(streamSchema)
>>>>>>>   .csv("testData")
>>>>>>>
>>>>>>> val query = counts.writeStream
>>>>>>>   .outputMode("complete")
>>>>>>>   .format("console")
>>>>>>>   .start()
>>>>>>>
>>>>>>> query.awaitTermination()
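The restart loop suggested by TDas in this thread can be exercised without a Spark cluster by stubbing the query handle. The sketch below only demonstrates the control flow (start once, then stop, refresh the cache, and restart when the condition fires); `Query`, `refreshCache`, and the bounded polling count are stand-ins invented for the sketch, not Spark APIs.

```scala
object RestartLoopSketch {

  // Minimal stand-in for StreamingQuery: just the two methods the loop uses.
  trait Query {
    def stop(): Unit
    def awaitTermination(ms: Long): Boolean
  }

  // Runs the restart loop for `checks` polling rounds and returns how many
  // times the query was (re)started. shouldRestartQuery fires exactly once,
  // on the second round, to simulate a refresh condition becoming true.
  def run(checks: Int): Int = {
    var started = 0

    def startQuery(): Query = {
      started += 1
      new Query {
        def stop(): Unit = ()                            // a real query.stop() halts processing
        def awaitTermination(ms: Long): Boolean = false  // false = query still running after timeout
      }
    }

    // In the real job this would be: df.unpersist(); /* re-read */ df.persist()
    def refreshCache(): Unit = ()

    var activeQuery: Query = null
    var remaining = checks
    def shouldRestartQuery(): Boolean = remaining == checks - 1

    while (remaining > 0) {
      if (activeQuery == null) {          // query not active yet: start it
        activeQuery = startQuery()
      } else if (shouldRestartQuery()) {  // refresh condition met: stop, refresh, restart
        activeQuery.stop()
        refreshCache()
        activeQuery = startQuery()
      }
      activeQuery.awaitTermination(100)   // the real loop polls every 100 ms
      remaining -= 1
    }
    started
  }

  def main(args: Array[String]): Unit =
    println(run(3)) // started once initially, once after the refresh: prints 2
}
```

The key property the stub makes visible is that a refresh costs exactly one extra query start, which is the downtime TDas refers to.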