OK I found a workaround.

Basically each stream state is not kept and I have two streams. One is a
business topic and the other one created to shut down spark structured
streaming gracefully.

I was interested to print the value for the most recent batch Id for the
business topic called "md" here ust before terminating it gracefully. I
made some effort to use a single function to get batchId for both topics
but this did not work.

However under the checkpoint directory in spark structured streaming,
<https://docs.databricks.com/structured-streaming/async-checkpointing.html>
there is a subdirectory called offsets that is maintained by Spark. This
directory contains a list of batchIds for recovery/start from where you
left purpose.

So I read from that directory visible to spark and the last value for the
business batchId..

def sendToControl(dfnewtopic, batchId2):
    if(len(dfnewtopic.take(1))) > 0:
        print(f"""From sendToControl, newtopic batchId is {batchId2}""")
        dfnewtopic.show(100,False)
        queue = dfnewtopic.first()[2]
        status = dfnewtopic.first()[3]
        print(f"""testing queue is {queue}, and status is {status}""")
        if((queue == config['MDVariables']['topic']) & (status == 'false')):
## shutdown issued
          # get the last batchId for the main topic from
{checkpoint_path}/offsets sub-directory
          dir_path = "///ssd/hduser/MDBatchBQ/chkpt/offsets"
          dir_list = os.listdir(dir_path)
          batchIdMD = max(dir_list)
          spark_session = s.spark_session(config['common']['appName'])
          active = spark_session.streams.active
          for e in active:
             name = e.name
             if(name == config['MDVariables']['topic']):
                print(f"""\n==> Request terminating streaming process for
topic {name} with batchId = {batchIdMD} at {datetime.now()}\n """)
                e.stop()
    else:
        print("DataFrame newtopic is empty")



>From sendToControl, newtopic batchId is 191
>From sendToSink, md, batchId is 676, at 2023-03-05 19:57:00.095001
+------------------------------------+-------------------+-----+------+
|uuid                                |timeissued         |queue|status|
+------------------------------------+-------------------+-----+------+
|f26544ab-95f5-47f3-ad8b-03beb25239c2|2023-03-05 19:56:49|md   |true  |
+------------------------------------+-------------------+-----+------+

testing queue is md, and status is true
>From sendToSink, md, batchId is 677, at 2023-03-05 19:57:30.072476
>From sendToSink, md, batchId is 678, at 2023-03-05 19:58:00.081617
>From sendToControl, newtopic batchId is 192
+------------------------------------+-------------------+-----+------+
|uuid                                |timeissued         |queue|status|
+------------------------------------+-------------------+-----+------+
|92f0b6fc-d683-42de-af17-f8a021048196|2023-03-05 19:57:29|md   |false |
+------------------------------------+-------------------+-----+------+


*==> Request terminating streaming process for topic md with batchId = 678
at 2023-03-05 19:58:01.589334*

