Hi,

I need some ideas on this. These two functions each work on their respective DataFrame in Spark Structured Streaming. They are called as a result of foreachBatch().
I just need to be able to access the value of status, worked out in sendToControl(), from within the sendToSink() method:

    def sendToControl(dfnewtopic, batchId):
        if len(dfnewtopic.take(1)) > 0:
            print(f"""newtopic batchId is {batchId}""")
            dfnewtopic.show(100, False)
            # this is the value needed in sendToSink()
            status = dfnewtopic.select(col("status")).collect()[0][0]
        else:
            print("DataFrame newtopic is empty")

    # need to access that status in the method below
    def sendToSink(df, batchId):
        if len(df.take(1)) > 0:
            print(f"""md batchId is {batchId}""")
            df.show(100, False)
            df.persist()
            # write to BigQuery batch table
            s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
            df.unpersist()
        else:
            print("DataFrame md is empty")

I know that using a global variable etc. is not going to be viable. How can this be achieved? At any time the value of status is either true or false.

Thanks

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

On Thu, 22 Apr 2021 at 10:29, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> I am dealing with two topics in SSS.
>
> Topic "md" returns market data values as per
>
>     result = streamingDataFrame.select( \
>              col("parsed_value.rowkey").alias("rowkey") \
>            , col("parsed_value.ticker").alias("ticker") \
>            , col("parsed_value.timeissued").alias("timeissued") \
>            , col("parsed_value.price").alias("price")). \
>              writeStream. \
>              outputMode('append'). \
>              option("truncate", "false"). \
>              foreachBatch(sendToSink). \
>              trigger(processingTime='30 seconds'). \
>              start()
>
> Topic "newtopic" returns one row including the status column
>
>     newtopicResult = streamingNewtopic.select( \
>              col("newtopic_value.uuid").alias("uuid") \
>            , col("newtopic_value.timecreated").alias("timecreated") \
>            , col("newtopic_value.status").alias("status")). \
>              writeStream. \
>              outputMode('append'). \
>              option("truncate", "false"). \
>              foreachBatch(sendToControl). \
>              trigger(processingTime='2 seconds'). \
>              start()
>
> The method sendToSink writes values to a BigQuery database
>
>     def sendToSink(df, batchId):
>         if len(df.take(1)) > 0:
>             print(f"""{batchId}""")
>             df.show(100)
>             df.persist()
>             # write to BigQuery batch table
>             s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>             df.unpersist()
>         else:
>             print("DataFrame is empty")
>
> The second method simply gets the value of the status column
>
>     def sendToControl(dfnewtopic, batchId):
>         if len(dfnewtopic.take(1)) > 0:
>             print(f"""newtopic batchId is {batchId}""")
>             status = dfnewtopic.select(col("status")).collect()[0][0]
>         else:
>             print("DataFrame newtopic is empty")
>
> I need to be able to check the status value from the sendToControl method
> in the sendToSink method. Note that the status value may change as the
> stream progresses. What would be the best approach?
>
> Thanks
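One possible direction, offered only as a sketch: keep the latest status in a small lock-protected object and hand that object to both callbacks through closures, so no module-level global is needed. This assumes both streaming queries are started from the same driver program and that their foreachBatch callbacks run in that one Python process. The names StatusHolder, make_send_to_control, make_send_to_sink and handle_batch are hypothetical and do not come from the original code.

```python
import threading


class StatusHolder:
    """Hypothetical helper: thread-safe holder for the latest status value."""

    def __init__(self, initial=None):
        self._lock = threading.Lock()
        self._value = initial

    def set(self, value):
        with self._lock:
            self._value = value

    def get(self):
        with self._lock:
            return self._value


def make_send_to_control(holder):
    """Build a foreachBatch callback that records the status column."""

    def sendToControl(dfnewtopic, batchId):
        # take(1) returns a (possibly empty) list of rows
        rows = dfnewtopic.select("status").take(1)
        if rows:
            holder.set(rows[0][0])
        else:
            print("DataFrame newtopic is empty")

    return sendToControl


def make_send_to_sink(holder, handle_batch):
    """Build a foreachBatch callback that reads the most recent status.

    handle_batch(df, batchId, status) is a hypothetical function that
    decides what to do with the batch, e.g. write it to BigQuery or skip it.
    """

    def sendToSink(df, batchId):
        status = holder.get()  # None until the control topic has delivered a row
        handle_batch(df, batchId, status)

    return sendToSink
```

With this in place the two queries would be wired up as foreachBatch(make_send_to_control(holder)) and foreachBatch(make_send_to_sink(holder, handle_batch)), both sharing one holder = StatusHolder(). Because the control query triggers every 2 seconds and the md query every 30 seconds, sendToSink would see whichever status arrived most recently, not necessarily one tied to the same batch.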