It's been a few years (so this approach might be out of date), but here's
what I used for PySpark as part of this SO answer:
https://stackoverflow.com/questions/45717433/stop-structured-streaming-query-gracefully/65708677

```
import time

# Helper method to stop a streaming query
def stop_stream_query(query, wait_time):
    """Stop a running streaming query once it has gone idle."""
    while query.isActive:
        msg = query.status['message']
        data_avail = query.status['isDataAvailable']
        trigger_active = query.status['isTriggerActive']
        # Only stop once no data is pending, no trigger is running,
        # and the query is past initialization
        if not data_avail and not trigger_active and msg != "Initializing sources":
            print('Stopping query...')
            query.stop()
        time.sleep(0.5)

    # Okay, wait for the stop to happen
    print('Awaiting termination...')
    query.awaitTermination(wait_time)
```
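
For reference, here's a minimal sketch of how that helper might be wired up (the rate source, app name, and query name are just placeholders, not from the original answer):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("graceful-stop-demo").getOrCreate()

# A toy query against the built-in rate source, writing to the console
query = spark.readStream.format("rate").load() \
    .writeStream.format("console").queryName("demo").start()

# ... later, when it's time to shut down
# (note: PySpark's awaitTermination timeout is in seconds):
stop_stream_query(query, wait_time=10)
```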


I'd also be interested if there is a newer/better way to do this, so
please cc me on updates :)


On Thu, May 6, 2021 at 1:08 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> That is a valid question and I am not aware of any new addition to Spark
> Structured Streaming (SSS) in newer releases for this graceful shutdown.
>
> Going back to my earlier explanation, there are occasions when you may
> want to stop the Spark program gracefully. Gracefully meaning that the
> Spark application handles the last streaming message completely and then
> terminates the application. This is different from invoking interrupts
> such as CTRL-C. Of course, one can terminate the process with one of the
> following:
>
>
>    1. query.awaitTermination() # Waits for the termination of this query, with stop() or with error
>    2. query.awaitTermination(timeoutMs) # Returns true if this query is terminated within the timeout in milliseconds.
>
> So the first one above blocks until the query terminates, via stop() or
> an error. The second one waits up to the given timeout and returns once
> the timeout in milliseconds is reached or the query has terminated.
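>
> As a quick sketch of the difference (in PySpark, where awaitTermination
> takes its timeout in seconds rather than the milliseconds of the Scala
> API):
>
>    # Block until the query stops via stop() or an error:
>    query.awaitTermination()
>
>    # Or poll with a timeout; returns True once the query has terminated:
>    while not query.awaitTermination(timeout=5):
>        print('query still running...')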
>
> The issue is that one needs to predict how long the streaming job needs to
> run. Clearly, any interrupt at the terminal or OS level (killing the
> process) may leave the processing terminated without proper completion of
> the streaming process.
> So I gather that if we agree on what constitutes a graceful shutdown, we
> can consider both the tooling Spark itself offers and the solutions we can
> come up with.
>
> HTH
>
> On Thu, 6 May 2021 at 13:28, ayan guha <guha.a...@gmail.com> wrote:
>
>> What are some other "newer" methodologies?
>>
>> Really interested to understand what is possible here, as this is a topic
>> that has come up in this forum time and again.
>>
>> On Thu, 6 May 2021 at 5:13 pm, Gourav Sengupta <
>> gourav.sengupta.develo...@gmail.com> wrote:
>>
>>> Hi Mich,
>>>
>>> thanks a ton for your kind response. It looks like we are still using the
>>> earlier methodologies for stopping a Spark streaming program gracefully.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Wed, May 5, 2021 at 6:04 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>>
>>>> I believe I discussed this in this forum. I sent the following to the
>>>> spark-dev forum as a proposed add-on to Spark functionality. This is the
>>>> gist of it.
>>>>
>>>>
>>>> Spark Structured Streaming (AKA SSS) is a very useful tool for dealing
>>>> with Event Driven Architecture. In an Event Driven Architecture, there is
>>>> generally a main loop that listens for events and then triggers a callback
>>>> function when one of those events is detected. In a streaming application,
>>>> the application waits to receive the source messages at a set interval or
>>>> whenever they happen, and reacts accordingly.
>>>>
>>>> There are occasions when you may want to stop the Spark program
>>>> gracefully. Gracefully meaning that the Spark application handles the last
>>>> streaming message completely and then terminates the application. This is
>>>> different from invoking interrupts such as CTRL-C. Of course, one can
>>>> terminate the process with one of the following:
>>>>
>>>>
>>>>    1. query.awaitTermination() # Waits for the termination of this query, with stop() or with error
>>>>    2. query.awaitTermination(timeoutMs) # Returns true if this query is terminated within the timeout in milliseconds.
>>>>
>>>> So the first one above blocks until the query terminates, via stop() or
>>>> an error. The second one waits up to the given timeout and returns once
>>>> the timeout in milliseconds is reached or the query has terminated.
>>>>
>>>> The issue is that one needs to predict how long the streaming job needs
>>>> to run. Clearly, any interrupt at the terminal or OS level (killing the
>>>> process) may leave the processing terminated without proper completion of
>>>> the streaming process.
>>>>
>>>> I have devised a method that allows one to terminate the Spark
>>>> application internally after processing the last received message. Within,
>>>> say, 2 seconds of the confirmation of shutdown, the process will stop the
>>>> work being done for the message currently being processed on the topic,
>>>> wait for it to complete, and shut down the streaming process for that
>>>> topic.
>>>>
>>>>
>>>> I thought about this and looked at options. Using sensors to implement
>>>> this, as Airflow does, would be expensive: continuously polling a file in
>>>> object storage or a flag in an underlying database would incur additional
>>>> I/O overhead.
>>>>
>>>>
>>>> So the design had to be incorporated into the streaming process itself.
>>>> What I came up with was the addition of a control topic (I call it newtopic
>>>> below), whose query keeps running, triggered every 2 seconds say, and whose
>>>> messages are in JSON format with the following structure:
>>>>
>>>>
>>>> root
>>>>  |-- newtopic_value: struct (nullable = true)
>>>>  |    |-- uuid: string (nullable = true)
>>>>  |    |-- timeissued: timestamp (nullable = true)
>>>>  |    |-- queue: string (nullable = true)
>>>>  |    |-- status: string (nullable = true)
>>>>
>>>> In the above, queue refers to the business topic and status is set to
>>>> 'true', meaning carry on processing the business stream. This control topic
>>>> streaming can be restarted anytime, and status can be set to 'false' if we
>>>> want to stop the streaming queue for a given business topic:
>>>>
>>>> ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
>>>> {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
>>>> "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>>>>
>>>> 64a8321c-1593-428b-ae65-89e45ddf0640
>>>> {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
>>>> "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
>>>>
>>>> So how can I stop the business queue when the current business topic
>>>> message has been processed? Let us say the source is sending data for a
>>>> business topic every 30 seconds, while our control topic sends a one-liner
>>>> as above every 2 seconds.
>>>>
>>>> In your writeStream, add a queryName so that the query can be identified
>>>> by topic name:
>>>>
>>>> trigger(processingTime='30 seconds'). \
>>>> queryName('md'). \
>>>>
>>>> Next, the controlling topic (called newtopic) has the following:
>>>>
>>>> foreachBatch(sendToControl). \
>>>> trigger(processingTime='2 seconds'). \
>>>> queryName('newtopic'). \
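>>>>
>>>> Pieced together, the control query could look like this (a sketch: the
>>>> Kafka options and newtopic_schema, a StructType matching the structure
>>>> printed above, are assumed rather than taken from my actual code):
>>>>
>>>> from pyspark.sql.functions import col, from_json
>>>>
>>>> ctrl_query = spark.readStream. \
>>>>     format('kafka'). \
>>>>     option('kafka.bootstrap.servers', 'localhost:9092'). \
>>>>     option('subscribe', 'newtopic'). \
>>>>     load(). \
>>>>     select(from_json(col('value').cast('string'), newtopic_schema).alias('newtopic_value')). \
>>>>     select('newtopic_value.*'). \
>>>>     writeStream. \
>>>>     foreachBatch(sendToControl). \
>>>>     trigger(processingTime='2 seconds'). \
>>>>     queryName('newtopic'). \
>>>>     start()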
>>>>
>>>> That method, sendToControl, does what is needed:
>>>>
>>>> def sendToControl(dfnewtopic, batchId):
>>>>     # Only act if the micro-batch actually contains a control message
>>>>     if len(dfnewtopic.take(1)) > 0:
>>>>         #print(f"""newtopic batchId is {batchId}""")
>>>>         #dfnewtopic.show(10,False)
>>>>         queue = dfnewtopic.select(col("queue")).collect()[0][0]
>>>>         status = dfnewtopic.select(col("status")).collect()[0][0]
>>>>
>>>>         # Stop the business query when the control message says so
>>>>         if queue == 'md' and status == 'false':
>>>>             spark_session = s.spark_session(config['common']['appName'])
>>>>             active = spark_session.streams.active
>>>>             for e in active:
>>>>                 name = e.name
>>>>                 if name == 'md':
>>>>                     print(f"""Terminating streaming process {name}""")
>>>>                     e.stop()
>>>>     else:
>>>>         print("DataFrame newtopic is empty")
>>>>
>>>> This seems to work: I checked it to ensure that in this case the data
>>>> was written and saved to the target sink (a BigQuery table). It waits until
>>>> the data is written completely, meaning the current streaming message is
>>>> processed, so there is some latency there (waiting for graceful
>>>> completion).
>>>>
>>>> This is the output
>>>>
>>>> Terminating streaming process md
>>>> wrote to DB  ## this is the flag I added to ensure the current
>>>> micro-batch was completed
>>>> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md
>>>> [id = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
>>>> 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>>>>
>>>> The various termination processes are described in the Structured
>>>> Streaming Programming Guide - Spark 3.1.1 Documentation:
>>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>>>>
>>>> This is the idea I came up with, which allows ending the streaming
>>>> process at the least cost.
>>>>
>>>> HTH
>>>>
>>>>
>>>> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <
>>>> gourav.sengupta.develo...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> just thought of reaching out once again and seeking your kind help to
>>>>> find out the best way to stop SPARK streaming gracefully. Do we still use
>>>>> the method of creating a file, as in SPARK 2.4.x, which is a method
>>>>> several years old, or do we have a better approach in SPARK 3.1?
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Gourav Sengupta <gourav.sengupta.develo...@gmail.com>
>>>>> Date: Wed, Apr 21, 2021 at 10:06 AM
>>>>> Subject: Graceful shutdown SPARK Structured Streaming
>>>>> To: <user@spark.apache.org>
>>>>>
>>>>>
>>>>> Dear friends,
>>>>>
>>>>> is there any documentation available for gracefully stopping SPARK
>>>>> Structured Streaming in 3.1.x?
>>>>>
>>>>> I am referring to articles which are 4 to 5 years old and was
>>>>> wondering whether there is a better way available today to gracefully
>>>>> shut down a SPARK streaming job.
>>>>>
>>>>> Thanks a ton in advance for all your kind help.
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>> --
>> Best Regards,
>> Ayan Guha
>>
>
