Hi,

Basically I am reading two streaming topics in Spark Structured Streaming.

One topic, called newtopic, has a column status which can be either "true"
or "false". It is effectively a controlling topic, used to make the main
topic, called md (market data), exit.

I am experimenting with this. So topic md is the streaming DAG (a sequence of
transformations on the topic) and topic newtopic is the controlling DAG.
I trust this makes sense.

So here, ideally, I want to perform the following:

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"""md batchId is {batchId}""")
        df.show(100, False)
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
            config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()
        ## pseudo code: I need to know the value of status coming from
        ## newtopic, and test it immediately after writing the message to BigQuery
        if status == "true":
            # terminate the process and exit from the Spark program for
            # whatever reason. May need to refine this exit as well
            sys.exit(0)
    else:
        print("DataFrame md is empty")

As I stated, I am experimenting with this.
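One rough idea I am toying with (just a sketch, not tested): since both foreachBatch
functions run on the driver, a small shared driver-side object could carry the latest
status from sendToControl over to sendToSink, and the md query could then be stopped
gracefully from the driver's main loop instead of calling System.exit inside
foreachBatch. The names StatusHolder, statusHolder and mdQuery below are made up for
illustration; s, config and the column names are as in my snippets, and spark is the
SparkSession.

import time
from threading import Lock
from pyspark.sql.functions import col

class StatusHolder:                      # hypothetical driver-side holder
    def __init__(self):
        self._lock = Lock()              # the two queries run on separate driver threads
        self._status = "false"
    def set(self, value):
        with self._lock:
            self._status = value
    def get(self):
        with self._lock:
            return self._status

statusHolder = StatusHolder()

def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        print(f"""newtopic batchId is {batchId}""")
        # keep the latest status on the driver
        statusHolder.set(dfnewtopic.select(col("status")).collect()[0][0])
    else:
        print("DataFrame newtopic is empty")

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"""md batchId is {batchId}""")
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
            config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()
        # the status worked out in sendToControl is visible here straight after the write
        print(f"""status seen by md batch {batchId} is {statusHolder.get()}""")
    else:
        print("DataFrame md is empty")

# in the main program, after both writeStream...start() calls
# (mdQuery is the handle returned for topic md):
while mdQuery.isActive:
    if statusHolder.get() == "true":
        mdQuery.stop()                   # stop the md streaming query
    time.sleep(10)
spark.stop()                             # then shut the session down if the whole job should end

If a hard exit is really wanted, sys.exit(0) in the main loop after mdQuery.stop()
would also do; I am just trying to avoid killing the driver from inside foreachBatch,
since I suspect stopping a query from within its own micro-batch can block.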

Thanks



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 22 Apr 2021 at 16:53, Lalwani, Jayesh <jlalw...@amazon.com> wrote:

> What are you trying to do? Can you give us a bigger picture?
>
>
>
> *From: *Mich Talebzadeh <mich.talebza...@gmail.com>
> *Date: *Thursday, April 22, 2021 at 11:43 AM
> *To: *"user @spark" <user@spark.apache.org>
> *Subject: *RE: [EXTERNAL] Dealing with two topics in Spark Structured
> Streaming
>
>
>
> Hi,
>
>
>
> I need some ideas on this actually. These two functions receive their
> respective DataFrames in Spark Structured Streaming; they are invoked
> through foreachBatch().
>
>
>
> I just need to be able to access the value of *status* worked out
> in sendToControl() from the sendToSink() method
>
>
>
>
>
> def sendToControl(dfnewtopic, batchId):
>     if len(dfnewtopic.take(1)) > 0:
>         print(f"""newtopic batchId is {batchId}""")
>         dfnewtopic.show(100,False)
>         status = dfnewtopic.select(col("status")).collect()[0][0]
>     else:
>         print("DataFrame newtopic is empty")
>
>
>
> # need to access that *status* in below method
>
> def sendToSink(df, batchId):
>     if len(df.take(1)) > 0:
>         print(f"""md batchId is {batchId}""")
>         df.show(100,False)
>         df.persist()
>         # write to BigQuery batch table
>         s.writeTableToBQ(df, "append",
> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>         df.unpersist()
>     else:
>         print("DataFrame md is empty")
>
>
>
> I know that using a global variable etc. is not going to be viable. How can
> this be achieved? At any time the value of status is either true or false.
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
>
>
>
> On Thu, 22 Apr 2021 at 10:29, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>
> Hi,
>
>
>
> I am dealing with two topics in SSS.
>
>
>
> Topic "md" returns market data values as per
>
>
>
>             result = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.ticker").alias("ticker") \
>                    , col("parsed_value.timeissued").alias("timeissued") \
>                    , col("parsed_value.price").alias("price")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(sendToSink). \
>                      trigger(processingTime='30 seconds'). \
>
>
>
> Topic "newtopic" returns one row including status column
>
>
>
>            newtopicResult = streamingNewtopic.select( \
>                      col("newtopic_value.uuid").alias("uuid") \
>                    , col("newtopic_value.timecreated").alias("timecreated") \
>                    , col("newtopic_value.status").alias("status")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(sendToControl). \
>                      trigger(processingTime='2 seconds'). \
>                      start()
>
>
>
> The method sendToSink writes the values to a BigQuery table
>
>
>
> def sendToSink(df, batchId):
>     if len(df.take(1)) > 0:
>         print(f"""{batchId}""")
>         df.show(100)
>         df.persist()
>         # write to BigQuery batch table
>         s.writeTableToBQ(df, "append",
> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>         df.unpersist()
>     else:
>         print("DataFrame is empty")
>
>
>
> The second method simply gets the value of the status column
>
>
>
> def sendToControl(dfnewtopic, batchId):
>     if len(dfnewtopic.take(1)) > 0:
>         print(f"""newtopic batchId is {batchId}""")
>         status = dfnewtopic.select(col("status")).collect()[0][0]
>     else:
>         print("DataFrame newtopic is empty")
>
>
>
> I need to be able to check the status value from the sendToControl method
> in the sendToSink method. Note that the status value may change as the
> stream progresses. What would be the best approach?
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
