Best to qualify your thoughts with an example
By using the foreachBatch function combined with the update output mode in
Spark Structured Streaming, you can effectively handle and integrate
late-arriving data into your aggregations. This approach lets you
continuously update your aggregations as late records arrive (provided they
fall within the watermark).
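A minimal sketch of that pattern, purely for illustration and not from the original thread: a watermarked aggregation run in update mode, with foreachBatch used to upsert the changed groups into a sink. The broker address, topic, schema fields and the merge stub are assumptions; replace the stub with a real MERGE/upsert.

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window, count
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("late-data-demo").getOrCreate()

schema = StructType([
    StructField("event_time", TimestampType()),
    StructField("user_id", StringType()),
])

events = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")   # assumption
          .option("subscribe", "events")                          # assumption
          .load()
          .select(from_json(col("value").cast("string"), schema).alias("e"))
          .select("e.*"))

agg = (events
       .withWatermark("event_time", "10 minutes")   # tolerate 10 minutes of lateness
       .groupBy(window(col("event_time"), "5 minutes"), col("user_id"))
       .agg(count("*").alias("cnt")))

def upsert_batch(batch_df, batch_id):
    # In update mode each micro-batch contains only the groups whose counts changed,
    # including groups re-emitted because late (but within-watermark) rows arrived.
    # Replace this with an upsert/MERGE into your target table.
    batch_df.show(truncate=False)

query = (agg.writeStream
         .outputMode("update")
         .foreachBatch(upsert_batch)
         .option("checkpointLocation", "/tmp/chk/late-data-demo")  # placeholder
         .start())
```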
Hi Team,
Hope you are all doing well. I have run into a use case in which I want to
do the aggregation in foreachBatch and use update mode for handling late
data in Structured Streaming. Will this approach work effectively for
capturing late-arriving data in the aggregations? Please help.
Hi Team,
We are trying to use *Spark Structured Streaming* for our use case.
We will be joining 2 streaming sources (from Kafka topics) with watermarks.
As time progresses, the records that are prior to the watermark timestamp
are removed from the state. For our use case, we want to *store
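For reference, a minimal sketch of the watermarked stream-stream join being described; the key and timestamp column names, the watermark delays and the join time bound are placeholders, and `left`/`right` are assumed to be the two streaming DataFrames already read from Kafka.

```
from pyspark.sql.functions import expr

left_wm = left.withWatermark("left_time", "30 minutes")
right_wm = right.withWatermark("right_time", "30 minutes")

# Rows older than the watermark (plus the join time bound) are dropped from state.
joined = left_wm.join(
    right_wm,
    expr("""
        left_key = right_key AND
        right_time BETWEEN left_time AND left_time + interval 1 hour
    """),
    "inner")
```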
On Fri, 9 Feb 2024 at 16:16, Mich Talebzadeh
wrote:
> Appreciate your thoughts on this, Personally I think Spark Structured
> Streaming can be used effectively in an
Appreciate your thoughts on this. Personally, I think Spark Structured
Streaming can be used effectively in an Event Driven Architecture as well
as in continuous streaming.
From the link here
<https://www.linkedin.com/posts/activity-7161748945801617409-v29V?utm_source=share_medium=member_d
Hi Ashok,
Thanks for pointing out the Databricks article Scalable Spark Structured
Streaming for REST API Destinations | Databricks Blog
<https://www.databricks.com/blog/scalable-spark-structured-streaming-rest-api-destinations>
I browsed it and it is basically similar to many of us in
Hey Mich,
Thanks for this introduction to your forthcoming proposal "Spark Structured
Streaming and Flask REST API for Real-Time Data Ingestion and Analytics". I
recently came across an article by Databricks titled Scalable Spark
Structured Streaming for REST API Destinations.
On Mon, 8 Jan 2024 at 19:30, Mich Talebzadeh
wrote:
> Thought it might be useful to share my idea with fellow forum members. During
> the breaks, I worked on the *seamless integration of Spark Structured
> Streaming with Flask REST API for real-time data ingestion and analytics*.
Thought it might be useful to share my idea with fellow forum members. During
the breaks, I worked on the *seamless integration of Spark Structured
Streaming with Flask REST API for real-time data ingestion and analytics*.
The use case revolves around a scenario where data is generated through
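One possible way to wire such a REST ingestion path into Structured Streaming, sketched here only as an illustration and not necessarily the author's actual design: a Flask endpoint accepts JSON over HTTP and publishes it to Kafka, and a separate streaming job consumes that topic. The broker address, topic name and the kafka-python dependency are assumptions.

```
import json
from flask import Flask, request
from kafka import KafkaProducer   # pip install kafka-python (assumption)

app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers="localhost:9092",
                         value_serializer=lambda v: json.dumps(v).encode("utf-8"))

@app.route("/ingest", methods=["POST"])
def ingest():
    # push the posted JSON document onto a Kafka topic for the streaming job
    producer.send("rest_events", request.get_json())
    return {"status": "queued"}, 202

# --- In the Spark application (separate process) ---
# stream = (spark.readStream.format("kafka")
#           .option("kafka.bootstrap.servers", "localhost:9092")
#           .option("subscribe", "rest_events")
#           .load())
```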
The feature was added in Spark 3.0. BTW, you may want to check out the EOL
dates for Apache Spark releases - https://endoflife.date/apache-spark - 2.x is
already EOLed.
On Fri, Nov 24, 2023 at 11:13 PM mallesh j
wrote:
> Hi Team,
>
> I am trying to test the performance of a spark streaming
Hi Team,
I am working on a basic streaming aggregation where I have one file-stream
source and two write sinks (Hudi tables). The only difference between the two
is the aggregation performed, hence I am using the same Spark session to
perform both operations.
(File Source)
--> Agg1 -> DF1
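A hedged sketch of that layout: one file-stream source feeding two different aggregations, each written by its own query in the same SparkSession. Paths, the input schema and the Hudi write (table name, record key, precombine field, etc.) are placeholders; each query needs its own checkpoint location.

```
src = (spark.readStream
       .schema(input_schema)          # schema of the incoming files (assumption)
       .json("/data/incoming"))        # placeholder path

agg1 = src.groupBy("customer_id").count()          # Agg1 -> DF1
agg2 = src.groupBy("product_id").sum("amount")     # Agg2 -> DF2

def write_hudi(path):
    def _write(batch_df, batch_id):
        # Hudi table options omitted here; fill them in for a real table.
        batch_df.write.format("hudi").mode("append").save(path)
    return _write

q1 = (agg1.writeStream.outputMode("update")
      .foreachBatch(write_hudi("/tables/agg1"))
      .option("checkpointLocation", "/chk/agg1")
      .start())

q2 = (agg2.writeStream.outputMode("update")
      .foreachBatch(write_hudi("/tables/agg2"))
      .option("checkpointLocation", "/chk/agg2")
      .start())

spark.streams.awaitAnyTermination()
```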
Hi,
Please see my responses below:
1) In Spark Structured Streaming does commit mean streaming data has been
delivered to the sink like Snowflake?
No. A commit does not refer to data being delivered to a sink like
Snowflake or BigQuery. The term commit refers to Spark Structured Streaming
(SSS) internals. Specifically
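For context on both questions: Spark records a commit by writing an entry under the commits/ directory of the checkpoint location once a micro-batch completes, so it is a bookkeeping step, not a delivery guarantee from the external sink. If a sink such as Snowflake is slow to absorb data, the practical effect is that each micro-batch simply takes longer, which you can observe from the query's progress metrics. A small hedged sketch, where `query` is assumed to be a running StreamingQuery:

```
import time

while query.isActive:
    p = query.lastProgress          # dict describing the most recent micro-batch, or None
    if p:
        # durationMs grows (notably the addBatch phase) when the sink is the bottleneck
        print(p["batchId"], p["durationMs"])
    time.sleep(30)
```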
Hello team
1) In Spark Structured Streaming does commit mean streaming data has been
delivered to the sink like Snowflake?
2) If sinks like Snowflake cannot absorb or digest streaming data in a timely
manner, will there be an impact on Spark streaming itself?
Thanks
AK
Hi,
Thanks for your response.
I understand there is no explicit way to configure dynamic scaling for
Spark Structured Streaming as the ticket is still open for that. But is
there a way to manage dynamic scaling with the existing Batch Dynamic
scaling algorithm as this kicks in when Dynamic
Hi,
Autoscaling is not compatible with Spark Structured Streaming
<https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
since
Spark Structured Streaming currently does not support dynamic allocation
(see SPARK-24815: Structured Streaming should support d
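For illustration only: these are the ordinary batch dynamic-allocation settings. They will turn executor scaling on, but as SPARK-24815 points out, the heuristic is driven by pending tasks and knows nothing about streaming state or micro-batch latency, so treat this as an experiment rather than a supported streaming feature. The values are placeholders.

```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("streaming-with-batch-dra")
         .config("spark.dynamicAllocation.enabled", "true")
         # needed on Spark 3.x when no external shuffle service is available
         .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "2")
         .config("spark.dynamicAllocation.maxExecutors", "10")
         .config("spark.dynamicAllocation.executorIdleTimeout", "120s")
         .getOrCreate())
```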
Hi Team,
I have been working on Spark Structured Streaming and trying to autoscale
our application through dynamic allocation. But I couldn't find any
documentation or configuration that supports dynamic scaling in Spark
Structured Streaming, due to which I had been using Spark batch mode
Agreed. How does asynchronous communication relate to Spark Structured
Streaming?
In your previous post, you made Spark run on the driver in a single JVM. You
attempted to increase the number of executors to 3 after submitting the job,
which (as Sean alluded to) would not work
What do you mean by asynchronously here?
On Sun, Mar 26, 2023, 10:22 AM Emmanouil Kritharakis <
kritharakismano...@gmail.com> wrote:
> Hello again,
>
> Do we have any news for the above question?
> I would really appreciate it.
>
> Thank you,
>
>
Hello again,
Do we have any news for the above question?
I would really appreciate it.
Thank you,
--
Emmanouil (Manos) Kritharakis
Ph.D. candidate in the Department of Computer Science
Hello,
I hope this email finds you well!
I have a simple dataflow in which I read from a Kafka topic, perform a map
transformation and then write the result to another topic. Based on your
documentation here
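A minimal sketch of that dataflow, read one Kafka topic, apply a per-row transformation, write to another topic. The broker address, topic names, checkpoint path and the `upper()` transformation are placeholders.

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper

spark = SparkSession.builder.appName("kafka-map-kafka").getOrCreate()

src = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "input-topic")
       .load())

# the "map": any per-row transformation of the value column
out = src.select(col("key"), upper(col("value").cast("string")).alias("value"))

query = (out.writeStream.format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("topic", "output-topic")
         .option("checkpointLocation", "/tmp/chk/kafka-map-kafka")
         .start())
query.awaitTermination()
```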
Ok, one secure way of doing it is to shut down the streaming process
gracefully without loss of data that impacts consumers. The other method
implies in-flight changes, as suggested by the topic, with zero interruptions.
Interestingly, one of our clients requested a similar solution. As a solutions
architect/engineering manager I should come back with a few options.
Spark Structured Streaming can write to anything as long as an appropriate
API or JDBC connection exists.
I have not tried Kinesis, but have you thought about how you want to write
it as a sink?
Those quota limitations, much like quotas set by the vendors (say Google on
BigQuery writes etc
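Since there is no bundled Kinesis sink, one common workaround, sketched here under assumptions (boto3 available on the cluster, stream name, region and the `id` partition-key column are placeholders, and `df` is the streaming DataFrame to ship), is to push each micro-batch with foreachBatch and the AWS SDK:

```
import json
import boto3

def write_to_kinesis(batch_df, batch_id):
    # runs per micro-batch; collect() is fine for a sketch, use foreachPartition at scale
    records = [
        {"Data": json.dumps(row.asDict(), default=str).encode("utf-8"),
         "PartitionKey": str(row["id"])}          # 'id' column is an assumption
        for row in batch_df.collect()
    ]
    if records:
        client = boto3.client("kinesis", region_name="us-east-1")
        # PutRecords accepts at most 500 records per call, so chunk accordingly
        for i in range(0, len(records), 500):
            client.put_records(StreamName="my-stream", Records=records[i:i + 500])

query = (df.writeStream
         .foreachBatch(write_to_kinesis)
         .option("checkpointLocation", "/tmp/chk/kinesis")
         .start())
```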
*Component*: Spark Structured Streaming
*Level*: Advanced
*Scenario*: How-to
*Problems Description*
1. I currently would like to use PySpark Structured Streaming to write data
to Kinesis, but there does not seem to be a corresponding connector I can
use. I would like to confirm
OK I found a workaround.
Basically each stream's state is not kept, and I have two streams. One is a
business topic and the other one was created to shut down Spark Structured
Streaming gracefully.
I was interested in printing the value of the most recent batch Id for the
business topic called "md"
e batch ID already, no?
>> Or why not simply put the logic of both in one function? or write one
>> function that calls both?
>>
>> On Sat, Mar 4, 2023 at 2:07 PM Mich Talebzadeh
>> wrote:
>>
>>>
>>> This is probably pretty straight forward
This is probably pretty straightforward, but somehow it does not look that
way.
On Spark Structured Streaming, "foreachBatch" performs custom write logic
on each micro-batch through a call function. For example,
foreachBatch(sendToSink) expects 2 parameters: first, the micro-batch as a
DataFrame or Dataset, and second, a unique id for each batch.
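A short hedged sketch of that call pattern; the sink write, trigger interval and checkpoint path are placeholders, and `streamingDF` is assumed to be the streaming DataFrame in question.

```
def sendToSink(df, batchId):
    # df is the micro-batch as a DataFrame, batchId is the unique id Spark passes in
    if len(df.take(1)) > 0:
        print(f"batchId is {batchId}")
        df.persist()
        # ... write df to the target sink (BigQuery, JDBC, etc.) ...
        df.unpersist()

query = (streamingDF.writeStream
         .foreachBatch(sendToSink)
         .trigger(processingTime="30 seconds")
         .option("checkpointLocation", "/tmp/chk/sendToSink")
         .start())
```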
>> print('Awaiting termination...')
>> query.awaitTermination(wait_time)
>> ```
>>
>> I'd also be interested if there is a newer/better way to do this, so please
>> cc me on updates :)
>>
>> On Thu, May 6, 2021 at 1:08 PM Mich T
y on-going
> personal stuff. I'll adjust the JIRA first.
>
> Thanks,
> Dongjoon.
>
>
> On Sat, Feb 18, 2023 at 10:51 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> https://issues.apache.org/jira/browse/SPARK-42485
>>
>>
>> Spark Str
https://issues.apache.org/jira/browse/SPARK-42485
Spark Structured Streaming is a very useful tool in dealing with Event
Driven Architecture. In an Event Driven Architecture, there is generally a
main loop that listens for events and then triggers a call-back function
when one of those events
Doesn't directly answer your question but there are ways in Scala and
PySpark - see if this helps:
https://repost.aws/questions/QUP_OJomilTO6oIgvK00VHEA/writing-data-to-kinesis-stream-from-py-spark
On Thu, Feb 16, 2023, 8:27 PM hueiyuan su wrote:
> *Component*: Spark Structured Stream
*Component*: Spark Structured Streaming
*Level*: Advanced
*Scenario*: How-to
*Problems Description*
I would like to implement writeStream data to AWS Kinesis with Spark
Structured Streaming, but I cannot find a related connector jar that can be
used. I want to check whether fully
at 5:12 PM, hueiyuan su wrote:
> *Component*: Spark Structured Streaming
> *Level*: Advanced
> *Scenario*: How-to
>
> -
> *Problems Description*
> I would like to confirm could we directly apply new options of
> readStream/writeStream without stoppi
*Component*: Spark Structured Streaming
*Level*: Advanced
*Scenario*: How-to
-
*Problems Description*
I would like to confirm whether we can directly apply new options to
readStream/writeStream without stopping currently running Spark Structured
Streaming applications. For example
/better way to do this.. so
please cc me on updates :)
On Thu, May 6, 2021 at 1:08 PM Mich Talebzadeh
wrote:
> That is a valid question and I am not aware of any new addition to Spark
> Structured Streaming (SSS) in newer releases for this graceful shutdown.
>
> Going back to my e
-- Forwarded message -
From: Mich Talebzadeh
Date: Thu, 6 May 2021 at 20:07
Subject: Re: Graceful shutdown SPARK Structured Streaming
To: ayan guha
Cc: Gourav Sengupta , user @spark <
user@spark.apache.org>
That is a valid question and I am not aware of any new ad
We started hitting this as well, seeing 90+ GB resident memory on a 25 GB
heap executor. After a lot of manual testing of fixes, I finally figured out
the root problem: https://issues.apache.org/jira/browse/SPARK-41339
Starting to work on a PR now to fix it.
On Mon, Sep 12, 2022 at 10:46 AM Artemis
Hello Spark Team,
Greetings!
I am writing this mail to get suggestions on the observation below.
*Use Case:* Spark Structured Streaming to extract data from Azure Event
Hub, process it, and write it to Snowflake Database table using
ForEachBatch with Epoch_Id/ Batch_Id passed to the foreach
here is the stackoverflow link
https://stackoverflow.com/questions/73780259/spark-structured-streaming-stderr-getting-filled-up
On Mon, Sep 19, 2022 at 4:41 PM karan alang wrote:
> I've created a stackoverflow ticket for this as well
>
> On Mon, Sep 19, 2022 at 4:37 PM karan ala
I've created a stackoverflow ticket for this as well
On Mon, Sep 19, 2022 at 4:37 PM karan alang wrote:
> Hello All,
> I've a Spark Structured Streaming job on GCP Dataproc - which picks up
> data from Kafka, does processing and pushes data back into kafka topics.
>
> Couple of
Hello All,
I have a Spark Structured Streaming job on GCP Dataproc - which picks up data
from Kafka, does processing and pushes data back into Kafka topics.
Couple of questions:
1. Does Spark put all the logs (incl. INFO, WARN etc) into stderr?
What I notice is that stdout is empty, while all
The off-heap memory isn't subject to GC. So the obvious reason is
that you have too many states to maintain in your streaming app, and
the GC couldn't keep up, and it ends up with no recourse but to die. Are you
using continuous processing or micro-batch in Structured Streaming? You
may want to
Hi Team,
We are trying to shift from the HDFS State Manager to the RocksDB State
Manager, but while doing a POC we realised it is using much more off-heap
space than expected. Because of this, the executors get killed with an *out
of physical memory exception.*
Could you please help in understanding,
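For reference, a hedged example of switching the state store provider to RocksDB (available in Spark 3.2+). The app name is a placeholder; newer releases also expose spark.sql.streaming.stateStore.rocksdb.* options for tuning, so check the documentation of the exact version you run before relying on any of them.

```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("rocksdb-state")
         .config("spark.sql.streaming.stateStore.providerClass",
                 "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
         .getOrCreate())
```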
Hello All,
I have a Spark Structured Streaming job which reads from Kafka, does
processing and puts data into Mongo/Kafka/GCP Buckets (i.e. it is
processing heavy).
I'm consistently seeing the following warnings:
```
22/09/06 16:55:03 INFO
Hi,
I am seeing weird behavior in our Spark Structured Streaming application
where the offsets are not getting picked up by the streaming job.
If I delete the checkpoint directory and run the job again, I can see the
data for the first batch, but it is not picking up new offsets again from
the next
Hello All,
I have a Structured Streaming program running on GCP Dataproc which reads
data from Kafka every 10 mins, and then does processing.
This is a multi-tenant system, i.e. the program will read data from multiple
customers.
In my current code, I'm looping over the customers, passing it to the
Dear Mich,
a super duper note of thanks, I had to spend around two weeks to figure
this out :)
Regards,
Gourav Sengupta
On Sat, Feb 26, 2022 at 10:43 AM Mich Talebzadeh
wrote:
On Mon, 26 Apr 2021 at 10:21, Mich Talebzadeh
wrote:
>
> Spark Structured Streaming AKA SSS is a very useful tool in dealing with
> Event Driven Architecture. In an Event Driven Architecture, there is
> generally a main loop that listens for events and then triggers a call-back
>
Looks like it is somehow related to the API being unable to send data from the
executor to the driver.
If I set spark master to local I get these 6 files.
When spark.master is local: InputReportAndFileName fileName
file:///Users/abc/Desktop/test/Streaming/d… InputReportAndFileName
fileName
Here is Spark's API definition; I am unable to understand what it means to
have an "unknown" file. We are processing a file, so we will have a fileName.
I have 7 files; it can print 3 and miss the other 4.
/** Returns the holding file name or empty string if it is unknown. */
def
Hello all,
I am trying to extract the file name like the following, but intermittently
we are getting an empty file name.
Step 1: Get Schema
StructType jsonSchema = sparkSession.read()
    .option("multiLine", true)
    .json("src/main/resources/sample.json")
    .schema();
Step 2: Get Input DataSet
Dataset inputDS =
Just don't call .awaitTermination() because it blocks execution of the
next line of code. You can assign the result of .start() to a specific
variable, or put them into a list/array.
And to wait until one of the streams finishes, use
spark.streams.awaitAnyTermination() or something like this
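A hedged sketch of that advice: start several queries without blocking on the first one, then wait for any of them to terminate. The sink formats, topic, paths and checkpoints are placeholders, and `dataset1` is assumed to be the streaming DataFrame from the question below.

```
q1 = (dataset1.writeStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "sink-topic-1")
      .option("checkpointLocation", "/tmp/chk/q1")
      .start())                      # do NOT call awaitTermination() here

q2 = (dataset1.writeStream.format("parquet")
      .option("path", "/data/sink2")
      .option("checkpointLocation", "/tmp/chk/q2")
      .start())

queries = [q1, q2]                   # keep handles if you need to stop them individually
spark.streams.awaitAnyTermination()  # blocks until any one active query stops
```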
Hello,
I have a structured streaming job that needs to be able to write to
multiple sinks. We are using *Continuous* Trigger *and not* *Microbatch*
Trigger.
1. When we use the foreach method using:
*dataset1.writeStream.foreach(kafka ForEachWriter
Hey folks:
does Spark Structured Streaming have any plans for dynamic scaling?
Currently Spark only has a dynamic scaling mechanism for batch jobs
"refunded" means I must give the transaction amount back to
>>>> the customer balance
>>>>
>>>> So, technically, we cannot process something that is not "AUTHORIZED"
>>>> (created) yet, nor can we process a refund for a transacti
…PROCESSED yet.
>>
>> *You have an authorisation, then the actual transaction and maybe a
>> refund some time in the future. You want to proceed with a transaction only
>> if you've seen the auth but in an eventually consistent system this might
> always happen.*
>
> That's absolutely the case! So, yes, that's correct.
>
> *You are asking in the case of receiving the transaction before the auth
> how to retry later?*
>
> Yeah! I've been struggling for days on how to solve this with Spark Structured
> Stream
Yeah! I've been struggling for days on how to solve this with Spark Structured
Streaming...
*Right now you are discarding those transactions that didn't match so you
instead would need to persist them somewhere and either reinject them into
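A hedged sketch of that "persist and re-inject" idea, not the poster's actual solution: inside foreachBatch, split the micro-batch into rows that matched a previously seen authorisation and rows that did not, write the matches to the sink, and push the rest back onto a retry topic to be consumed again later. The table names, topic names, broker and the `transaction_id` join key are assumptions.

```
def process_batch(batch_df, batch_id):
    auths = spark.read.table("authorizations")            # assumed lookup of seen auths

    matched = batch_df.join(auths, "transaction_id", "left_semi")
    unmatched = batch_df.join(auths, "transaction_id", "left_anti")

    matched.write.mode("append").saveAsTable("processed_transactions")   # placeholder sink

    # re-inject unmatched rows onto a retry topic instead of discarding them
    (unmatched.selectExpr("CAST(transaction_id AS STRING) AS key",
                          "to_json(struct(*)) AS value")
     .write.format("kafka")
     .option("kafka.bootstrap.servers", "localhost:9092")
     .option("topic", "transactions-retry")
     .save())

query = (transactions_stream.writeStream
         .foreachBatch(process_batch)
         .option("checkpointLocation", "/tmp/chk/tx-retry")
         .start())
```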
>>> ….alias("timeissued"), \
>>>   col("parsed_value.price").alias("price")). \
>>>   writeStream. \
>>>   outputMode('append'). \
>>>   option("truncate", "
>> start()
>> print(result)
>>
>> except Exception as e:
>>     print(f"""{e}, quitting""")
>>     sys.exit(1)
>>
>> Inside that function, say *sendToSink*, you can get the df and batchId
>
> def sendToSink(df, batchId):
>     if len(df.take(1)) > 0:
>         print(f"""md batchId is {batchId}""")
>         df.show(100, False)
>         df.persist()
>         # write to BigQuery
>         ...
Now you have two DataFrames, *df* and *dfnewtopic*, in the same session. Will
you be able to join these two DataFrames through a common key value?
HTH
Hello! Sure thing!
I'm reading them *separately*, both are apps written with Scala + Spark
Structured Streaming.
I feel like I missed some details on my original thread (sorry it was past
4 AM) and it was getting frustrating
Please let me try to clarify some points:
*Transactions Created
Can you please clarify if you are reading these two topics separately or
within the same scala or python script in Spark Structured Streaming?
HTH
,
- transaction-processed
- Even though the schema is not exactly the same, they all share a
correlation_id, which is their "transaction_id"
So, long story short, I've got 2 consumers, one for each topic, and all I
wanna do is sink them in a chain order. I'm writing them w/ Spark
>> | foo|null|    2|  Count when word is foo
>> | foo|   1|    1|  When word is foo and num is 1
>> | foo|   2|    1|  When word is foo and num is 2
>> +----+----+-----+
>>
>> So rollup() returns a subset of the rows returned by cube(). From the
> So rollup() returns a subset of the rows returned by cube(). From the
> above, rollup returns 6 rows whereas cube returns 8 rows. Here are the
> missing rows.
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|   1|    1|  Word is null an
Here are the missing rows.
+----+----+-----+
|word| num|count|
+----+----+-----+
|null|   1|    1|  Word is null and num is 1
|null|   2|    3|  Word is null and num is 2
+----+----+-----+
Now back to Spark Structured Streaming (SSS), we have basic aggregations
may need to store the state to update the count, so Spark Structured
Streaming state will come into the picture.
As with batch programming, we can do it with
> df.rollup(col1,col2).count
But if I try to use it with Spark Structured Streaming state, will it store
the state of all the group
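A small illustration of the two sides of the question, with placeholder column names. In batch, rollup() works directly. If rollup turns out not to be supported on streaming DataFrames in your version, a common hedged workaround is to aggregate at the finest grain with a watermark (so the state stays bounded) and derive the rolled-up levels downstream or in the sink.

```
from pyspark.sql.functions import window

# Batch: rollup over two grouping columns
batch_counts = df.rollup("col1", "col2").count()

# Streaming: aggregate at the finest grain the state can track
streaming_counts = (events
                    .withWatermark("event_time", "10 minutes")
                    .groupBy(window("event_time", "5 minutes"), "col1", "col2")
                    .count())
```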
, Amit Joshi wrote:
> Appreciate if someone could give some pointers in the question below.
>
> -- Forwarded message -
> From: Amit Joshi
> Date: Tue, Jun 15, 2021 at 12:19 PM
> Subject: [Spark]Does Rollups work with spark structured streaming with
> state.
>
Appreciate if someone could give some pointers in the question below.
-- Forwarded message -
From: Amit Joshi
Date: Tue, Jun 15, 2021 at 12:19 PM
Subject: [Spark]Does Rollups work with spark structured streaming with
state.
To: spark-user
Hi Spark-Users,
Hope you are all
Hi Spark-Users,
Hope you are all doing well.
Recently I was looking into rollup operations in Spark.
As we know, state-based aggregation is supported in Spark Structured
Streaming.
I was wondering if rollup operations are also supported,
like keeping the state of the previous aggregation on the rollups
Hi Mich,
I agree with you; Spark Streaming will become defunct in favor of
Structured Streaming. And I have gone over the document in detail. I am
aware of the unbounded datasets and running aggregates etc.
Nevertheless, I wouldn't say it's a moot point, as it provides a good
intuition of the
Hi,
I guess whether Structured Streaming (SS) inherited anything from Spark
Streaming is a moot point now, although it is a concept built on Spark
Streaming, which will be defunct soon.
Going forward, it all depends on what problem you are trying to address.
These are explained in the following
Hi,
I am using Structured Streaming on Azure HDInsight. The version is 2.4.6.
I am trying to understand the micro-batch mode - default and fixed
intervals. Does the fixed-interval micro-batch follow something similar to
the receiver-based model where records keep getting pulled and stored into
blocks
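For reference, the two micro-batch modes being asked about, shown with placeholder sinks: with no trigger, Spark starts the next micro-batch as soon as the previous one finishes; with a processing-time trigger it starts micro-batches on a fixed interval.

```
# default: kick off the next micro-batch as soon as the previous one completes
q_default = df.writeStream.format("console").start()

# fixed interval: attempt a micro-batch every 30 seconds
q_fixed = (df.writeStream.format("console")
           .trigger(processingTime="30 seconds")
           .start())
```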
That is a valid question and I am not aware of any new addition to Spark
Structured Streaming (SSS) in newer releases for this graceful shutdown.
Going back to my earlier explanation, there are occasions when you may want
to stop the Spark program gracefully. Gracefully meaning that Spark
Hi,
I believe I discussed this in this forum. I sent the following to spark-dev
forum as an add-on to Spark functionality. This is the gist of it.
Spark Structured Streaming AKA SSS is a very useful tool in dealing with
Event Driven Architecture. In an Event Driven Architecture
,
Gourav Sengupta
-- Forwarded message -
From: Gourav Sengupta
Date: Wed, Apr 21, 2021 at 10:06 AM
Subject: Graceful shutdown SPARK Structured Streaming
To:
Dear friends,
is there any documentation available for gracefully stopping SPARK
Structured Streaming in 3.1.x?
I am
I would like to hear comments on this. Basically, the ability to shut down a
running Spark Structured Streaming process gracefully.
In a way it may be something worth integrating into Spark Structured
Streaming, much like the Kafka team is working to get rid of ZooKeeper and
replacing it with a system type
Hi,
This is the design that I came up with.
How to shut down the topic doing work for the message being processed: wait
for it to complete and then shut down the streaming process for a given topic.
I thought about this and looked at options. Using sensors to implement this,
like Airflow does, would be
trigger(processingTime='2 seconds'). \
start()

spark.streams.awaitAnyTermination()


def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        print(f"""newtopic batchId is {batchId}""")
        dfnewtopic.show(100, False)
        # stop every active streaming query (equivalent of the original forEach(_.stop) call)
        for q in spark.streams.active:
            q.stop()
    else:
        print("
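A fuller, hedged reconstruction of the two-topic design sketched above, assembled from the fragments in this thread rather than taken verbatim from it: one query does the business work, a second "control" query watches a separate topic and stops all active queries when any message arrives on it. The broker address and checkpoint paths are placeholders; the topic names "md" and "newtopic" follow the thread.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("graceful-shutdown").getOrCreate()

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"md batchId is {batchId}")
        # ... write the business micro-batch to its sink ...

def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        print(f"newtopic batchId is {batchId}")
        for q in spark.streams.active:   # stop every active query; the app then exits cleanly
            q.stop()

business = (spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "md").load())

control = (spark.readStream.format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", "newtopic").load())

(business.writeStream.foreachBatch(sendToSink)
 .option("checkpointLocation", "/tmp/chk/md")
 .trigger(processingTime="2 seconds").start())

(control.writeStream.foreachBatch(sendToControl)
 .option("checkpointLocation", "/tmp/chk/newtopic")
 .trigger(processingTime="2 seconds").start())

spark.streams.awaitAnyTermination()
```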