You can do something like this.
import org.apache.spark.sql.streaming.StreamingQuery

def startQuery(): StreamingQuery = {
  // create your streaming dataframes
  // start the query with the same checkpoint directory
  ???  // placeholder: return the StreamingQuery from writeStream...start()
}

// handle to the active query
var activeQuery: StreamingQuery = null

// `stopped` and `shouldRestartQuery()` are your own flag and condition
while (!stopped) {
  if (activeQuery == null) {          // if query not active, start query
    activeQuery = startQuery()
  } else if (shouldRestartQuery()) {  // check your condition and restart query
    activeQuery.stop()
    activeQuery = startQuery()
  }
  activeQuery.awaitTermination(100)   // wait for 100 ms
  // if there is any error it will throw an exception and quit the loop;
  // otherwise it will keep checking the condition every 100 ms
}

On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com> wrote:

> Thanks Michael,
>
> I guess my question is a little confusing, so let me try again.
>
> I would like to restart a streaming query programmatically, based on a
> condition, while my streaming application is running. Here is why I want
> to do this:
>
> I want to refresh a cached data frame based on a condition, and the best
> way to do this is to restart the streaming query, as suggested by Tdas
> below for a similar problem:
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+fwmn4jtm1x1nd...@mail.gmail.com%3e
>
> I do understand that checkpointing helps with recovery from failures, but
> I would like to know how to restart a streaming query programmatically
> without stopping my streaming application.
>
> In place of query.awaitTermination(), do I need some logic to restart the
> query? Please suggest.
>
>
> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com> wrote:
>
>> See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>
>> Though I think that this currently doesn't work with the console sink.
>>
>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>>> I'm trying to restart a streaming query to refresh a cached data frame.
>>>>
>>>> Where and how should I restart the streaming query?
>>>
>>> import org.apache.spark.sql.SparkSession
>>>
>>> val sparkSes = SparkSession
>>>   .builder
>>>   .config("spark.master", "local")
>>>   .appName("StreamingCahcePoc")
>>>   .getOrCreate()
>>>
>>> import sparkSes.implicits._
>>>
>>> // `streamSchema` is defined elsewhere (not shown in the post)
>>> val dataDF = sparkSes.readStream
>>>   .schema(streamSchema)
>>>   .csv("testData")
>>>
>>> // `counts` (presumably an aggregation over dataDF) is not shown in the post
>>> val query = counts.writeStream
>>>   .outputMode("complete")
>>>   .format("console")
>>>   .start()
>>>
>>> query.awaitTermination()
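The reply above leaves `shouldRestartQuery()` open. For the cache-refresh case in this thread, one possible condition is time-based: restart once a fixed interval has passed since the last (re)start. Here is a minimal, Spark-free sketch of that loop. It assumes a hypothetical `RestartableQuery` trait standing in for the two `StreamingQuery` methods the loop calls (`stop()` and `awaitTermination(timeoutMs)`), plus a hypothetical `RefreshPolicy` class; the clock is injected so the logic can be exercised without waiting in real time.

```scala
// Hypothetical stand-in for the two StreamingQuery methods the loop uses.
trait RestartableQuery {
  def stop(): Unit
  def awaitTermination(timeoutMs: Long): Boolean
}

// Hypothetical time-based restart condition: the cached data is treated as
// stale once intervalMs has elapsed since the last (re)start.
final class RefreshPolicy(intervalMs: Long, clock: () => Long) {
  private var lastRefresh: Long = clock()

  def shouldRestartQuery(): Boolean = clock() - lastRefresh >= intervalMs

  def markRefreshed(): Unit = lastRefresh = clock()
}

object RestartLoop {
  // Mirrors the loop in the reply; runs until stopped() returns true and
  // returns how many times the query was (re)started.
  def run(startQuery: () => RestartableQuery,
          policy: RefreshPolicy,
          stopped: () => Boolean,
          pollMs: Long = 100): Int = {
    var activeQuery: RestartableQuery = null
    var starts = 0
    while (!stopped()) {
      if (activeQuery == null) {
        // no query yet: start the first one
        activeQuery = startQuery()
        policy.markRefreshed()
        starts += 1
      } else if (policy.shouldRestartQuery()) {
        // interval elapsed: stop the old query and start a fresh one
        activeQuery.stop()
        activeQuery = startQuery()
        policy.markRefreshed()
        starts += 1
      }
      // An exception thrown here propagates and ends the loop.
      activeQuery.awaitTermination(pollMs)
    }
    starts
  }
}
```

In a real application the clock could simply be `() => System.currentTimeMillis()`, and `startQuery` could reload (and re-cache) the static DataFrame before calling `writeStream...start()` with the same checkpoint directory; that reload step is an assumption about how you would wire it up, not something shown in this thread.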