Resending this feature request and proposing a possible solution

Can someone advise whether I need to complete a
Spark Project Improvement Proposal (SPIP)?
<https://spark.apache.org/improvement-proposals.html>


Thanks


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




---------- Forwarded message ---------
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Fri, 23 Apr 2021 at 10:36
Subject: Shutting down spark structured streaming when the streaming
process completed current process
To: user @spark <u...@spark.apache.org>



Hi,


This is the design that I came up with.


The problem: how to shut down the streaming process for a given topic
gracefully, that is, let it finish the message currently being processed and
only then stop the streaming query for that topic.


I thought about this and looked at the options. Using sensors to implement
this, as Airflow does, would be expensive: for example, reading a file from
object storage or from an underlying database would incur additional I/O
overhead through continuous polling.


So the design had to be incorporated into the streaming process itself.
What I came up with was the addition of a control topic, which keeps running,
is triggered every 2 seconds (say), and carries JSON messages with the
following structure:


root
 |-- newtopic_value: struct (nullable = true)
 |    |-- uuid: string (nullable = true)
 |    |-- timeissued: timestamp (nullable = true)
 |    |-- queue: string (nullable = true)
 |    |-- status: string (nullable = true)

In the above, queue refers to the business topic and status is set to
'true', meaning BAU (business as usual). The control topic stream can be
restarted at any time, and status can be set to 'false' if we want to stop
the streaming queue for a given business topic:

ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
{"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
"timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}

64a8321c-1593-428b-ae65-89e45ddf0640
{"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
"timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}

So how can I stop the business queue once the current business topic
message has been processed? Let us say the source sends data for a
business topic every 30 seconds, while our control topic sends a one-line
message like those above every 2 seconds.
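
To make the control message format concrete, here is a minimal sketch in
plain Python (no Spark needed) that parses one payload and decides whether it
asks a given queue to stop. The function name should_stop is hypothetical;
the field names and the string values 'true' and 'false' are taken from the
two example messages above.

```python
import json

def should_stop(payload: str, queue_name: str) -> bool:
    """Return True if the control message asks the given queue to stop."""
    msg = json.loads(payload)
    # status is carried as the string 'true' or 'false', not a JSON boolean
    return msg.get("queue") == queue_name and msg.get("status") == "false"

# The two example control messages from this email
bau_msg = ('{"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe", '
           '"timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}')
stop_msg = ('{"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640", '
            '"timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}')

bau_result = should_stop(bau_msg, "md")
stop_result = should_stop(stop_msg, "md")
```

Only the second message, with status 'false' for queue 'md', is treated as a
stop request; a message for any other queue is ignored.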

In your writeStream, add a queryName so that the streaming query can be
identified by its topic name:

trigger(processingTime='30 seconds'). \
queryName('md'). \

Next, the writeStream for the controlling topic (called newtopic) has the
following:

foreachBatch(sendToControl). \
trigger(processingTime='2 seconds'). \
queryName('newtopic'). \

The method sendToControl does what is needed:

from pyspark.sql.functions import col

def sendToControl(dfnewtopic, batchId):
    # Read the first control row of this micro-batch, if there is one
    rows = dfnewtopic.select(col("queue"), col("status")).take(1)
    if len(rows) > 0:
        queue = rows[0][0]
        status = rows[0][1]

        # Stop the business query only when the control message asks for it
        if queue == 'md' and status == 'false':
            # s and config come from the surrounding application (not shown)
            spark_session = s.spark_session(config['common']['appName'])
            active = spark_session.streams.active
            for e in active:
                name = e.name
                if name == 'md':
                    print(f"""Terminating streaming process {name}""")
                    e.stop()
    else:
        print("DataFrame newtopic is empty")
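
The lookup-and-stop step inside sendToControl can also be pulled out into a
small helper. The following is only a sketch and not part of the original
design: stop_query_by_name is a hypothetical name, and it assumes nothing
more than that each element has a name attribute and a stop() method, as the
StreamingQuery objects returned by spark_session.streams.active do. FakeQuery
is a stand-in so that the example runs without a Spark session.

```python
def stop_query_by_name(active_queries, name):
    """Stop every query whose name matches; return how many were stopped."""
    stopped = 0
    for q in active_queries:
        if q.name == name:
            q.stop()
            stopped += 1
    return stopped

class FakeQuery:
    """Hypothetical stand-in for a StreamingQuery (name + stop() only)."""
    def __init__(self, name):
        self.name = name
        self.stopped = False

    def stop(self):
        self.stopped = True

queries = [FakeQuery("md"), FakeQuery("newtopic")]
count = stop_query_by_name(queries, "md")
```

In the real job the call would presumably be
stop_query_by_name(spark_session.streams.active, 'md'), which leaves the
control query newtopic running.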

The sendToControl approach seems to work: I checked that in this case the
data was written and saved to the target sink (a BigQuery table). The query
waits until the data has been written completely, meaning the current
streaming message is fully processed, so there is some latency before it
stops.

This is the output

Terminating streaming process md
wrote to DB  ## this is the flag I added to ensure the current micro-batch
was completed
2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md [id =
6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error

The various termination processes are described in

Structured Streaming Programming Guide - Spark 3.1.1 Documentation
(apache.org)
<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>

This is the idea I came up with, which allows ending the streaming process
at the least cost.

Ideas and opinions are welcome.


Cheers


