Ok, thanks. A few more questions:
1. When I looked into the documentation, it says onQueryProgress is not thread-safe ("The methods are not thread-safe as they may be called from different threads."). So is this method the right place to refresh the cache? And is there no need to restart the query if I choose the listener?
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala

2. If I use StreamingQueryListener's onQueryProgress, my understanding is that the method is executed only while the query is in progress. So if I refresh the DataFrame there without restarting the query, will it impact the application?

3. Should I use the blocking unpersist(blocking = true) or the async unpersist(), given that the data size is big?

I feel your solution is better, as it stops the query --> refreshes the cache --> starts the query, if I can tolerate a little downtime even though the cached DataFrame is huge. I'm not sure how the listener behaves since it's asynchronous; correct me if I'm wrong.

On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Both work. The asynchronous method with the listener will have less down
> time, just that the first trigger/batch after the asynchronous
> unpersist+persist will probably take longer, as it has to reload the data.
>
> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Thanks Tathagata Das. Actually I'm planning to do something like this:
>>
>> activeQuery.stop()
>>
>> // unpersist and persist the cached DataFrame
>> df.unpersist()
>>
>> // read the updated data; the size of df is around 100 GB
>> df.persist()
>>
>> activeQuery = startQuery()
>>
>> The cached DataFrame is around 100 GB, so the question is: is this the
>> right place to refresh this huge cached DataFrame?
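[Editor's note: the synchronous stop --> refresh --> restart plan quoted above can be sketched as below. This is a minimal sketch, not code from the thread: `startQuery` and `loadLookupData` are hypothetical helpers standing in for however the query is created and the 100 GB DataFrame is loaded.]

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helpers, for illustration only: how the query is started
// and how the cached data is (re)loaded are assumptions, not from the thread.
def startQuery(spark: SparkSession, lookup: DataFrame): StreamingQuery = ???
def loadLookupData(spark: SparkSession): DataFrame = ???

def refreshAndRestart(spark: SparkSession,
                      activeQuery: StreamingQuery,
                      cached: DataFrame): (StreamingQuery, DataFrame) = {
  activeQuery.stop()                 // stop the streaming query first
  cached.unpersist(blocking = true)  // blocking, so memory is freed before reloading
  val refreshed = loadLookupData(spark)
  refreshed.persist()                // re-cache the refreshed data
  // restart with the same checkpoint directory so processing resumes
  (startQuery(spark, refreshed), refreshed)
}
```

The blocking unpersist trades a slightly longer pause for the certainty that the old blocks are evicted before the ~100 GB reload begins; the async variant returns immediately but the two copies can briefly coexist.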
>> I'm also trying to refresh the cached DataFrame in the onQueryProgress()
>> method of a class that extends StreamingQueryListener.
>>
>> I would like to know which is the best place to refresh the cached
>> DataFrame, and why.
>>
>> Thanks again for the response below.
>>
>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> You can do something like this.
>>>
>>> def startQuery(): StreamingQuery = {
>>>   // create your streaming DataFrames
>>>   // start the query with the same checkpoint directory
>>> }
>>>
>>> // handle to the active query
>>> var activeQuery: StreamingQuery = null
>>>
>>> while (!stopped) {
>>>
>>>   if (activeQuery == null) { // if the query is not active, start it
>>>     activeQuery = startQuery()
>>>
>>>   } else if (shouldRestartQuery()) { // check your condition and
>>>     // restart the query
>>>     activeQuery.stop()
>>>     activeQuery = startQuery()
>>>   }
>>>
>>>   activeQuery.awaitTermination(100) // wait for 100 ms
>>>   // if there is any error it will throw an exception and quit the loop;
>>>   // otherwise it will keep checking the condition every 100 ms
>>> }
>>>
>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Michael.
>>>>
>>>> I guess my question was a little confusing; let me try again.
>>>>
>>>> I would like to restart the streaming query programmatically, based on
>>>> a condition, while my streaming application is running. The reason I
>>>> want to do this: I want to refresh a cached DataFrame based on a
>>>> condition, and the best way to do that is to restart the streaming
>>>> query, as suggested by TDas below for a similar problem:
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>>
>>>> I do understand that checkpointing helps with recovery and failures,
>>>> but I would like to know how to restart the streaming query
>>>> programmatically without stopping my streaming application.
>>>>
>>>> In
place of query.awaitTermination(), should I have logic to restart
>>>> the query? Please suggest.
>>>>
>>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> See
>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>>>
>>>>> Though I think that this currently doesn't work with the console sink.
>>>>>
>>>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <
>>>>> purna2prad...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm trying to restart a streaming query to refresh a cached DataFrame.
>>>>>>
>>>>>> Where and how should I restart the streaming query?
>>>>>>
>>>>>> val sparkSes = SparkSession
>>>>>>   .builder
>>>>>>   .config("spark.master", "local")
>>>>>>   .appName("StreamingCahcePoc")
>>>>>>   .getOrCreate()
>>>>>>
>>>>>> import sparkSes.implicits._
>>>>>>
>>>>>> val dataDF = sparkSes.readStream
>>>>>>   .schema(streamSchema)
>>>>>>   .csv("testData")
>>>>>>
>>>>>> val query = counts.writeStream
>>>>>>   .outputMode("complete")
>>>>>>   .format("console")
>>>>>>   .start()
>>>>>>
>>>>>> query.awaitTermination()
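[Editor's note: the listener-based alternative discussed in this thread might look like the sketch below. This is hedged, not code from the thread: `refreshCache` is a hypothetical callback, and deciding *when* to refresh is left to the caller. As the Spark docs quoted above warn, these callbacks are not thread-safe and may run on different threads, so any shared state the callback touches must be synchronized.]

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Sketch only: refreshCache is a hypothetical callback that would
// unpersist and re-persist the cached DataFrame. onQueryProgress fires
// asynchronously after each trigger completes, which is why the first
// batch after a refresh may run slower (it reloads the data).
class CacheRefreshListener(refreshCache: () => Unit) extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // check your refresh condition here (e.g. elapsed time, external flag)
    refreshCache()
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Registration, assuming an existing SparkSession named `spark`:
// spark.streams.addListener(new CacheRefreshListener(() => { /* unpersist + persist */ }))
```

With this approach the query keeps running across refreshes (less downtime than stop/restart), at the cost of the thread-safety caveats raised in question 1 above.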