[jira] [Commented] (SPARK-42485) SPIP: Shutting down spark structured streaming when the streaming process completed current process

2023-02-27 Thread Mich Talebzadeh (Jira)


[ https://issues.apache.org/jira/browse/SPARK-42485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693883#comment-17693883 ]

Mich Talebzadeh commented on SPARK-42485:
-

Yes, that is the intention. I have not yet had a chance to complete the 
documentation. 

 

[[SPIP] Shutting down spark structured streaming when the streaming process completed current process - Google Docs|https://docs.google.com/document/d/1SljobKKHiB2M7Md7raBOMM7o2EW6nglH-hEM1dtjUQg/edit#heading=h.ud7930xhlsm6]

> SPIP: Shutting down spark structured streaming when the streaming process 
> completed current process
> ---
>
> Key: SPARK-42485
> URL: https://issues.apache.org/jira/browse/SPARK-42485
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.2.2
>Reporter: Mich Talebzadeh
>Priority: Major
>  Labels: SPIP
>
> Spark Structured Streaming is a very useful tool for Event-Driven 
> Architectures. In an Event-Driven Architecture there is generally a main loop 
> that listens for events and triggers a callback function when one of those 
> events is detected. Likewise, a streaming application waits to receive source 
> messages, either at a set interval or as they arrive, and reacts accordingly.
> There are occasions when you may want to stop the Spark program gracefully, 
> meaning that the Spark application completely handles the last streaming 
> message and then terminates. This is different from sending an interrupt 
> such as CTRL-C.
> Of course, one can terminate the process using either of the following:
>  # query.awaitTermination(): waits for the termination of this query, either 
> by stop() or by an error.
>  # query.awaitTermination(timeoutMs): returns true if this query has 
> terminated within the timeout in milliseconds.
> The first waits indefinitely, in practice until the query is stopped or an 
> interrupt signal is received. The second waits until the timeout in 
> milliseconds is reached and then returns.
> The issue with the timeout is that one needs to predict how long the streaming 
> job needs to run. On the other hand, any interrupt at the terminal or OS level 
> (e.g. killing the process) may terminate processing before the streaming 
> process has completed properly.
> I have devised a method that allows one to terminate the Spark application 
> internally after processing the last received message. Within, say, 2 seconds 
> of the shutdown being confirmed, the process invokes a graceful shutdown.
> This new feature proposes a solution that lets the query for a topic finish 
> the message currently being processed, waits for it to complete, and then 
> shuts down the streaming process for that topic without loss of data or 
> orphaned transactions.
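For comparison, the idea can be approximated on the driver side with the existing PySpark API: instead of blocking in `awaitTermination()`, call `awaitTermination(timeout)` in a loop, check a stop condition between calls, and call `stop()` only when no trigger is running. This is a minimal sketch under stated assumptions, not the proposed feature: it assumes PySpark's `StreamingQuery`, where `awaitTermination(timeout)` takes seconds (the Scala API takes `timeoutMs`) and `status` is a dict with an `isTriggerActive` key. `run_until_stop_requested` and `stop_requested` are hypothetical names. A new trigger can still start between the idle check and `stop()`, so this narrows, but does not close, the window for interrupting a batch.

```python
import time
from typing import Callable


def run_until_stop_requested(query, stop_requested: Callable[[], bool],
                             poll_seconds: float = 2.0,
                             idle_check_seconds: float = 0.5) -> bool:
    """Block like query.awaitTermination(), but stop the query once
    stop_requested() returns True and no micro-batch is running.

    Assumes `query` behaves like a PySpark StreamingQuery:
      - awaitTermination(timeout) returns True if the query has terminated
        within `timeout` seconds (and re-raises the query's error, if any),
      - status is a dict that includes an 'isTriggerActive' flag,
      - stop() stops the query.
    Returns True if this function stopped the query, False if the query
    terminated by itself.
    """
    while not query.awaitTermination(poll_seconds):
        if not stop_requested():
            continue  # keep streaming; poll again after poll_seconds
        # Wait for the in-flight micro-batch, if any, to finish.
        # Note: a new trigger could still start before stop() below.
        while query.status["isTriggerActive"]:
            time.sleep(idle_check_seconds)
        query.stop()
        return True
    return False
```

For example, `stop_requested` could be the `is_set` method of a `threading.Event` that a control-topic handler sets, instead of that handler calling `stop()` on the business query directly.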



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-42485) SPIP: Shutting down spark structured streaming when the streaming process completed current process

2023-02-26 Thread Hyukjin Kwon (Jira)


[ https://issues.apache.org/jira/browse/SPARK-42485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693745#comment-17693745 ]

Hyukjin Kwon commented on SPARK-42485:
--

For a proper SPIP, you should read 
https://spark.apache.org/improvement-proposals.html and answer the questions 
posted there.
From a cursory look, though, I don't think this needs an SPIP.




[jira] [Commented] (SPARK-42485) SPIP: Shutting down spark structured streaming when the streaming process completed current process

2023-02-21 Thread Mich Talebzadeh (Jira)


[ https://issues.apache.org/jira/browse/SPARK-42485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691790#comment-17691790 ]

Mich Talebzadeh commented on SPARK-42485:
-

Hi Boyang,

 

Please find my responses below

 
 # Can you share any use cases you have that would benefit from this feature?
 # --> Sure, I will add these to the SPIP document in due course.
 # Is there an SPIP doc written for this yet? If so, please link it in the JIRA.
 # --> I have created an outline, but it is not finished yet. [[SPIP] Shutting down spark structured streaming when the streaming process completed current process - Google Docs|https://docs.google.com/document/d/1SljobKKHiB2M7Md7raBOMM7o2EW6nglH-hEM1dtjUQg/edit#heading=h.ud7930xhlsm6]
 # Regarding this statement > I have devised a method that allows one to 
terminate the spark application internally after processing the last received 
message. Within say 2 seconds of the confirmation of shutdown, the process will 
invoke a graceful shutdown.

Do you mean the query will gracefully shut down after the most recent 
micro-batch has finished processing?  

--> To qualify: the query shuts down gracefully once the last message has been 
processed successfully.

This is the original case that I posted to the user group on 24 April 2021:

"""

How to shut down the streaming query for a topic that is working on a message: 
wait for the message being processed to complete, then shut down the streaming 
process for that topic.


I thought about this and looked at the options. Implementing this with sensors, 
as in Airflow, would be expensive: for example, reading a file from object 
storage or from an underlying database would incur additional I/O overhead 
through continuous polling.


So the design had to be incorporated into the streaming process itself. What I 
came up with was an additional control topic (called newtopic below), whose 
streaming query keeps running, triggered every 2 seconds say, and whose 
messages are in JSON format with the following structure:


root
 |-- newtopic_value: struct (nullable = true)
 |    |-- uuid: string (nullable = true)
 |    |-- timeissued: timestamp (nullable = true)
 |    |-- queue: string (nullable = true)
 |    |-- status: string (nullable = true)
 
In the above, queue refers to the business topic, and status is set to 'true', 
meaning carry on processing the business stream. The control topic stream can 
be restarted at any time, and status can be set to 'false' to stop the 
streaming query for a given business topic.
 
ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe    {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe", "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
 
64a8321c-1593-428b-ae65-89e45ddf0640    {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640", "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
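The control query may receive more than one control message in a single micro-batch, and a queue's status can flip between 'true' and 'false', as in the two sample messages above. A small helper can therefore act on the most recent message per queue rather than on whichever row happens to come first. This is a hypothetical helper (`latest_status` is not part of Spark); it assumes each row is a dict, for example from PySpark's `Row.asDict()` after flattening `newtopic_value`, with `queue`, `status` and `timeissued` fields.

```python
from datetime import datetime
from typing import Dict, Iterable, Mapping, Union


def _as_datetime(value: Union[str, datetime]) -> datetime:
    # Spark returns TimestampType values as datetime; raw JSON carries an ISO-8601 string.
    return value if isinstance(value, datetime) else datetime.fromisoformat(value)


def latest_status(rows: Iterable[Mapping]) -> Dict[str, bool]:
    """Map each business queue to its most recent status (True = keep running).

    Assumes every row has 'queue', 'status' ('true'/'false') and 'timeissued'
    fields, matching the control-topic structure shown above.
    """
    latest: Dict[str, tuple] = {}
    for row in rows:
        issued = _as_datetime(row["timeissued"])
        queue = row["queue"]
        # Keep only the newest message seen so far for this queue.
        if queue not in latest or issued > latest[queue][0]:
            latest[queue] = (issued, str(row["status"]).lower() == "true")
    return {queue: running for queue, (_, running) in latest.items()}
```

Inside a foreachBatch handler this could be fed with `[r.asDict() for r in dfnewtopic.collect()]`, which is reasonable only because control messages are tiny.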
 
So how can I stop the business queue once the current business topic message 
has been processed? Say the source sends data for a business topic every 30 
seconds, while our control topic sends a one-liner as above every 2 seconds. 
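The thread does not show the producer side of the control topic. A minimal sketch of building one control message in the format above, assuming the Kafka key is the message uuid (as the sample messages suggest) and that timestamps are written without a timezone; `make_control_message` is a hypothetical name, and publishing the pair to Kafka every 2 seconds is left out.

```python
import json
import uuid
from datetime import datetime
from typing import Optional, Tuple


def make_control_message(queue: str, keep_running: bool,
                         now: Optional[datetime] = None) -> Tuple[str, str]:
    """Build a (key, value) pair in the control-topic format shown above.

    The key is the message uuid; the value is the JSON payload with
    uuid, timeissued, queue and status ('true' or 'false').
    """
    msg_id = str(uuid.uuid4())
    issued = (now or datetime.now()).strftime("%Y-%m-%dT%H:%M:%S")
    payload = {
        "uuid": msg_id,
        "timeissued": issued,
        "queue": queue,
        "status": "true" if keep_running else "false",
    }
    # separators=(", ", ":") reproduces the spacing of the sample messages.
    return msg_id, json.dumps(payload, separators=(", ", ":"))
```

Sending `make_control_message("md", keep_running=False)` would then ask the control query to stop the md stream.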
 
In the writeStream of the business query, add the following lines so that the query can be identified by name:
 
trigger(processingTime='30 seconds'). \
queryName('md'). \
 
Next, the control topic query (newtopic) has the following:
 
foreachBatch(sendToControl). \
trigger(processingTime='2 seconds'). \
queryName('newtopic'). \
 
The sendToControl function does what is needed:
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        #print(f"""newtopic batchId is {batchId}""")
        #dfnewtopic.show(10,False)
        # Read queue and status from the first control message in this micro-batch
        queue = dfnewtopic.select(col("queue")).collect()[0][0]
        status = dfnewtopic.select(col("status")).collect()[0][0]
 
        if queue == 'md' and status == 'false':
            # The original used a project helper, s.spark_session(config['common']['appName']);
            # getOrCreate() returns the already-running driver session instead
            spark_session = SparkSession.builder.getOrCreate()
            active = spark_session.streams.active
            for e in active:
                #print(e)
                name = e.name
                if name == 'md':
                    print(f"""Terminating streaming process {name}""")
                    e.stop()
    else:
        print("DataFrame newtopic is empty")
 
This seems to work: I checked that in this case the data was written and saved 
to the target sink (a BigQuery table). It waits until the data has been written 
completely, meaning the current streaming message has been processed, so there 
is some latency.
 
This is the output
 
Terminating streaming process md

wrote to DB  ## this is the flag  I added to ensure the current 

[jira] [Commented] (SPARK-42485) SPIP: Shutting down spark structured streaming when the streaming process completed current process

2023-02-21 Thread Boyang Jerry Peng (Jira)


[ https://issues.apache.org/jira/browse/SPARK-42485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691762#comment-17691762 ]

Boyang Jerry Peng commented on SPARK-42485:
---

[~mich.talebza...@gmail.com] This seems like an interesting and useful feature. 
 I have a couple of questions:

 
 # Can you share any use cases you have that would benefit from this feature?
 # Is there an SPIP doc written for this yet? If so, please link it in the JIRA.
 # Regarding this statement > I have devised a method that allows one to 
terminate the spark application internally after processing the last received 
message. Within say 2 seconds of the confirmation of shutdown, the process will 
invoke a graceful shutdown.

Do you mean the query will gracefully shut down after the most recent 
micro-batch has finished processing?  

 




[jira] [Commented] (SPARK-42485) SPIP: Shutting down spark structured streaming when the streaming process completed current process

2023-02-19 Thread Mich Talebzadeh (Jira)


[ https://issues.apache.org/jira/browse/SPARK-42485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690947#comment-17690947 ]

Mich Talebzadeh commented on SPARK-42485:
-

Done, thanks.




[jira] [Commented] (SPARK-42485) SPIP: Shutting down spark structured streaming when the streaming process completed current process

2023-02-19 Thread Dongjoon Hyun (Jira)


[ https://issues.apache.org/jira/browse/SPARK-42485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690944#comment-17690944 ]

Dongjoon Hyun commented on SPARK-42485:
---

Thank you for removing the Target Version. The remaining item from my comments 
is to change `Affects Versions` from 3.2.2 to 3.5.0.

> SPIP: Shutting down spark structured streaming when the streaming process 
> completed current process
> ---
>
> Key: SPARK-42485
> URL: https://issues.apache.org/jira/browse/SPARK-42485
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.3.2
>Reporter: Mich Talebzadeh
>Priority: Major
>  Labels: SPIP



[jira] [Commented] (SPARK-42485) SPIP: Shutting down spark structured streaming when the streaming process completed current process

2023-02-19 Thread Mich Talebzadeh (Jira)


[ https://issues.apache.org/jira/browse/SPARK-42485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690847#comment-17690847 ]

Mich Talebzadeh commented on SPARK-42485:
-

How about Target Version?




[jira] [Commented] (SPARK-42485) SPIP: Shutting down spark structured streaming when the streaming process completed current process

2023-02-18 Thread Dongjoon Hyun (Jira)


[ https://issues.apache.org/jira/browse/SPARK-42485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690828#comment-17690828 ]

Dongjoon Hyun commented on SPARK-42485:
---

[~mich.talebza...@gmail.com] Here are a few pieces of advice for you.

1. I removed myself from the Shepherd field. The Apache Spark community is a voluntary community.
2. You should remove `Target Version` according to the community policy.
  - https://spark.apache.org/contributing.html
{quote}Do not set the following fields:
- Fix Version. This is assigned by committers only when resolved.
- Target Version. This is assigned by committers to indicate a PR has been 
accepted for possible fix by the target version.{quote}

3. A `New Feature` should use the version of the master branch (3.5.0 as of 
today). It cannot affect the release branches (branch-3.4/3.3/..) because 
Apache Spark community policy doesn't allow backporting features or 
improvements.
