Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-10 Thread Ant Kutschera
Hi

> Do we have any option to make streaming queries with multiple stateful
> operations output data without waiting this extra iteration? One of my
> ideas was to force an empty microbatch to run and propagate late events
> watermark without any new data. While this conceptually works, I didn't
> find a way to trigger an empty microbatch while being connected to Kafka
> that constantly receives new data and while having a constant 30s trigger
> interval.

Not sure if this helps or works, but in a Kafka Streams solution (without
Spark) that I built a few years ago, we published artificial events once a
second to ensure that windows were closed, because by design Kafka Streams
only closes windows when events are flowing.  So you could artificially
trigger an 'empty' microbatch in the same way: the batch would contain only
artificial events, which you can of course filter out in the microbatch
processing.
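As a rough sketch of that idea (all names here are hypothetical, not from any Spark or Kafka API): a small side process publishes a heartbeat event every second, and the streaming job filters heartbeats out before its real logic runs:

```python
import json
import time

HEARTBEAT_TYPE = "__heartbeat__"  # any marker value your real events never use

def make_heartbeat() -> str:
    # Published to the topic once per second by a side process; its event-time
    # timestamp keeps advancing the watermark even when no real traffic arrives.
    return json.dumps({"type": HEARTBEAT_TYPE, "ts": time.time()})

def is_real_event(raw_value: str) -> bool:
    # Wrapped in a UDF, this drops heartbeats inside each microbatch, e.g.
    #   kdf.filter(udf(is_real_event, "boolean")(col("value")))
    return json.loads(raw_value).get("type") != HEARTBEAT_TYPE
```

The heartbeats never reach the aggregation logic, but their event-time timestamps still pull the watermark forward, so windows close on schedule.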




On Thu, 11 Jan 2024, 00:26 Andrzej Zera wrote:

> I'm struggling with the following issue in Spark >=3.4, related to
> multiple stateful operations.
>
> When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark
> keeps track of two types of watermarks: eventTimeWatermarkForEviction and
> eventTimeWatermarkForLateEvents. Introducing them allowed chaining
> multiple stateful operations but also introduced an additional delay for
> getting the output out of the streaming query.
>
> I'll show this with an example. Assume we have a stream of click events and
> we aggregate it first by 1-min window and then by 5-min window. If we have
> a trigger interval of 30s, then in most cases we'll get output 30s later
> compared to a single-stateful-operation query. To see why, let's look
> at the following examples:
>
> Example 1. Single stateful operation (aggregation by 5-min window, assume
> watermark is 0 seconds)
>
> Wall clock (microbatch starts) | Max event timestamp from Kafka | Global watermark | Output
> 14:10:00                       | 14:09:56                       | 0                | -
> 14:10:30                       | 14:10:26                       | 14:09:56         | -
> 14:11:00                       | 14:10:56                       | 14:10:26         | window <14:05, 14:10)
>
> Example 2. Multiple stateful operations (aggregation by 1-min window
> followed by aggregation by 5-min window, assume watermark is 0 seconds)
>
> Wall clock (microbatch starts) | Max event timestamp from Kafka | Late events watermark | Eviction watermark | Output
> 14:10:00                       | 14:09:56                       | 0                     | 0                  | -
> 14:10:30                       | 14:10:26                       | 0                     | 14:09:56           | -
> 14:11:00                       | 14:10:56                       | 14:09:56              | 14:10:26           | -
> 14:11:30                       | 14:11:26                       | 14:10:26              | 14:10:56           | window <14:05, 14:10)
>
> In Example 2, we need to wait until both watermarks cross the end of the
> window to get the output for that window, which happens one iteration later
> compared to Example 1.
>
> Now, in use cases that require near-real-time processing, this one
> iteration delay can be quite a significant difference.
>
> Do we have any option to make streaming queries with multiple stateful
> operations output data without waiting this extra iteration? One of my
> ideas was to force an empty microbatch to run and propagate late events
> watermark without any new data. While this conceptually works, I didn't
> find a way to trigger an empty microbatch while being connected to Kafka
> that constantly receives new data and while having a constant 30s trigger
> interval.
>
> Thanks,
> Andrzej
>


Re: Structured Streaming Process Each Records Individually

2024-01-10 Thread Ant Kutschera
It might be good to first split the stream up into smaller streams, one per
type.  If ordering of the Kafka records is important, then you could
partition them at the source based on the type, but be careful how you
configure Spark to read from Kafka as that could also influence ordering.

from pyspark.sql.functions import get_json_object

kdf = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    ...
    .option("includeHeaders", "true")
    .option("startingOffsets", "earliest")  # see below where we trigger with availableNow and checkpoint
    .option("subscribe", "topicName")  # the name of the topic
    .load())

kdf = kdf.selectExpr("CAST(key AS STRING)",
                     "CAST(value AS STRING)",
                     "headers",
                     "CAST(topic AS STRING)",
                     "CAST(partition AS STRING)",
                     "CAST(offset AS STRING)")

# 'type' is not a Kafka metadata column, so extract it from the JSON payload
# first (assuming the payload carries a top-level "type" field as in your example):
kdf = kdf.withColumn("type", get_json_object(kdf.value, "$.type"))

kdf_one = kdf.filter(kdf.type == 'one')
kdf_two = kdf.filter(kdf.type == 'two')
kdf_three = kdf.filter(kdf.type == 'three')

then transform each one as you need to:

kdf_one = kdf_one.transform(prepare_for_database_one)

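Such a transform function is just a function that takes and returns a DataFrame; a hypothetical sketch (the column names are assumptions, adjust to your schema):

```python
def prepare_for_database_one(df):
    # Import inside the function so the sketch stands alone.
    from pyspark.sql import functions as F
    # Shape the filtered stream for its target table.
    return (df
            .withColumn("ingested_at", F.current_timestamp())
            .select("key", "value", "ingested_at"))
```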
and start each DataFrame and use foreachBatch to store the data in the DB:

(kdf_one
    .writeStream
    .queryName("one")
    .foreachBatch(saveastable_one)
    .trigger(availableNow=True)
    .option("checkpointLocation", "s3a://checkpointlocation/")  # very important to be on writeStream!
    .start()
    .awaitTermination())
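For completeness, a hypothetical sketch of such a foreachBatch handler (the function body and table name are assumptions):

```python
def saveastable_one(batch_df, batch_id):
    # Inside foreachBatch, batch_df is an ordinary (non-streaming) DataFrame,
    # so any batch writer works; saveAsTable is just one option.
    # The table name "product_one" is hypothetical.
    (batch_df.write
        .mode("append")
        .saveAsTable("product_one"))
```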


On Wed, 10 Jan 2024 at 21:01, Khalid Mammadov wrote:

> Use foreachBatch or foreach methods:
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> On Wed, 10 Jan 2024, 17:42 PRASHANT L,  wrote:
>
>> Hi,
>> I have a use case where I need to process JSON payloads coming from Kafka
>> using Structured Streaming, but the JSON can have different formats and
>> the schema is not fixed.
>> Each JSON will have a @type tag, so based on that tag the JSON has to be
>> parsed and loaded to a table with the tag name, and if a JSON has nested
>> sub-tags, those tags should go to different tables.
>> So I need to process each JSON record individually and determine the
>> destination tables. What would be the best approach?
>>
>>
>>> {
>>>   "os": "andriod",
>>>   "type": "mobile",
>>>   "device": {
>>>     "warrenty": "3 years",
>>>     "replace": "yes"
>>>   },
>>>   "zones": [
>>>     {
>>>       "city": "Bangalore",
>>>       "state": "KA",
>>>       "pin": "577401"
>>>     },
>>>     {
>>>       "city": "Mumbai",
>>>       "state": "MH",
>>>       "pin": "576003"
>>>     }
>>>   ],
>>>   "@table": "product"
>>> }
>>
>>
>> So for the above JSON, there are 3 tables created:
>> 1. product (@type): this is the parent table
>> 2. product_zones and product_devices: child tables
>>
>

-- 
___
Dr Ant Kutschera