Ok thanks

Few more

1.when I looked into the documentation it says onQueryprogress is not
threadsafe ,So Is this method would be the right place to refresh cache?and
no need to restart query if I choose listener ?

The methods are not thread-safe as they may be called from different
threads.


https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala



2.if I use streamingquerylistner onqueryprogress my understanding is method
will be executed only when the query is in progress so if I refresh data
frame here without restarting  query will it impact application ?

3.should I use unpersist (Boolean) blocking method or async method
unpersist() as the data size is big.

I feel your solution is better as it stops query --> refresh cache -->
starts query if I compromise on little downtime even cached dataframe is
huge .I'm not sure how listener behaves as it's asynchronous, correct me if
I'm wrong.

On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Both works. The asynchronous method with listener will have less of down
> time, just that the first trigger/batch after the asynchronous
> unpersist+persist will probably take longer as it has to reload the data.
>
>
> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Thanks tathagata das actually I'm planning to something like this
>>
>> activeQuery.stop()
>>
>> //unpersist and persist cached data frame
>>
>> df.unpersist()
>>
>> //read the updated data //data size of df is around 100gb
>>
>> df.persist()
>>
>>      activeQuery = startQuery()
>>
>>
>> the cached data frame size around 100gb ,so the question is this the
>> right place to refresh this huge cached data frame ?
>>
>> I'm also trying to refresh cached data frame in onqueryprogress() method
>> in a class which extends StreamingQuerylistner
>>
>> Would like to know which is the best place to refresh cached data frame
>> and why
>>
>> Thanks again for the below response
>>
>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> You can do something like this.
>>>
>>>
>>> def startQuery(): StreamingQuery = {
>>>    // create your streaming dataframes
>>>    // start the query with the same checkpoint directory}
>>>
>>> // handle to the active queryvar activeQuery: StreamingQuery = null
>>> while(!stopped) {
>>>
>>>    if (activeQuery = null) {     // if query not active, start query
>>>      activeQuery = startQuery()
>>>
>>>    } else if (shouldRestartQuery())  {      // check your condition and 
>>> restart query
>>>      activeQuery.stop()
>>>      activeQuery = startQuery()
>>>    }
>>>
>>>    activeQuery.awaitTermination(100)   // wait for 100 ms.
>>>    // if there is any error it will throw exception and quit the loop
>>>    // otherwise it will keep checking the condition every 100ms}
>>>
>>>
>>>
>>>
>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Michael
>>>>
>>>> I guess my question is little confusing ..let me try again
>>>>
>>>>
>>>> I would like to restart streaming query programmatically while my
>>>> streaming application is running based on a condition and why I want to do
>>>> this
>>>>
>>>> I want to refresh a cached data frame based on a condition and the best
>>>> way to do this restart streaming query suggested by Tdas below for similar
>>>> problem
>>>>
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>>
>>>> I do understand that checkpoint if helps in recovery and failures but I
>>>> would like to know "how to restart streaming query programmatically without
>>>> stopping my streaming application"
>>>>
>>>> In place of query.awaittermination should I need to have an logic to
>>>> restart query? Please suggest
>>>>
>>>>
>>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> See
>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>>>
>>>>> Though I think that this currently doesn't work with the console sink.
>>>>>
>>>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <
>>>>> purna2prad...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>>
>>>>>>> I'm trying to restart a streaming query to refresh cached data frame
>>>>>>>
>>>>>>> Where and how should I restart streaming query
>>>>>>>
>>>>>>
>>>>>>
>>>>>> val sparkSes = SparkSession
>>>>>>
>>>>>>       .builder
>>>>>>
>>>>>>       .config("spark.master", "local")
>>>>>>
>>>>>>       .appName("StreamingCahcePoc")
>>>>>>
>>>>>>       .getOrCreate()
>>>>>>
>>>>>>
>>>>>>
>>>>>>     import sparkSes.implicits._
>>>>>>
>>>>>>
>>>>>>
>>>>>>     val dataDF = sparkSes.readStream
>>>>>>
>>>>>>       .schema(streamSchema)
>>>>>>
>>>>>>       .csv("testData")
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>        val query = counts.writeStream
>>>>>>
>>>>>>       .outputMode("complete")
>>>>>>
>>>>>>       .format("console")
>>>>>>
>>>>>>       .start()
>>>>>>
>>>>>>
>>>>>> query.awaittermination()
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>
>

Reply via email to