You can do something like this.

import org.apache.spark.sql.streaming.StreamingQuery

def startQuery(): StreamingQuery = {
   // create your streaming dataframes
   // start the query with the same checkpoint directory
   ???  // placeholder: return the query handle returned by .start()
}

// handle to the active query
var activeQuery: StreamingQuery = null
while (!stopped) {

   if (activeQuery == null) {          // if query not active, start query
     activeQuery = startQuery()

   } else if (shouldRestartQuery()) {  // check your condition and restart query
     activeQuery.stop()                // stop the running query before restarting from the same checkpoint
     activeQuery = startQuery()
   }

   activeQuery.awaitTermination(100)   // wait for 100 ms.
   // if there is any error it will throw exception and quit the loop
   // otherwise it will keep checking the condition every 100ms
}
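A more complete sketch of the same loop, assuming Spark 2.2 or later. The built-in `rate` source and `memory` sink stand in for your real source and sink, and `stopped` / `shouldRestart` are hypothetical hooks for your own conditions. The old query is stopped before the new one starts, because Spark will not start a query while another active query uses the same checkpoint; reusing the checkpoint lets the new run resume from the last committed offsets.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object RestartableQuery {

  // Builds the streaming DataFrame and starts the query. Every (re)start passes
  // the same checkpoint directory, so a new run resumes from the offsets and
  // state committed by the previous run.
  def startQuery(spark: SparkSession, checkpointDir: String): StreamingQuery = {
    val counts = spark.readStream
      .format("rate")                            // built-in test source emitting (timestamp, value) rows
      .option("rowsPerSecond", "10")
      .load()
      .groupBy((col("value") % 10).as("bucket"))
      .count()

    counts.writeStream
      .format("memory")                          // in-memory table standing in for a real sink
      .queryName("bucket_counts")                // the memory sink requires a query name
      .outputMode(OutputMode.Complete())         // the memory sink resumes from a checkpoint only in complete mode
      .option("checkpointLocation", checkpointDir)
      .start()
  }

  // Stops the running query (if any) and starts a new run from the same checkpoint.
  def restart(spark: SparkSession, old: StreamingQuery, checkpointDir: String): StreamingQuery = {
    if (old != null && old.isActive) old.stop()  // waits for the old run to terminate
    startQuery(spark, checkpointDir)
  }

  // The polling loop from the reply above. `stopped` and `shouldRestart` are
  // hypothetical hooks supplied by the application.
  def run(spark: SparkSession, checkpointDir: String,
          stopped: () => Boolean, shouldRestart: () => Boolean): Unit = {
    var activeQuery: StreamingQuery = null
    while (!stopped()) {
      if (activeQuery == null) {
        activeQuery = startQuery(spark, checkpointDir)
      } else if (shouldRestart()) {
        activeQuery = restart(spark, activeQuery, checkpointDir)
      }
      // Returns after at most 100 ms; throws StreamingQueryException if the
      // query failed, which ends the loop.
      activeQuery.awaitTermination(100)
    }
    if (activeQuery != null) activeQuery.stop()
  }
}
```

A restarted query keeps the same `id` (stored in the checkpoint) but gets a new `runId`, which is a convenient way to confirm in logs that a restart resumed from the checkpoint rather than starting over.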

On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <> wrote:

> Thanks Michael.
> I guess my question is a little confusing, so let me try again.
> I would like to restart a streaming query programmatically, based on a
> condition, while my streaming application is running. The reason is that
> I want to refresh a cached DataFrame based on a condition, and the best
> way to do this is to restart the streaming query, as suggested by Tdas below
> for a similar problem:
> 201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+
> I do understand that checkpointing helps with recovery from failures, but I
> would like to know "how to restart a streaming query programmatically without
> stopping my streaming application".
> In place of query.awaitTermination(), do I need some logic to restart the
> query? Please suggest.
> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <>
> wrote:
>> See
>> streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>> Though I think that this currently doesn't work with the console sink.
>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <>
>> wrote:
>>> Hi,
>>> I'm trying to restart a streaming query to refresh a cached DataFrame.
>>> Where and how should I restart the streaming query?
>>> val sparkSes = SparkSession
>>>   .builder
>>>   .config("spark.master", "local")
>>>   .appName("StreamingCahcePoc")
>>>   .getOrCreate()
>>> import sparkSes.implicits._
>>> val dataDF = sparkSes.readStream
>>>   .schema(streamSchema)
>>>   .csv("testData")
>>> // counts: an aggregation derived from dataDF (not shown in the message)
>>> val query = counts.writeStream
>>>   .outputMode("complete")
>>>   .format("console")
>>>   .start()
>>> query.awaitTermination()
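For the cached-DataFrame refresh that motivates this thread, the refresh step itself can be sketched as follows, assuming the static data is stored as Parquet at a path you control; `CachedLookup`, `refresh`, and the path are hypothetical names for illustration. The old copy is unpersisted before re-reading: Spark's cache lookup matches on the query plan, so a re-read of the same path can otherwise be served from the stale cached copy.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CachedLookup {

  // Replaces a cached static DataFrame with a freshly loaded, cached copy.
  // `path` is a hypothetical Parquet location holding the reference data.
  def refresh(spark: SparkSession, old: Option[DataFrame], path: String): DataFrame = {
    old.foreach(_.unpersist(blocking = true))    // drop the stale cache entry first
    val fresh = spark.read.parquet(path).cache() // re-read the current files and mark for caching
    fresh.count()                                // materialize the cache eagerly
    fresh
  }
}
```

When the restart condition fires, one way to use this is to call `refresh`, stop the running query, and start a new one (with the same checkpoint) whose plan joins the stream against the returned DataFrame, so the new run is built on the reloaded data.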
