You never want to call System.exit from inside your foreachBatch method, because
that doesn't cleanly terminate the query. You should call stop on each
query to terminate it safely.

You need something like this

result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     foreachBatch(sendToSink). \
                     trigger(processingTime='30 seconds'). \
                     start()

newtopicResult = streamingNewtopic.select( \
                     col("newtopic_value.uuid").alias("uuid") \
                   , col("newtopic_value.timecreated").alias("timecreated") \
                   , col("newtopic_value.status").alias("status")). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     foreachBatch(sendToControl). \
                     trigger(processingTime='2 seconds'). \
                     start()
spark.streams.awaitAnyTermination()

def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        print(f"""newtopic batchId is {batchId}""")
        dfnewtopic.show(100, False)
        # stop every active query from the driver (Python syntax, not Scala)
        for q in spark.streams.active:
            q.stop()
    else:
        print("DataFrame newtopic is empty")


From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Thursday, April 22, 2021 at 12:10 PM
To: "Lalwani, Jayesh" <jlalw...@amazon.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: RE: [EXTERNAL] Dealing with two topics in Spark Structured Streaming


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Hi,

Basically reading two streaming topics in Spark structured streaming.

One topic, called newtopic, has a status column whose value is either "true"
or "false". It is effectively a control topic used to make processing of the
main topic, called md (market data), exit.

I am experimenting with this. So topic md feeds the main streaming DAG (the
sequence of transformations on the topic) and topic newtopic feeds the
controlling DAG. I trust this makes sense.

So here ideally I want to perform the following

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"""md batchId is {batchId}""")
        df.show(100, False)
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
            config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()
        ## pseudo code
        ## if status == "true":  # I need to know this value coming from
        ##                       # newtopic, and test it immediately after
        ##                       # writing the message to BigQuery
        ##     terminate the process and exit from the Spark program for
        ##     whatever reason. May need to refine this exit as well
        ##     System.exit(0)
    else:
        print("DataFrame md is empty")

As I stated, I am experimenting with this.

Thanks





view my Linkedin
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 22 Apr 2021 at 16:53, Lalwani, Jayesh 
<jlalw...@amazon.com<mailto:jlalw...@amazon.com>> wrote:
What are you trying to do? Can you give us a bigger picture?

From: Mich Talebzadeh 
<mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>>
Date: Thursday, April 22, 2021 at 11:43 AM
To: "user @spark" <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: RE: [EXTERNAL] Dealing with two topics in Spark Structured Streaming




Hi,

I need some ideas on this, actually. These two functions receive their
respective DataFrames in Spark Structured Streaming; they are invoked via
foreachBatch().

I just need to be able to access the value of status worked out in
sendToControl() from the sendToSink() method.


def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        print(f"""newtopic batchId is {batchId}""")
        dfnewtopic.show(100, False)
        status = dfnewtopic.select(col("status")).collect()[0][0]
    else:
        print("DataFrame newtopic is empty")

# need to access that status in the method below
def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"""md batchId is {batchId}""")
        df.show(100, False)
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
            config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()
    else:
        print("DataFrame md is empty")

I know using a global variable etc. is not going to be viable. How can this be
achieved? At any time the value of status is either true or false.
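One workable sketch, since both foreachBatch callbacks run in the driver process (on different streaming-query threads), is a small thread-safe holder for the latest status. The names StatusHolder and status_holder below are mine, not part of any Spark API:

```python
import threading

class StatusHolder:
    """Thread-safe carrier for the latest control status.
    sendToControl writes to it; sendToSink reads from it. This works
    because both foreachBatch callbacks execute on the driver, albeit
    on different streaming-query threads."""
    def __init__(self, initial="false"):
        self._lock = threading.Lock()
        self._value = initial

    def set(self, value):
        with self._lock:
            self._value = value

    def get(self):
        with self._lock:
            return self._value

status_holder = StatusHolder()

# In sendToControl (sketch):
#     status_holder.set(dfnewtopic.select(col("status")).collect()[0][0])
# In sendToSink (sketch):
#     if status_holder.get() == "true":
#         ...stop the queries...
```

The lock is cheap insurance here; for a single string value a plain module-level variable would also be safe in CPython, but the holder makes the intent explicit.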

Thanks











On Thu, 22 Apr 2021 at 10:29, Mich Talebzadeh 
<mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>> wrote:


Hi,



I am dealing with two topics in Spark Structured Streaming (SSS).



Topic "md" returns market data values as per



            result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     foreachBatch(sendToSink). \
                     trigger(processingTime='30 seconds'). \
                     start()

Topic "newtopic" returns one row, including a status column

           newtopicResult = streamingNewtopic.select( \
                     col("newtopic_value.uuid").alias("uuid") \
                   , col("newtopic_value.timecreated").alias("timecreated") \
                   , col("newtopic_value.status").alias("status")). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     foreachBatch(sendToControl). \
                     trigger(processingTime='2 seconds'). \
                     start()

The method sendToSink writes values to a BigQuery table

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"""{batchId}""")
        df.show(100)
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
            config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()
    else:
        print("DataFrame is empty")

The second method simply gets the value of the status column

def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        print(f"""newtopic batchId is {batchId}""")
        status = dfnewtopic.select(col("status")).collect()[0][0]
    else:
        print("DataFrame newtopic is empty")

I need to be able to check the status value from the sendToControl method in
the sendToSink method. Note that the status value may change as the stream
progresses. What would be the best approach?
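If in-process sharing between the two callbacks is not acceptable, another sketch is to pass the status through a small external location. The path and helper names below are hypothetical, and a local file stands in for what would in practice be durable shared storage (an HDFS/GCS path, a database row, etc.):

```python
import os
import tempfile

# Hypothetical location for the control flag; in a real deployment this
# would be durable shared storage rather than the local temp directory.
STATUS_PATH = os.path.join(tempfile.gettempdir(), "newtopic_status.txt")

def publish_status(status):
    """Called from sendToControl after extracting the status column."""
    tmp = STATUS_PATH + ".tmp"
    with open(tmp, "w") as f:
        f.write(status)
    # atomic rename so a concurrent reader never sees a partial write
    os.replace(tmp, STATUS_PATH)

def read_status(default="false"):
    """Called from sendToSink after writing the batch to BigQuery."""
    try:
        with open(STATUS_PATH) as f:
            return f.read().strip()
    except FileNotFoundError:
        return default
```

This trades a little latency for robustness: it keeps working even if the two callbacks were ever split across processes, and the flag survives a driver restart.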

Thanks







