Thanks Michael

I guess my question was a little confusing; let me try again.


I would like to restart a streaming query programmatically, based on a condition, while my
streaming application is running. The reason I want to do this:

I want to refresh a cached DataFrame based on a condition, and restarting the
streaming query is the approach TD suggested below for a similar
problem:

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e

I do understand that checkpointing helps with recovery from failures, but I
would like to know how to restart a streaming query programmatically without
stopping my streaming application.

In place of query.awaitTermination(), should I have logic to
restart the query? Please suggest.
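To make the question concrete, here is a rough sketch of the kind of loop I have in mind, replacing the single blocking awaitTermination() call. The names shouldRefresh(), refreshCache(), and buildQuery() are hypothetical placeholders I made up for this sketch, not Spark APIs:

```scala
// Sketch: manage the query in a loop instead of blocking forever on
// query.awaitTermination(). Assumes buildQuery() creates and starts the
// writeStream query, shouldRefresh() is my restart condition, and
// refreshCache() unpersists and re-caches the DataFrame.
var query = buildQuery()

while (keepRunning) {
  // Block for a bounded time (10s) instead of forever; returns early
  // if the query terminates, otherwise returns after the timeout.
  query.awaitTermination(10000)

  if (shouldRefresh()) {
    query.stop()          // stop only this query, not the whole application
    refreshCache()        // refresh the cached DataFrame
    query = buildQuery()  // start a new query against the refreshed data
  }
}
```

Is something like this the right pattern, or is there a better place to hook in the restart?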


On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com>
wrote:

> See
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> Though I think that this currently doesn't work with the console sink.
>
> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Hi,
>>
>>> I'm trying to restart a streaming query to refresh a cached DataFrame.
>>>
>>> Where and how should I restart the streaming query?
>>
>> val sparkSes = SparkSession
>>   .builder
>>   .config("spark.master", "local")
>>   .appName("StreamingCachePoc")
>>   .getOrCreate()
>>
>> import sparkSes.implicits._
>>
>> val dataDF = sparkSes.readStream
>>   .schema(streamSchema)
>>   .csv("testData")
>>
>> val query = counts.writeStream
>>   .outputMode("complete")
>>   .format("console")
>>   .start()
>>
>> query.awaitTermination()
>
