Hi,

I need some ideas on this. Each of these two functions handles its own
DataFrame in Spark Structured Streaming; both are invoked through
foreachBatch().

I just need to be able to access the value of status, worked out in
sendToControl(), from within the sendToSink() method.


from pyspark.sql.functions import col

def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        print(f"newtopic batchId is {batchId}")
        dfnewtopic.show(100, False)
        # status is local to this function; this is the value needed elsewhere
        status = dfnewtopic.select(col("status")).collect()[0][0]
    else:
        print("DataFrame newtopic is empty")

# need to access that status in the method below
def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"md batchId is {batchId}")
        df.show(100, False)
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
                         config['MDVariables']['targetDataset'],
                         config['MDVariables']['targetTable'])
        df.unpersist()
    else:
        print("DataFrame md is empty")

I know that using a global variable is unlikely to be viable. How can this
be achieved? At any time, the value of status is either true or false.
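
One possible direction, as a rough sketch only: since both foreachBatch
functions run in the driver process, they could share a small thread-safe
holder object instead of a bare global. The names StatusHolder,
make_send_to_control, make_send_to_sink and write_batch below are all
hypothetical, and the sketch assumes both streaming queries are started
from the same Python driver.

```python
import threading


class StatusHolder:
    """Thread-safe holder for the latest status seen on the control topic."""

    def __init__(self, initial=None):
        self._lock = threading.Lock()
        self._value = initial

    def set(self, value):
        with self._lock:
            self._value = value

    def get(self):
        with self._lock:
            return self._value


def make_send_to_control(holder):
    """Build the foreachBatch function for the "newtopic" stream."""
    def sendToControl(dfnewtopic, batchId):
        rows = dfnewtopic.select("status").take(1)
        if rows:
            # Remember the most recent status for the other stream to read.
            holder.set(rows[0][0])
        else:
            print("DataFrame newtopic is empty")
    return sendToControl


def make_send_to_sink(holder, write_batch):
    """Build the foreachBatch function for the "md" stream.

    write_batch is a hypothetical callable taking (df, status); it stands in
    for the BigQuery write and decides what to do with the status.
    """
    def sendToSink(df, batchId):
        if df.take(1):
            # None means no control message has arrived yet.
            write_batch(df, holder.get())
        else:
            print("DataFrame md is empty")
    return sendToSink
```

The two streams would then use foreachBatch(make_send_to_control(holder))
and foreachBatch(make_send_to_sink(holder, write_batch)) with one shared
holder. Because the two triggers fire independently (2 seconds and 30
seconds), the sink would only see the latest status at the moment its batch
runs, not a status tied to particular rows.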

Thanks



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 22 Apr 2021 at 10:29, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> Hi,
>
> I am dealing with two topics in Spark Structured Streaming (SSS).
>
> Topic "md" returns market data values as per
>
>             result = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.ticker").alias("ticker") \
>                    , col("parsed_value.timeissued").alias("timeissued") \
>                    , col("parsed_value.price").alias("price")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(sendToSink). \
>                      trigger(processingTime='30 seconds'). \
>                      start()
>
> Topic "newtopic" returns one row including a status column
>
>            newtopicResult = streamingNewtopic.select( \
>                      col("newtopic_value.uuid").alias("uuid") \
>                    , col("newtopic_value.timecreated").alias("timecreated") \
>                    , col("newtopic_value.status").alias("status")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(sendToControl). \
>                      trigger(processingTime='2 seconds'). \
>                      start()
>
> The method sendToSink writes values to a BigQuery table
>
> def sendToSink(df, batchId):
>     if len(df.take(1)) > 0:
>         print(f"{batchId}")
>         df.show(100)
>         df.persist()
>         # write to BigQuery batch table
>         s.writeTableToBQ(df, "append",
>                          config['MDVariables']['targetDataset'],
>                          config['MDVariables']['targetTable'])
>         df.unpersist()
>     else:
>         print("DataFrame is empty")
>
> The second method simply gets the value of the status column
>
> def sendToControl(dfnewtopic, batchId):
>     if len(dfnewtopic.take(1)) > 0:
>         print(f"newtopic batchId is {batchId}")
>         status = dfnewtopic.select(col("status")).collect()[0][0]
>     else:
>         print("DataFrame newtopic is empty")
>
> I need to be able to check the status value from the sendToControl method
> in the sendToSink method. Note that the status value may change as the
> stream runs. What would be the best approach?
>
> Thanks
