It's been a few years (so this approach might be out of date), but here's what I used for PySpark as part of this SO answer (https://stackoverflow.com/questions/45717433/stop-structured-streaming-query-gracefully/65708677):
```
# Helper method to stop a streaming query
import time


def stop_stream_query(query, wait_time):
    """Stop a running streaming query gracefully."""
    while query.isActive:
        msg = query.status['message']
        data_avail = query.status['isDataAvailable']
        trigger_active = query.status['isTriggerActive']
        # Only stop once the query is idle: no data waiting and no trigger running
        if not data_avail and not trigger_active and msg != "Initializing sources":
            print('Stopping query...')
            query.stop()
        time.sleep(0.5)

    # Okay, wait for the stop to happen
    print('Awaiting termination...')
    query.awaitTermination(wait_time)
```

I'd also be interested if there is a newer/better way to do this, so please cc me on updates :)

On Thu, May 6, 2021 at 1:08 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> That is a valid question, and I am not aware of any new addition to Spark
> Structured Streaming (SSS) in newer releases for this graceful shutdown.
>
> Going back to my earlier explanation, there are occasions when you may
> want to stop the Spark program gracefully. Gracefully meaning that the
> Spark application handles the last streaming message completely and then
> terminates the application. This is different from invoking interrupts
> such as CTRL-C. Of course one can terminate the process based on the
> following:
>
> 1. query.awaitTermination()  # Waits for the termination of this query,
>    with stop() or with an error
> 2. query.awaitTermination(timeoutMs)  # Returns true if this query is
>    terminated within the timeout in milliseconds
>
> So the first one above waits until an interrupt signal is received. The
> second one counts down the timeout and exits when the timeout in
> milliseconds is reached.
>
> The issue is that one needs to predict how long the streaming job needs
> to run. Clearly, any interrupt at the terminal or OS level (kill process)
> may leave the processing terminated without proper completion of the
> streaming process.
>
> So I gather that if we agree on what constitutes a graceful shutdown, we
> can consider both the tool offerings from Spark itself and whatever
> solutions we can come up with.
>
> HTH
>
> On Thu, 6 May 2021 at 13:28, ayan guha <guha.a...@gmail.com> wrote:
>
>> What are some other "newer" methodologies?
>>
>> Really interested to understand what is possible here, as this is a
>> topic that has come up in this forum time and again.
>>
>> On Thu, 6 May 2021 at 5:13 pm, Gourav Sengupta <
>> gourav.sengupta.develo...@gmail.com> wrote:
>>
>>> Hi Mich,
>>>
>>> Thanks a ton for your kind response. It looks like we are still using
>>> the earlier methodologies for stopping a Spark streaming program
>>> gracefully.
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Wed, May 5, 2021 at 6:04 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I believe I discussed this in this forum. I sent the following to the
>>>> spark-dev forum as an add-on to Spark functionality. This is the gist
>>>> of it.
>>>>
>>>> Spark Structured Streaming (AKA SSS) is a very useful tool in dealing
>>>> with Event-Driven Architecture. In an Event-Driven Architecture, there
>>>> is generally a main loop that listens for events and then triggers a
>>>> callback function when one of those events is detected. In a streaming
>>>> application, the application waits to receive the source messages in a
>>>> set interval, or whenever they happen, and reacts accordingly.
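>>>>
>>>> As a minimal sketch of that loop-and-callback pattern in SSS
>>>> (illustrative only: the broker address, topic name and per-batch
>>>> processing below are assumptions, not a description of the actual job):
>>>>
>>>>     from pyspark.sql import SparkSession
>>>>
>>>>     spark = SparkSession.builder.appName("eventDriven").getOrCreate()
>>>>
>>>>     # The "main loop": subscribe to the source of events
>>>>     events = spark.readStream.format("kafka") \
>>>>         .option("kafka.bootstrap.servers", "localhost:9092") \
>>>>         .option("subscribe", "md") \
>>>>         .load()
>>>>
>>>>     # The "callback": invoked once per micro-batch of new events
>>>>     def process_batch(batch_df, batch_id):
>>>>         print(f"batch {batch_id}: {batch_df.count()} event(s)")
>>>>
>>>>     query = events.writeStream \
>>>>         .foreachBatch(process_batch) \
>>>>         .trigger(processingTime='30 seconds') \
>>>>         .queryName('md') \
>>>>         .start()
>>>>
>>>>     query.awaitTermination()
>>>>
>>>> That final awaitTermination() call is exactly where the shutdown
>>>> question arises: the driver blocks there until the query stops or
>>>> fails.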
>>>>
>>>> There are occasions when you may want to stop the Spark program
>>>> gracefully. Gracefully meaning that the Spark application handles the
>>>> last streaming message completely and then terminates the application.
>>>> This is different from invoking interrupts such as CTRL-C. Of course
>>>> one can terminate the process based on the following:
>>>>
>>>> 1. query.awaitTermination()  # Waits for the termination of this
>>>>    query, with stop() or with an error
>>>> 2. query.awaitTermination(timeoutMs)  # Returns true if this query is
>>>>    terminated within the timeout in milliseconds
>>>>
>>>> So the first one above waits until an interrupt signal is received.
>>>> The second one counts down the timeout and exits when the timeout in
>>>> milliseconds is reached.
>>>>
>>>> The issue is that one needs to predict how long the streaming job
>>>> needs to run. Clearly, any interrupt at the terminal or OS level (kill
>>>> process) may leave the processing terminated without proper completion
>>>> of the streaming process.
>>>>
>>>> I have devised a method that allows one to terminate the Spark
>>>> application internally after processing the last received message,
>>>> within say 2 seconds of the confirmation of shutdown. The problem is
>>>> how to tell the query doing work for the message being processed to
>>>> stop, wait for it to complete, and shut down the streaming process for
>>>> a given topic.
>>>>
>>>> I thought about this and looked at options. Implementing this with
>>>> sensors, as Airflow does, would be expensive: for example, reading a
>>>> flag from object storage or from an underlying database would incur
>>>> additional I/O overheads through continuous polling.
>>>>
>>>> So the design had to be incorporated into the streaming process
>>>> itself. What I came up with was the addition of a control topic (I
>>>> call it newtopic below), which keeps running, triggered every 2
>>>> seconds, and carries JSON messages with the following structure:
>>>>
>>>>     root
>>>>      |-- newtopic_value: struct (nullable = true)
>>>>      |    |-- uuid: string (nullable = true)
>>>>      |    |-- timeissued: timestamp (nullable = true)
>>>>      |    |-- queue: string (nullable = true)
>>>>      |    |-- status: string (nullable = true)
>>>>
>>>> Here queue refers to the business topic, and status set to 'true'
>>>> means carry on processing the business stream. This control topic
>>>> streaming can be restarted at any time, and status can be set to
>>>> 'false' if we want to stop the streaming queue for a given business
>>>> topic:
>>>>
>>>>     ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
>>>>     {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
>>>>      "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>>>>
>>>>     64a8321c-1593-428b-ae65-89e45ddf0640
>>>>     {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
>>>>      "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
>>>>
>>>> So how can I stop the business queue once the current business topic
>>>> message has been processed? Let us say the source is sending data for
>>>> a business topic every 30 seconds, while our control topic sends a
>>>> one-liner as above every 2 seconds; a sketch of the producer side is
>>>> shown below.
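>>>>
>>>> For illustration only, a minimal sketch of publishing such a control
>>>> message, assuming the kafka-python package and a broker on
>>>> localhost:9092 (both are assumptions, not part of the original setup):
>>>>
>>>>     import json
>>>>     import uuid
>>>>     from datetime import datetime
>>>>     from kafka import KafkaProducer  # assumed dependency: kafka-python
>>>>
>>>>     producer = KafkaProducer(
>>>>         bootstrap_servers='localhost:9092',
>>>>         key_serializer=str.encode,
>>>>         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>>>
>>>>     # status 'false' asks the streaming job to stop the 'md' query
>>>>     msg_uuid = str(uuid.uuid4())
>>>>     producer.send('newtopic', key=msg_uuid, value={
>>>>         'uuid': msg_uuid,
>>>>         'timeissued': datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
>>>>         'queue': 'md',
>>>>         'status': 'false'})
>>>>     producer.flush()
>>>>
>>>> Sending 'true' keeps the business query running; sending 'false'
>>>> triggers the shutdown path described next.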
>>>>
>>>> In your writeStream for the business topic, add a query name so the
>>>> query can be identified later:
>>>>
>>>>     trigger(processingTime='30 seconds'). \
>>>>     queryName('md'). \
>>>>
>>>> Next, the controlling topic (called newtopic) has the following:
>>>>
>>>>     foreachBatch(sendToControl). \
>>>>     trigger(processingTime='2 seconds'). \
>>>>     queryName('newtopic'). \
>>>>
>>>> The method sendToControl does what is needed:
>>>>
>>>>     from pyspark.sql.functions import col
>>>>
>>>>     # s and config come from the author's own helper modules
>>>>     def sendToControl(dfnewtopic, batchId):
>>>>         if len(dfnewtopic.take(1)) > 0:
>>>>             #print(f"""newtopic batchId is {batchId}""")
>>>>             #dfnewtopic.show(10, False)
>>>>             queue = dfnewtopic.select(col("queue")).collect()[0][0]
>>>>             status = dfnewtopic.select(col("status")).collect()[0][0]
>>>>
>>>>             if (queue == 'md') and (status == 'false'):
>>>>                 spark_session = s.spark_session(config['common']['appName'])
>>>>                 active = spark_session.streams.active
>>>>                 # Find the business query by name and stop it
>>>>                 for e in active:
>>>>                     name = e.name
>>>>                     if name == 'md':
>>>>                         print(f"""Terminating streaming process {name}""")
>>>>                         e.stop()
>>>>         else:
>>>>             print("DataFrame newtopic is empty")
>>>>
>>>> This seems to work; I checked to ensure that in this case the data was
>>>> written and saved to the target sink (a BigQuery table). It waits
>>>> until the data is written completely, meaning the current streaming
>>>> message is processed, so there is some latency there (waiting for
>>>> graceful completion).
>>>>
>>>> This is the output:
>>>>
>>>>     Terminating streaming process md
>>>>     wrote to DB  ## this is the flag I added to ensure the current micro-batch was completed
>>>>     2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md
>>>>     [id = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
>>>>     2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>>>>
>>>> The various termination processes are described in the Structured
>>>> Streaming Programming Guide - Spark 3.1.1 Documentation (apache.org):
>>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>>>>
>>>> This is the idea I came up with, which allows ending the streaming
>>>> process with the least cost. A sketch of how the two queries can be
>>>> wired together in the driver is shown below.
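>>>>
>>>> As a minimal, illustrative sketch (sendToSink and the two streaming
>>>> DataFrames here are placeholders for whatever the business job
>>>> actually does, not names from the real application):
>>>>
>>>>     # Business query: processes 'md' messages every 30 seconds
>>>>     business = streamingDataFrame.writeStream. \
>>>>         foreachBatch(sendToSink). \
>>>>         trigger(processingTime='30 seconds'). \
>>>>         queryName('md'). \
>>>>         start()
>>>>
>>>>     # Control query: checks 'newtopic' every 2 seconds
>>>>     control = newtopicStreamingDataFrame.writeStream. \
>>>>         foreachBatch(sendToControl). \
>>>>         trigger(processingTime='2 seconds'). \
>>>>         queryName('newtopic'). \
>>>>         start()
>>>>
>>>>     # Block until a query ends; sendToControl stops 'md' once a
>>>>     # control message with status 'false' arrives
>>>>     spark_session.streams.awaitAnyTermination()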
>>>>
>>>> HTH
>>>>
>>>> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <
>>>> gourav.sengupta.develo...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Just thought of reaching out once again and seeking your kind help to
>>>>> find out the best way to stop Spark streaming gracefully. Do we still
>>>>> use the method of creating a file, as in Spark 2.4.x, which is a
>>>>> several-years-old approach, or do we have a better approach in Spark
>>>>> 3.1?
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Gourav Sengupta <gourav.sengupta.develo...@gmail.com>
>>>>> Date: Wed, Apr 21, 2021 at 10:06 AM
>>>>> Subject: Graceful shutdown SPARK Structured Streaming
>>>>> To: <user@spark.apache.org>
>>>>>
>>>>> Dear friends,
>>>>>
>>>>> Is there any documentation available for gracefully stopping Spark
>>>>> Structured Streaming in 3.1.x?
>>>>>
>>>>> I am referring to articles which are 4 to 5 years old and was
>>>>> wondering whether there is a better way available today to gracefully
>>>>> shut down a Spark streaming job.
>>>>>
>>>>> Thanks a ton in advance for all your kind help.
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>
>> --
>> Best Regards,
>> Ayan Guha