2023-03-05 19:58:01,649 ERROR streaming.MicroBatchExecution: Query newtopic
[id = 19f4c6ad-11b8-451f-acf1-8bfbea7c370b, runId =
dd26db7d-f4bf-4176-ae75-116eb67eb237] terminated with error


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 4 Mar 2023 at 22:13, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> This might help
>
> https://docs.databricks.com/structured-streaming/foreach.html
>
> streamingDF.writeStream.foreachBatch(...) allows you to specify a
> function that is executed on the output data of every micro-batch of the
> streaming query. It takes two parameters: a DataFrame or Dataset that has
> the output data of a micro-batch and the unique ID of the micro-batch
>
>
> So there are two different function calls in my case. I cannot put them
> together in one function.
>
>
>            newtopicResult = streamingNewtopic.select( \
>
>                      col("newtopic_value.uuid").alias("uuid") \
>
>                    , col("newtopic_value.timeissued").alias("timeissued") \
>
>                    , col("newtopic_value.queue").alias("queue") \
>
>                    , col("newtopic_value.status").alias("status")). \
>
>                      writeStream. \
>
>                      outputMode('append'). \
>
>                      option("truncate", "false"). \
>
>  *                    foreachBatch(sendToControl). \*
>
> *                     trigger(processingTime='30 seconds'). \*
>
> *                     option('checkpointLocation',
> checkpoint_path_newtopic). \*
>
> *                     queryName(config['MDVariables']['newtopic']). \*
>
> *                     start()*
>
>             #print(newtopicResult)
>
>
>             result = streamingDataFrame.select( \
>
>                      col("parsed_value.rowkey").alias("rowkey") \
>
>                    , col("parsed_value.ticker").alias("ticker") \
>
>                    , col("parsed_value.timeissued").alias("timeissued") \
>
>                    , col("parsed_value.price").alias("price")). \
>
>                      writeStream. \
>
>                      outputMode('append'). \
>
>                      option("truncate", "false"). \
>
>   *                   foreachBatch(sendToSink). \*
>
> *                     trigger(processingTime='30 seconds'). \*
>
> *                     option('checkpointLocation', checkpoint_path). \*
>
> *                     queryName(config['MDVariables']['topic']). \*
>
>                      start()
>
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 4 Mar 2023 at 21:51, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> I am aware of your point that global  don't work in a distributed
>> environment.
>> With regard to your other point, these are two different topics with
>> their own streams. The point of second stream is to set the status to
>> false, so it can gracefully shutdown the main stream (the one called "md")
>> here
>>
>> For example, the second stream has this row
>>
>>
>> +------------------------------------+-------------------+-----+------+
>>
>> |uuid                                |timeissued         |queue|status|
>>
>> +------------------------------------+-------------------+-----+------+
>>
>> |ac74d419-58aa-4879-945d-a2a41bb64873|2023-03-04 21:29:18|md   |true  |
>>
>> +------------------------------------+-------------------+-----+------+
>>
>> so every 30 seconds, it checks the status and if staus = false, it shuts
>> down the main stream gracefully. It works ok
>>
>> def sendToControl(dfnewtopic, batchId2):
>>     if(len(dfnewtopic.take(1))) > 0:
>>         print(f"""From sendToControl, newtopic batchId is {batchId2}""")
>>         dfnewtopic.show(100,False)
>>         queue = dfnewtopic.first()[2]
>>         status = dfnewtopic.first()[3]
>>         print(f"""testing queue is {queue}, and status is {status}""")
>>         if((queue == config['MDVariables']['topic']) & (status ==
>> 'false')):
>>           spark_session = s.spark_session(config['common']['appName'])
>>           active = spark_session.streams.active
>>           for e in active:
>>              name = e.name
>>              if(name == config['MDVariables']['topic']):
>>                 print(f"""\n==> Request terminating streaming process for
>> topic {name} at {datetime.now()}\n """)
>>                 e.stop()
>>     else:
>>         print("DataFrame newtopic is empty")
>>
>> and so when status set to false in the second it does as below
>>
>> From sendToControl, newtopic batchId is 93
>> +------------------------------------+-------------------+-----+------+
>> |uuid                                |timeissued         |queue|status|
>> +------------------------------------+-------------------+-----+------+
>> |c4736bc7-bee7-4dce-b67a-3b1d674b243a|2023-03-04 21:36:52|md   |false |
>> +------------------------------------+-------------------+-----+------+
>>
>> *testing queue is md, and status is false*
>>
>> ==> Request terminating streaming process for topic md at 2023-03-04
>> 21:36:55.590162
>>
>> and shuts down
>>
>> I want to state this
>>
>>   print(f"""\n==> Request terminating streaming process for topic {name}
>> and batch {BatchId for md} at {datetime.now()}\n """)
>>
>> That {BatchId for md} should come from this one
>>
>> def sendToSink(df, batchId):
>>     if(len(df.take(1))) > 0:
>>         print(f"""From sendToSink, md, batchId is {batchId}, at
>> {datetime.now()} """)
>>         #df.show(100,False)
>>         df. persist()
>>         # write to BigQuery batch table
>>         #s.writeTableToBQ(df, "append",
>> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>>         df.unpersist()
>>         #print(f"""wrote to DB""")
>>         batchidMD = batchId
>>         print(batchidMD)
>>     else:
>>         print("DataFrame md is empty")
>>
>> I trust I explained it adequately
>>
>> cheers
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 4 Mar 2023 at 21:22, Sean Owen <sro...@gmail.com> wrote:
>>
>>> I don't quite get it - aren't you applying to the same stream, and
>>> batches? worst case why not apply these as one function?
>>> Otherwise, how do you mean to associate one call to another?
>>> globals don't help here. They aren't global beyond the driver, and,
>>> which one would be which batch?
>>>
>>> On Sat, Mar 4, 2023 at 3:02 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Thanks. they are different batchIds
>>>>
>>>> From sendToControl, newtopic batchId is 76
>>>> From sendToSink, md, batchId is 563
>>>>
>>>> As a matter of interest, why does a global variable not work?
>>>>
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, 4 Mar 2023 at 20:13, Sean Owen <sro...@gmail.com> wrote:
>>>>
>>>>> It's the same batch ID already, no?
>>>>> Or why not simply put the logic of both in one function? or write one
>>>>> function that calls both?
>>>>>
>>>>> On Sat, Mar 4, 2023 at 2:07 PM Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> This is probably pretty  straight forward but somehow is does not
>>>>>> look that way
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Spark Structured Streaming,  "foreachBatch" performs custom write
>>>>>> logic on each micro-batch through a call function. Example,
>>>>>>
>>>>>> foreachBatch(sendToSink) expects 2 parameters, first: micro-batch as
>>>>>> DataFrame or Dataset and second: unique id for each batch
>>>>>>
>>>>>>
>>>>>>
>>>>>> In my case I simultaneously read two topics through two separate
>>>>>> functions
>>>>>>
>>>>>>
>>>>>>
>>>>>>    1. foreachBatch(sendToSink). \
>>>>>>    2. foreachBatch(sendToControl). \
>>>>>>
>>>>>> This is  the code
>>>>>>
>>>>>> def sendToSink(df, batchId):
>>>>>>     if(len(df.take(1))) > 0:
>>>>>>         print(f"""From sendToSink, md, batchId is {batchId}, at
>>>>>> {datetime.now()} """)
>>>>>>         #df.show(100,False)
>>>>>>         df. persist()
>>>>>>         # write to BigQuery batch table
>>>>>>         #s.writeTableToBQ(df, "append",
>>>>>> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>>>>>>         df.unpersist()
>>>>>>         #print(f"""wrote to DB""")
>>>>>>        else:
>>>>>>         print("DataFrame md is empty")
>>>>>>
>>>>>> def sendToControl(dfnewtopic, batchId2):
>>>>>>     if(len(dfnewtopic.take(1))) > 0:
>>>>>>         print(f"""From sendToControl, newtopic batchId is
>>>>>> {batchId2}""")
>>>>>>         dfnewtopic.show(100,False)
>>>>>>         queue = dfnewtopic.first()[2]
>>>>>>         status = dfnewtopic.first()[3]
>>>>>>         print(f"""testing queue is {queue}, and status is {status}""")
>>>>>>         if((queue == config['MDVariables']['topic']) & (status ==
>>>>>> 'false')):
>>>>>>           spark_session = s.spark_session(config['common']['appName'])
>>>>>>           active = spark_session.streams.active
>>>>>>           for e in active:
>>>>>>              name = e.name
>>>>>>              if(name == config['MDVariables']['topic']):
>>>>>>                 print(f"""\n==> Request terminating streaming process
>>>>>> for topic {name} at {datetime.now()}\n """)
>>>>>>                 e.stop()
>>>>>>     else:
>>>>>>         print("DataFrame newtopic is empty")
>>>>>>
>>>>>>
>>>>>> The problem I have is to share batchID from the first function in the
>>>>>> second function sendToControl(dfnewtopic, batchId2) so I can print
>>>>>> it out.
>>>>>>
>>>>>>
>>>>>> Defining a global did not work.. So it sounds like I am missing
>>>>>> something rudimentary here!
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>> may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>

Reply via email to