trigger(processingTime='2 seconds'). \
start()

spark.streams.awaitAnyTermination()

def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        print(f"""newtopic batchId is {batchId}""")
        dfnewtopic.show(100, False)
        # stop all active queries: the Python form of the Scala-style
        # spark.streams.active.forEach(_.stop), which is not valid PySpark
        for q in spark.streams.active:
            q.stop()
    else:
        print("
What are you trying to do? Can you give us a bigger picture?
From: Mich Talebzadeh
Date: Thursday, April 22, 2021 at 11:43 AM
To: "user @spark"
Subject: RE: [EXTERNAL] Dealing with two topics in Spark Structured Streaming
Hi,
I need some ideas on this, actually. These two functions return their
respective DataFrames in Spark Structured Streaming; they are the result
of foreachBatch(). I just need to be able to access the value of *status*
worked out in sendToControl() from the sendToSink() method
def
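Not an answer from the thread, but one common pattern for the question above, as a minimal sketch in plain Python (no Spark required): both foreachBatch callbacks run on the driver, so a module-level holder can carry a value computed in sendToControl() into sendToSink(). The holder name `shared`, the status values, and the stand-in lists used in place of DataFrames are assumptions for illustration only.

```python
# Module-level state visible to both callbacks (assumed name `shared`).
# In real Spark both foreachBatch functions execute on the driver, so a
# plain module-level dict is reachable from each of them.
shared = {"status": None}

def sendToControl(dfnewtopic, batchId):
    # Work out a status from the control-topic batch and record it.
    # `dfnewtopic` is a stand-in list here; in Spark it is a DataFrame.
    if len(dfnewtopic) > 0:
        shared["status"] = "stop"
    else:
        shared["status"] = "run"

def sendToSink(df, batchId):
    # Read the status most recently computed by sendToControl().
    if shared["status"] == "stop":
        return "skipped"
    return f"wrote batch {batchId}"
```

One caveat with this approach: the two streaming queries trigger independently, so Spark gives no ordering guarantee that sendToControl() for a given interval runs before sendToSink() reads the value.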
Hi,
I am dealing with two topics in Spark Structured Streaming (SSS).
Topic "md" returns market data values as per
result = streamingDataFrame.select( \
col("parsed_value.rowkey").alias("rowkey") \
, col("parsed_value.ticker").alias("ticker") \
,