Re: How to gracefully shutdown Spark Structured Streaming

2022-02-26 Thread Gourav Sengupta
Dear Mich,

a super duper note of thanks; I had to spend around two weeks to figure
this out :)



Regards,
Gourav Sengupta


Re: How to gracefully shutdown Spark Structured Streaming

2022-02-26 Thread Mich Talebzadeh
On Mon, 26 Apr 2021 at 10:21, Mich Talebzadeh 
wrote:

>
> Spark Structured Streaming (SSS) is a very useful tool for dealing with
> event-driven architectures. In an event-driven architecture, there is
> generally a main loop that listens for events and triggers a callback
> function when one of those events is detected. In a streaming application,
> the application waits for source messages, either at a set interval or
> whenever they arrive, and reacts accordingly.
>
> There are occasions when you may want to stop the Spark program
> gracefully, meaning that the application handles the last streaming
> message completely before terminating. This is different from sending an
> interrupt such as CTRL-C. Of course, one can terminate the process with
> one of the following:
>
>
>1.
>
>query.awaitTermination() # Waits for the termination of this query,
>with stop() or with error
>2.
>
>query.awaitTermination(timeoutMs) # Returns true if this query is
>terminated within the timeout in milliseconds.
>
> So the first call above blocks until the query terminates, whether via
> stop() or with an error. The second counts down and returns when the
> timeout in milliseconds is reached.
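>
> As a minimal sketch of these two calls (the broker address and topic name
> are illustrative only, and the spark-sql-kafka package is assumed to be
> on the classpath):
>
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.appName("awaitTerminationDemo").getOrCreate()
>
> df = spark.readStream. \
>     format("kafka"). \
>     option("kafka.bootstrap.servers", "localhost:9092"). \
>     option("subscribe", "md"). \
>     load()
>
> query = df.writeStream. \
>     format("console"). \
>     start()
>
> # option 1: block until the query is stopped with stop() or fails
> # query.awaitTermination()
>
> # option 2: wait up to 60 seconds; returns True if the query terminated
> # in time (note the PySpark argument is in seconds, whereas the Scala
> # API's timeoutMs is in milliseconds)
> print(query.awaitTermination(60))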
>
> The issue is that one needs to predict how long the streaming job needs to
> run. Clearly, any interrupt at the terminal or OS level (killing the
> process) may terminate processing without proper completion of the
> streaming process.
>
> I have devised a method that allows one to terminate the Spark application
> internally after processing the last received message. Within say 2 seconds
> of the confirmation of shutdown, the process will stop the query for the
> topic doing work on the message being processed: it waits for that work to
> complete and then shuts down the streaming process for the given topic.
>
>
> I thought about this and looked at options. Implementing this with
> sensors, as Airflow does, would be expensive: for example, repeatedly
> reading a flag file from object storage or a row from an underlying
> database would incur additional I/O overhead through continuous polling.
>
>
> So the design had to be incorporated into the streaming process itself.
> What I came up with was the addition of a control topic (I call it newtopic
> below), whose query keeps running, triggered every 2 seconds say, and whose
> messages are in JSON format with the following structure:
>
>
> root
>  |-- newtopic_value: struct (nullable = true)
>  |    |-- uuid: string (nullable = true)
>  |    |-- timeissued: timestamp (nullable = true)
>  |    |-- queue: string (nullable = true)
>  |    |-- status: string (nullable = true)
>
> In the above, queue refers to the business topic, and status is set to
> 'true', meaning carry on processing the business stream. This control topic
> stream can be restarted anytime, and status can be set to 'false' if we
> want to stop the streaming query for a given business topic:
>
> ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
> {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
> "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>
> 64a8321c-1593-428b-ae65-89e45ddf0640
> {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
> "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
>
> So how can I stop the business queue when the current business topic
> message has been processed? Let us say the source is sending data for a
> business topic every 30 seconds. Our control topic sends a one-liner, as
> above, every 2 seconds.
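>
> On the consuming side, the control topic can be read and parsed into the
> schema shown earlier along these lines (a sketch, not the exact original
> code; the broker address is again illustrative):
>
> from pyspark.sql.functions import from_json, col
> from pyspark.sql.types import (StructType, StructField, StringType,
>                                TimestampType)
>
> newtopic_schema = StructType([
>     StructField("uuid", StringType(), True),
>     StructField("timeissued", TimestampType(), True),
>     StructField("queue", StringType(), True),
>     StructField("status", StringType(), True)])
>
> # read the control topic and unpack the JSON value into columns
> newtopicDataFrame = spark.readStream. \
>     format("kafka"). \
>     option("kafka.bootstrap.servers", "localhost:9092"). \
>     option("subscribe", "newtopic"). \
>     load(). \
>     select(from_json(col("value").cast("string"),
>                      newtopic_schema).alias("newtopic_value")). \
>     select("newtopic_value.*")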
>
> In your writeStream, add the following lines so that the business query
> can be identified by its topic name:
>
> trigger(processingTime='30 seconds'). \
> queryName('md'). \
>
> Next, the writeStream for the controlling topic (called newtopic) has the
> following:
>
> foreachBatch(sendToControl). \
> trigger(processingTime='2 seconds'). \
> queryName('newtopic'). \
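>
> Put together, the two queries might be wired up roughly as follows (a
> sketch: streamingDataFrame stands for the business stream, and sendToSink
> is a hypothetical name for the business foreachBatch sink function;
> neither appears in the original code):
>
> result = streamingDataFrame.writeStream. \
>     outputMode('append'). \
>     foreachBatch(sendToSink). \
>     trigger(processingTime='30 seconds'). \
>     queryName('md'). \
>     start()
>
> control = newtopicDataFrame.writeStream. \
>     outputMode('append'). \
>     foreachBatch(sendToControl). \
>     trigger(processingTime='2 seconds'). \
>     queryName('newtopic'). \
>     start()
>
> # block the driver until one of the queries terminates, e.g. after
> # sendToControl stops 'md' in response to a status of 'false'
> spark.streams.awaitAnyTermination()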
>
> The method sendToControl does what is needed:
>
> from pyspark.sql.functions import col  # needed for the select() calls
>
> def sendToControl(dfnewtopic, batchId):
>     if len(dfnewtopic.take(1)) > 0:
>         # pick up the queue name and requested status from this micro-batch
>         queue = dfnewtopic.select(col("queue")).collect()[0][0]
>         status = dfnewtopic.select(col("status")).collect()[0][0]
>         if queue == 'md' and status == 'false':
>             spark_session = s.spark_session(config['common']['appName'])
>             active = spark_session.streams.active
>             # find the business query by name and stop it gracefully
>             for e in active:
>                 if e.name == 'md':
>                     print(f"""Terminating streaming process {e.name}""")
>                     e.stop()
>     else:
>         print("DataFrame newtopic is empty")
>
> This seems to work: I checked it to ensure that in this case data was
> written and saved to the target sink (a BigQuery table). It waits until
> the data is written completely, meaning the current streaming message is
> fully processed, so there is some latency there.
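>
> One related knob, an assumption on my part rather than something used in
> the code above: in Spark 3.0 and later, how long stop() waits for the
> streaming execution thread can be bounded via configuration.
>
> # 0 (the default) waits indefinitely; a positive value raises an
> # exception if the query has not stopped within that time
> spark.conf.set("spark.sql.streaming.stopTimeout", "60s")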
>
> This is the output:
>
> Terminating streaming process md
> wrote to DB  ## this is the flag I added