Re: Restart streaming query spark 2.1 structured streaming

2017-08-16 Thread purna pradeep
And also is query.stop() is graceful stop operation?what happens to already
received data will it be processed ?

On Tue, Aug 15, 2017 at 7:21 PM purna pradeep 
wrote:

> Ok thanks
>
> Few more
>
> 1.when I looked into the documentation it says onQueryprogress is not
> threadsafe ,So Is this method would be the right place to refresh cache?and
> no need to restart query if I choose listener ?
>
> The methods are not thread-safe as they may be called from different
> threads.
>
>
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
>
>
>
> 2.if I use streamingquerylistner onqueryprogress my understanding is
> method will be executed only when the query is in progress so if I refresh
> data frame here without restarting  query will it impact application ?
>
> 3.should I use unpersist (Boolean) blocking method or async method
> unpersist() as the data size is big.
>
> I feel your solution is better as it stops query --> refresh cache -->
> starts query if I compromise on little downtime even cached dataframe is
> huge .I'm not sure how listener behaves as it's asynchronous, correct me if
> I'm wrong.
>
> On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das 
> wrote:
>
>> Both works. The asynchronous method with listener will have less of down
>> time, just that the first trigger/batch after the asynchronous
>> unpersist+persist will probably take longer as it has to reload the data.
>>
>>
>> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep 
>> wrote:
>>
>>> Thanks tathagata das actually I'm planning to something like this
>>>
>>> activeQuery.stop()
>>>
>>> //unpersist and persist cached data frame
>>>
>>> df.unpersist()
>>>
>>> //read the updated data //data size of df is around 100gb
>>>
>>> df.persist()
>>>
>>>  activeQuery = startQuery()
>>>
>>>
>>> the cached data frame size around 100gb ,so the question is this the
>>> right place to refresh this huge cached data frame ?
>>>
>>> I'm also trying to refresh cached data frame in onqueryprogress() method
>>> in a class which extends StreamingQuerylistner
>>>
>>> Would like to know which is the best place to refresh cached data frame
>>> and why
>>>
>>> Thanks again for the below response
>>>
>>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 You can do something like this.


 def startQuery(): StreamingQuery = {
// create your streaming dataframes
// start the query with the same checkpoint directory}

 // handle to the active queryvar activeQuery: StreamingQuery = null
 while(!stopped) {

if (activeQuery = null) { // if query not active, start query
  activeQuery = startQuery()

} else if (shouldRestartQuery())  {  // check your condition and 
 restart query
  activeQuery.stop()
  activeQuery = startQuery()
}

activeQuery.awaitTermination(100)   // wait for 100 ms.
// if there is any error it will throw exception and quit the loop
// otherwise it will keep checking the condition every 100ms}




 On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep  wrote:

> Thanks Michael
>
> I guess my question is little confusing ..let me try again
>
>
> I would like to restart streaming query programmatically while my
> streaming application is running based on a condition and why I want to do
> this
>
> I want to refresh a cached data frame based on a condition and the
> best way to do this restart streaming query suggested by Tdas below for
> similar problem
>
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>
> I do understand that checkpoint if helps in recovery and failures but
> I would like to know "how to restart streaming query programmatically
> without stopping my streaming application"
>
> In place of query.awaittermination should I need to have an logic to
> restart query? Please suggest
>
>
> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> See
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>
>> Though I think that this currently doesn't work with the console sink.
>>
>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <
>> purna2prad...@gmail.com> wrote:
>>
>>> Hi,
>>>

 I'm trying to restart a streaming query to refresh cached data
 frame

 Where and how should I restart streaming query

>>>
>>>
>>> val sparkSes = SparkSession
>>>
>>>   .builder

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Ok thanks

Few more

1.when I looked into the documentation it says onQueryprogress is not
threadsafe ,So Is this method would be the right place to refresh cache?and
no need to restart query if I choose listener ?

The methods are not thread-safe as they may be called from different
threads.


https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala



2.if I use streamingquerylistner onqueryprogress my understanding is method
will be executed only when the query is in progress so if I refresh data
frame here without restarting  query will it impact application ?

3.should I use unpersist (Boolean) blocking method or async method
unpersist() as the data size is big.

I feel your solution is better as it stops query --> refresh cache -->
starts query if I compromise on little downtime even cached dataframe is
huge .I'm not sure how listener behaves as it's asynchronous, correct me if
I'm wrong.

On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das 
wrote:

> Both works. The asynchronous method with listener will have less of down
> time, just that the first trigger/batch after the asynchronous
> unpersist+persist will probably take longer as it has to reload the data.
>
>
> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep 
> wrote:
>
>> Thanks tathagata das actually I'm planning to something like this
>>
>> activeQuery.stop()
>>
>> //unpersist and persist cached data frame
>>
>> df.unpersist()
>>
>> //read the updated data //data size of df is around 100gb
>>
>> df.persist()
>>
>>  activeQuery = startQuery()
>>
>>
>> the cached data frame size around 100gb ,so the question is this the
>> right place to refresh this huge cached data frame ?
>>
>> I'm also trying to refresh cached data frame in onqueryprogress() method
>> in a class which extends StreamingQuerylistner
>>
>> Would like to know which is the best place to refresh cached data frame
>> and why
>>
>> Thanks again for the below response
>>
>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> You can do something like this.
>>>
>>>
>>> def startQuery(): StreamingQuery = {
>>>// create your streaming dataframes
>>>// start the query with the same checkpoint directory}
>>>
>>> // handle to the active queryvar activeQuery: StreamingQuery = null
>>> while(!stopped) {
>>>
>>>if (activeQuery = null) { // if query not active, start query
>>>  activeQuery = startQuery()
>>>
>>>} else if (shouldRestartQuery())  {  // check your condition and 
>>> restart query
>>>  activeQuery.stop()
>>>  activeQuery = startQuery()
>>>}
>>>
>>>activeQuery.awaitTermination(100)   // wait for 100 ms.
>>>// if there is any error it will throw exception and quit the loop
>>>// otherwise it will keep checking the condition every 100ms}
>>>
>>>
>>>
>>>
>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep 
>>> wrote:
>>>
 Thanks Michael

 I guess my question is little confusing ..let me try again


 I would like to restart streaming query programmatically while my
 streaming application is running based on a condition and why I want to do
 this

 I want to refresh a cached data frame based on a condition and the best
 way to do this restart streaming query suggested by Tdas below for similar
 problem


 http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e

 I do understand that checkpoint if helps in recovery and failures but I
 would like to know "how to restart streaming query programmatically without
 stopping my streaming application"

 In place of query.awaittermination should I need to have an logic to
 restart query? Please suggest


 On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <
 mich...@databricks.com> wrote:

> See
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> Though I think that this currently doesn't work with the console sink.
>
> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <
> purna2prad...@gmail.com> wrote:
>
>> Hi,
>>
>>>
>>> I'm trying to restart a streaming query to refresh cached data frame
>>>
>>> Where and how should I restart streaming query
>>>
>>
>>
>> val sparkSes = SparkSession
>>
>>   .builder
>>
>>   .config("spark.master", "local")
>>
>>   .appName("StreamingCahcePoc")
>>
>>   .getOrCreate()
>>
>>
>>
>> import sparkSes.implicits._
>>
>>
>>
>> val dataDF = sparkSes.readStream
>>
>>   .schema(streamSchema)
>>
>>   .csv("testData")
>>
>>
>>
>>

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
Both works. The asynchronous method with listener will have less of down
time, just that the first trigger/batch after the asynchronous
unpersist+persist will probably take longer as it has to reload the data.


On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep 
wrote:

> Thanks tathagata das actually I'm planning to something like this
>
> activeQuery.stop()
>
> //unpersist and persist cached data frame
>
> df.unpersist()
>
> //read the updated data //data size of df is around 100gb
>
> df.persist()
>
>  activeQuery = startQuery()
>
>
> the cached data frame size around 100gb ,so the question is this the right
> place to refresh this huge cached data frame ?
>
> I'm also trying to refresh cached data frame in onqueryprogress() method
> in a class which extends StreamingQuerylistner
>
> Would like to know which is the best place to refresh cached data frame
> and why
>
> Thanks again for the below response
>
> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das 
> wrote:
>
>> You can do something like this.
>>
>>
>> def startQuery(): StreamingQuery = {
>>// create your streaming dataframes
>>// start the query with the same checkpoint directory}
>>
>> // handle to the active queryvar activeQuery: StreamingQuery = null
>> while(!stopped) {
>>
>>if (activeQuery = null) { // if query not active, start query
>>  activeQuery = startQuery()
>>
>>} else if (shouldRestartQuery())  {  // check your condition and 
>> restart query
>>  activeQuery.stop()
>>  activeQuery = startQuery()
>>}
>>
>>activeQuery.awaitTermination(100)   // wait for 100 ms.
>>// if there is any error it will throw exception and quit the loop
>>// otherwise it will keep checking the condition every 100ms}
>>
>>
>>
>>
>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep 
>> wrote:
>>
>>> Thanks Michael
>>>
>>> I guess my question is little confusing ..let me try again
>>>
>>>
>>> I would like to restart streaming query programmatically while my
>>> streaming application is running based on a condition and why I want to do
>>> this
>>>
>>> I want to refresh a cached data frame based on a condition and the best
>>> way to do this restart streaming query suggested by Tdas below for similar
>>> problem
>>>
>>> http://mail-archives.apache.org/mod_mbox/spark-user/
>>> 201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+
>>> fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>
>>> I do understand that checkpoint if helps in recovery and failures but I
>>> would like to know "how to restart streaming query programmatically without
>>> stopping my streaming application"
>>>
>>> In place of query.awaittermination should I need to have an logic to
>>> restart query? Please suggest
>>>
>>>
>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust 
>>> wrote:
>>>
 See https://spark.apache.org/docs/latest/structured-
 streaming-programming-guide.html#recovering-from-failures-
 with-checkpointing

 Though I think that this currently doesn't work with the console sink.

 On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep  wrote:

> Hi,
>
>>
>> I'm trying to restart a streaming query to refresh cached data frame
>>
>> Where and how should I restart streaming query
>>
>
>
> val sparkSes = SparkSession
>
>   .builder
>
>   .config("spark.master", "local")
>
>   .appName("StreamingCahcePoc")
>
>   .getOrCreate()
>
>
>
> import sparkSes.implicits._
>
>
>
> val dataDF = sparkSes.readStream
>
>   .schema(streamSchema)
>
>   .csv("testData")
>
>
>
>
>
>val query = counts.writeStream
>
>   .outputMode("complete")
>
>   .format("console")
>
>   .start()
>
>
> query.awaittermination()
>
>
>
>>
>>
>>

>>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Thanks tathagata das actually I'm planning to something like this

activeQuery.stop()

//unpersist and persist cached data frame

df.unpersist()

//read the updated data //data size of df is around 100gb

df.persist()

 activeQuery = startQuery()


the cached data frame size around 100gb ,so the question is this the right
place to refresh this huge cached data frame ?

I'm also trying to refresh cached data frame in onqueryprogress() method in
a class which extends StreamingQuerylistner

Would like to know which is the best place to refresh cached data frame and
why

Thanks again for the below response

On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das 
wrote:

> You can do something like this.
>
>
> def startQuery(): StreamingQuery = {
>// create your streaming dataframes
>// start the query with the same checkpoint directory}
>
> // handle to the active queryvar activeQuery: StreamingQuery = null
> while(!stopped) {
>
>if (activeQuery = null) { // if query not active, start query
>  activeQuery = startQuery()
>
>} else if (shouldRestartQuery())  {  // check your condition and 
> restart query
>  activeQuery.stop()
>  activeQuery = startQuery()
>}
>
>activeQuery.awaitTermination(100)   // wait for 100 ms.
>// if there is any error it will throw exception and quit the loop
>// otherwise it will keep checking the condition every 100ms}
>
>
>
>
> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep 
> wrote:
>
>> Thanks Michael
>>
>> I guess my question is little confusing ..let me try again
>>
>>
>> I would like to restart streaming query programmatically while my
>> streaming application is running based on a condition and why I want to do
>> this
>>
>> I want to refresh a cached data frame based on a condition and the best
>> way to do this restart streaming query suggested by Tdas below for similar
>> problem
>>
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>
>> I do understand that checkpoint if helps in recovery and failures but I
>> would like to know "how to restart streaming query programmatically without
>> stopping my streaming application"
>>
>> In place of query.awaittermination should I need to have an logic to
>> restart query? Please suggest
>>
>>
>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust 
>> wrote:
>>
>>> See
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>
>>> Though I think that this currently doesn't work with the console sink.
>>>
>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep 
>>> wrote:
>>>
 Hi,

>
> I'm trying to restart a streaming query to refresh cached data frame
>
> Where and how should I restart streaming query
>


 val sparkSes = SparkSession

   .builder

   .config("spark.master", "local")

   .appName("StreamingCahcePoc")

   .getOrCreate()



 import sparkSes.implicits._



 val dataDF = sparkSes.readStream

   .schema(streamSchema)

   .csv("testData")





val query = counts.writeStream

   .outputMode("complete")

   .format("console")

   .start()


 query.awaittermination()



>
>
>
>>>
>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
You can do something like this.


def startQuery(): StreamingQuery = {
   // create your streaming dataframes
   // start the query with the same checkpoint directory}

// handle to the active queryvar activeQuery: StreamingQuery = null
while(!stopped) {

   if (activeQuery = null) { // if query not active, start query
 activeQuery = startQuery()

   } else if (shouldRestartQuery())  {  // check your condition
and restart query
 activeQuery.stop()
 activeQuery = startQuery()
   }

   activeQuery.awaitTermination(100)   // wait for 100 ms.
   // if there is any error it will throw exception and quit the loop
   // otherwise it will keep checking the condition every 100ms}




On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep 
wrote:

> Thanks Michael
>
> I guess my question is little confusing ..let me try again
>
>
> I would like to restart streaming query programmatically while my
> streaming application is running based on a condition and why I want to do
> this
>
> I want to refresh a cached data frame based on a condition and the best
> way to do this restart streaming query suggested by Tdas below for similar
> problem
>
> http://mail-archives.apache.org/mod_mbox/spark-user/
> 201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+
> fwmn4jtm1x1nd...@mail.gmail.com%3e
>
> I do understand that checkpoint if helps in recovery and failures but I
> would like to know "how to restart streaming query programmatically without
> stopping my streaming application"
>
> In place of query.awaittermination should I need to have an logic to
> restart query? Please suggest
>
>
> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust 
> wrote:
>
>> See https://spark.apache.org/docs/latest/structured-
>> streaming-programming-guide.html#recovering-from-failures-
>> with-checkpointing
>>
>> Though I think that this currently doesn't work with the console sink.
>>
>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep 
>> wrote:
>>
>>> Hi,
>>>

 I'm trying to restart a streaming query to refresh cached data frame

 Where and how should I restart streaming query

>>>
>>>
>>> val sparkSes = SparkSession
>>>
>>>   .builder
>>>
>>>   .config("spark.master", "local")
>>>
>>>   .appName("StreamingCahcePoc")
>>>
>>>   .getOrCreate()
>>>
>>>
>>>
>>> import sparkSes.implicits._
>>>
>>>
>>>
>>> val dataDF = sparkSes.readStream
>>>
>>>   .schema(streamSchema)
>>>
>>>   .csv("testData")
>>>
>>>
>>>
>>>
>>>
>>>val query = counts.writeStream
>>>
>>>   .outputMode("complete")
>>>
>>>   .format("console")
>>>
>>>   .start()
>>>
>>>
>>> query.awaittermination()
>>>
>>>
>>>



>>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Thanks Michael

I guess my question is little confusing ..let me try again


I would like to restart streaming query programmatically while my streaming
application is running based on a condition and why I want to do this

I want to refresh a cached data frame based on a condition and the best way
to do this restart streaming query suggested by Tdas below for similar
problem

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e

I do understand that checkpoint if helps in recovery and failures but I
would like to know "how to restart streaming query programmatically without
stopping my streaming application"

In place of query.awaittermination should I need to have an logic to
restart query? Please suggest


On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust 
wrote:

> See
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> Though I think that this currently doesn't work with the console sink.
>
> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep 
> wrote:
>
>> Hi,
>>
>>>
>>> I'm trying to restart a streaming query to refresh cached data frame
>>>
>>> Where and how should I restart streaming query
>>>
>>
>>
>> val sparkSes = SparkSession
>>
>>   .builder
>>
>>   .config("spark.master", "local")
>>
>>   .appName("StreamingCahcePoc")
>>
>>   .getOrCreate()
>>
>>
>>
>> import sparkSes.implicits._
>>
>>
>>
>> val dataDF = sparkSes.readStream
>>
>>   .schema(streamSchema)
>>
>>   .csv("testData")
>>
>>
>>
>>
>>
>>val query = counts.writeStream
>>
>>   .outputMode("complete")
>>
>>   .format("console")
>>
>>   .start()
>>
>>
>> query.awaittermination()
>>
>>
>>
>>>
>>>
>>>
>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Michael Armbrust
See
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

Though I think that this currently doesn't work with the console sink.

On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep 
wrote:

> Hi,
>
>>
>> I'm trying to restart a streaming query to refresh cached data frame
>>
>> Where and how should I restart streaming query
>>
>
>
> val sparkSes = SparkSession
>
>   .builder
>
>   .config("spark.master", "local")
>
>   .appName("StreamingCahcePoc")
>
>   .getOrCreate()
>
>
>
> import sparkSes.implicits._
>
>
>
> val dataDF = sparkSes.readStream
>
>   .schema(streamSchema)
>
>   .csv("testData")
>
>
>
>
>
>val query = counts.writeStream
>
>   .outputMode("complete")
>
>   .format("console")
>
>   .start()
>
>
> query.awaittermination()
>
>
>
>>
>>
>>


Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Hi,

>
> I'm trying to restart a streaming query to refresh cached data frame
>
> Where and how should I restart streaming query
>


val sparkSes = SparkSession

  .builder

  .config("spark.master", "local")

  .appName("StreamingCahcePoc")

  .getOrCreate()



import sparkSes.implicits._



val dataDF = sparkSes.readStream

  .schema(streamSchema)

  .csv("testData")





   val query = counts.writeStream

  .outputMode("complete")

  .format("console")

  .start()


query.awaittermination()



>
>
>