You can do something like this.

def startQuery(): StreamingQuery = {
  // create your streaming dataframes
  // start the query with the same checkpoint directory
}

// handle to the active query
var activeQuery: StreamingQuery = null

while (!stopped) {

  if (activeQuery == null) {              // if query not active, start query
    activeQuery = startQuery()

  } else if (shouldRestartQuery()) {      // check your condition and restart query
    activeQuery.stop()
    activeQuery = startQuery()
  }

  activeQuery.awaitTermination(100)       // wait for 100 ms
  // if there is any error, it will throw an exception and quit the loop
  // otherwise it will keep checking the condition every 100 ms
}
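
If the reason for restarting is to refresh a cached data frame (as in the thread below), startQuery() and shouldRestartQuery() could look roughly like the sketch below. This is an untested sketch, not a drop-in implementation: loadReferenceData(), cachedDF, refreshIntervalMs, the join key "id", streamSchema, and the checkpoint/output paths are placeholders for your own logic, and spark is your SparkSession.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

var cachedDF: DataFrame = null            // the data frame you want to refresh
var lastRefreshTime: Long = 0L
val refreshIntervalMs = 60 * 60 * 1000L   // e.g. refresh the cache once an hour

def shouldRestartQuery(): Boolean =
  System.currentTimeMillis() - lastRefreshTime > refreshIntervalMs

def startQuery(): StreamingQuery = {
  if (cachedDF != null) cachedDF.unpersist()
  cachedDF = loadReferenceData().cache()  // rebuild and re-cache the static data
  lastRefreshTime = System.currentTimeMillis()

  spark.readStream
    .schema(streamSchema)
    .csv("testData")
    .join(cachedDF, "id")                 // enrich the stream with the cached data
    .writeStream
    .outputMode("append")
    .option("checkpointLocation", "/tmp/query-checkpoint")  // same dir across restarts
    .option("path", "/tmp/query-output")
    .format("parquet")
    .start()
}

Each call to startQuery() rebuilds the cache and starts a new query against the same checkpoint directory, so the restarted query resumes from where the previous one left off.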




On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com>
wrote:

> Thanks Michael
>
> I guess my question was a little confusing, so let me try again.
>
>
> I would like to restart a streaming query programmatically, based on a
> condition, while my streaming application is running. Here is why I want
> to do this:
>
> I want to refresh a cached data frame based on a condition, and the best
> way to do this is to restart the streaming query, as suggested by Tdas
> below for a similar problem:
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+fwmn4jtm1x1nd...@mail.gmail.com%3e
>
> I do understand that checkpointing helps with recovery from failures, but I
> would like to know "how to restart the streaming query programmatically
> without stopping my streaming application".
>
> In place of query.awaitTermination(), should I have logic to restart the
> query? Please suggest.
>
>
> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>
>> Though I think that this currently doesn't work with the console sink.
>>
>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>>>
>>>> I'm trying to restart a streaming query to refresh a cached data frame.
>>>>
>>>> Where and how should I restart the streaming query?
>>>>
>>>
>>>
>>> val sparkSes = SparkSession
>>>   .builder
>>>   .config("spark.master", "local")
>>>   .appName("StreamingCahcePoc")
>>>   .getOrCreate()
>>>
>>> import sparkSes.implicits._
>>>
>>> val dataDF = sparkSes.readStream
>>>   .schema(streamSchema)
>>>   .csv("testData")
>>>
>>> val query = counts.writeStream
>>>   .outputMode("complete")
>>>   .format("console")
>>>   .start()
>>>
>>> query.awaitTermination()
