This is probably pretty straightforward, but somehow it does not look that
way.
In Spark Structured Streaming, "foreachBatch" applies custom write logic
to each micro-batch through a callback function. For example,
foreachBatch(sendToSink) expects sendToSink to take two parameters: first,
the micro-batch as a DataFrame or Dataset, and second, a unique id for that
batch.
In my case I read two topics simultaneously, each through its own function:
1. foreachBatch(sendToSink)
2. foreachBatch(sendToControl)
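For context, each foreachBatch call belongs to its own streaming query, so two topics mean two queries started side by side. The sketch below is hedged and not taken from the original code: it assumes two streaming DataFrames already read from Kafka, passed in as `md_df` and `newtopic_df`, and the helper name `start_queries` and default query name "newtopic" are hypothetical. Naming the md query with `.queryName()` is what lets the `e.name` check in sendToControl find it.

```python
def start_queries(md_df, newtopic_df, sink_fn, control_fn,
                  md_query_name, control_query_name="newtopic"):
    """Start one streaming query per topic, each with its own foreachBatch callback.

    md_df and newtopic_df are assumed to be streaming DataFrames
    (e.g. from spark.readStream.format("kafka")...load()).
    """
    md_query = (md_df.writeStream
                .outputMode("append")
                .foreachBatch(sink_fn)          # sink_fn(df, batchId) runs once per micro-batch
                .queryName(md_query_name)       # later matched against e.name in sendToControl
                .start())
    control_query = (newtopic_df.writeStream
                     .outputMode("append")
                     .foreachBatch(control_fn)  # control_fn(df, batchId) for the control topic
                     .queryName(control_query_name)
                     .start())
    return md_query, control_query
```

The driver would then typically block on `spark.streams.awaitAnyTermination()`, so stopping the md query from inside sendToControl ends that wait. Note that the two queries number their batches independently.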
This is the code:

from datetime import datetime

# config (parsed settings) and s (helper module providing spark_session and
# writeTableToBQ) come from elsewhere in the application

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"From sendToSink, md, batchId is {batchId}, at {datetime.now()}")
        #df.show(100,False)
        df.persist()
        # write to BigQuery batch table
        #s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()
        #print(f"""wrote to DB""")
    else:
        print("DataFrame md is empty")

def sendToControl(dfnewtopic, batchId2):
    if len(dfnewtopic.take(1)) > 0:
        print(f"From sendToControl, newtopic batchId is {batchId2}")
        dfnewtopic.show(100, False)
        row = dfnewtopic.first()  # fetch the first row once instead of twice
        queue = row[2]
        status = row[3]
        print(f"testing queue is {queue}, and status is {status}")
        if queue == config['MDVariables']['topic'] and status == 'false':
            spark_session = s.spark_session(config['common']['appName'])
            active = spark_session.streams.active
            for e in active:
                name = e.name
                if name == config['MDVariables']['topic']:
                    print(f"\n==> Request terminating streaming process for topic {name} at {datetime.now()}\n")
                    e.stop()
    else:
        print("DataFrame newtopic is empty")
The problem is that I want to share batchId from the first function with
the second function, sendToControl(dfnewtopic, batchId2), so I can print it
there. Defining a global did not work, so it seems I am missing something
rudimentary here!
Thanks
view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.