Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-10 Thread Ant Kutschera
Hi

> Do we have any option to make streaming queries with multiple stateful
> operations output data without waiting for this extra iteration? One of my
> ideas was to force an empty microbatch to run and propagate the late events
> watermark without any new data. While this conceptually works, I didn't
> find a way to trigger an empty microbatch while being connected to Kafka
> that constantly receives new data and while having a constant 30s trigger
> interval.

Not sure if this helps or works, but in a Kafka Streams solution (without
Spark) that I built a few years ago, we published artificial events once a
second to ensure that windows were closed, because by design Kafka Streams
only closes windows when events are flowing.  You could artificially
trigger an 'empty' microbatch the same way: it would contain only
artificial events, which you can of course filter out in the microbatch
processing (see the sketch below).
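
A rough PySpark sketch of that idea, assuming the watermark is tracked on a
"timestamp" column and the heartbeat producer tags its records with a type
field (all names here are illustrative):

from pyspark.sql.functions import get_json_object, col

# events: streaming DF from Kafka with the value cast to string. Heartbeats
# are published roughly once a second solely to keep event time advancing
# when real traffic pauses.
typed = (events
    .withWatermark("timestamp", "0 seconds")  # heartbeats still advance this
    .withColumn("event_type", get_json_object(col("value"), "$.type")))

# drop the artificial events before any stateful operations; null-safe,
# since real events may not carry a type field at all
real = typed.filter(col("event_type").isNull() | (col("event_type") != "heartbeat"))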




On Thu, 11 Jan 2024, 00:26 Andrzej Zera,  wrote:

> I'm struggling with the following issue in Spark >=3.4, related to
> multiple stateful operations.
>
> When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark
> keeps track of two types of watermarks: eventTimeWatermarkForEviction and
> eventTimeWatermarkForLateEvents. Introducing them allowed chaining
> multiple stateful operations but also introduced an additional delay for
> getting the output out of the streaming query.
>
> I'll show this with an example. Assume we have a stream of click events and
> we aggregate it first by 1-min window and then by 5-min window. If we have
> a trigger interval of 30s, then in most cases we'll get output 30s later
> compared to queries with a single stateful operation. To see why, let's look
> at the following examples:
>
> Example 1. Single stateful operation (aggregation by 5-min window, assume
> watermark is 0 seconds)
>
> Wall clock (microbatch start) | Max event timestamp (at Kafka read) | Global watermark | Output
> 14:10:00                      | 14:09:56                            | 0                | -
> 14:10:30                      | 14:10:26                            | 14:09:56         | -
> 14:11:00                      | 14:10:56                            | 14:10:26         | window <14:05, 14:10)
>
> Example 2. Multiple stateful operations (aggregation by 1-min window
> followed by aggregation by 5-min window, assume watermark is 0 seconds)
>
> Wall clock (microbatch start) | Max event timestamp (at Kafka read) | Late events watermark | Eviction watermark | Output
> 14:10:00                      | 14:09:56                            | 0                     | 0                  | -
> 14:10:30                      | 14:10:26                            | 0                     | 14:09:56           | -
> 14:11:00                      | 14:10:56                            | 14:09:56              | 14:10:26           | -
> 14:11:30                      | 14:11:26                            | 14:10:26              | 14:10:56           | window <14:05, 14:10)
>
> In Example 2, we need to wait until both watermarks cross the end of the
> window to get the output for that window, which happens one iteration later
> compared to Example 1.
>
> Now, in use cases that require near-real-time processing, this one
> iteration delay can be quite a significant difference.
>
> Do we have any option to make streaming queries with multiple stateful
> operations output data without waiting for this extra iteration? One of my
> ideas was to force an empty microbatch to run and propagate the late events
> watermark without any new data. While this conceptually works, I didn't
> find a way to trigger an empty microbatch while being connected to Kafka
> that constantly receives new data and while having a constant 30s trigger
> interval.
>
> Thanks,
> Andrzej
>


Re: Structured Streaming Process Each Records Individually

2024-01-10 Thread Ant Kutschera
It might be good to first split the stream up into smaller streams, one per
type.  If ordering of the Kafka records is important, then you could
partition them at the source based on the type, but be careful how you
configure Spark to read from Kafka as that could also influence ordering.

kdf = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    ...
    .option("includeHeaders", "true")
    .option("startingOffsets", "earliest")  # see below: we trigger with availableNow and checkpoint
    .option("subscribe", "topicName")  # the name of the topic
    .load())

kdf = kdf.selectExpr("CAST(key AS STRING)",
                     "CAST(value AS STRING)",
                     "headers",
                     "CAST(topic AS STRING)",
                     "CAST(partition AS STRING)",
                     "CAST(offset AS STRING)")

# derive the 'type' column used below, e.g. from the JSON payload
# (assumes the tag lives at $.type; adjust to your actual schema)
from pyspark.sql.functions import get_json_object
kdf = kdf.withColumn("type", get_json_object(kdf.value, "$.type"))

kdf_one = kdf.filter(kdf.type == 'one')
kdf_two = kdf.filter(kdf.type == 'two')
kdf_three = kdf.filter(kdf.type == 'three')

then transform each one as you need to:

kdf_one = kdf_one.transform(prepare_for_database_one)

then start each DataFrame, using foreachBatch to store the data in the DB:

(kdf_one
   .writeStream
   .queryName("one")
   .foreachBatch(saveastable_one)
   .trigger(availableNow=True)
   .option("checkpointLocation", "s3a://checkpointlocation/")  # very important: set on writeStream, one location per query!
   .start()
   .awaitTermination())


On Wed, 10 Jan 2024 at 21:01, Khalid Mammadov 
wrote:

> Use foreachBatch or foreach methods:
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> On Wed, 10 Jan 2024, 17:42 PRASHANT L,  wrote:
>
>> Hi
>> I have a use case where I need to process JSON payloads coming from Kafka
>> using Structured Streaming, but the JSON can have different formats and
>> the schema is not fixed. Each JSON will have a @type tag; based on that
>> tag, the JSON has to be parsed and loaded into a table with the tag name,
>> and if a JSON has nested sub-tags, those tags should go to different
>> tables. So I need to process each JSON record individually and determine
>> the destination tables. What would be the best approach?
>>
>>
>>> {
>>>   "os": "android",
>>>   "type": "mobile",
>>>   "device": {
>>>     "warranty": "3 years",
>>>     "replace": "yes"
>>>   },
>>>   "zones": [
>>>     {
>>>       "city": "Bangalore",
>>>       "state": "KA",
>>>       "pin": "577401"
>>>     },
>>>     {
>>>       "city": "Mumbai",
>>>       "state": "MH",
>>>       "pin": "576003"
>>>     }
>>>   ],
>>>   "@table": "product"
>>> }
>>
>>
>> so for the above JSON, three tables are created:
>> 1. product (@type): this is the parent table
>> 2. product_zones and product_devices: child tables
>>
>

-- 
Dr Ant Kutschera


Re: Structured Streaming Process Each Records Individually

2024-01-10 Thread Mich Talebzadeh
Use an intermediate work table to land the streaming JSON data first, and
then, according to the tag, store the data in the correct table (see the
sketch below).
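
A minimal PySpark sketch of that approach using a foreachBatch sink; the
landing table name, the JSON path syntax and deriving target table names
straight from the tag are illustrative assumptions:

from pyspark.sql.functions import get_json_object, col

def route_from_landing(batch_df, batch_id):
    # land the raw payloads in the intermediate work table first
    batch_df.write.mode("append").saveAsTable("landing_json")
    # then route each record to its target table according to the tag
    # (bracket path syntax because of the '@' in the tag name)
    tagged = batch_df.withColumn("tag", get_json_object(col("value"), "$['@table']"))
    for row in tagged.select("tag").distinct().collect():
        if row.tag is not None:
            tagged.filter(col("tag") == row.tag).write.mode("append").saveAsTable(row.tag)

(stream_df.writeStream
    .foreachBatch(route_from_landing)
    .option("checkpointLocation", "/path/to/checkpoint")
    .start())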

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom






On Wed, 10 Jan 2024 at 18:51, PRASHANT L  wrote:

> Hi
> I have a use case where I need to process JSON payloads coming from Kafka
> using Structured Streaming, but the JSON can have different formats and
> the schema is not fixed. Each JSON will have a @type tag; based on that
> tag, the JSON has to be parsed and loaded into a table with the tag name,
> and if a JSON has nested sub-tags, those tags should go to different
> tables. So I need to process each JSON record individually and determine
> the destination tables. What would be the best approach?
>
>
>> {
>>   "os": "android",
>>   "type": "mobile",
>>   "device": {
>>     "warranty": "3 years",
>>     "replace": "yes"
>>   },
>>   "zones": [
>>     {
>>       "city": "Bangalore",
>>       "state": "KA",
>>       "pin": "577401"
>>     },
>>     {
>>       "city": "Mumbai",
>>       "state": "MH",
>>       "pin": "576003"
>>     }
>>   ],
>>   "@table": "product"
>> }
>
>
> so for the above JSON, three tables are created:
> 1. product (@type): this is the parent table
> 2. product_zones and product_devices: child tables
>


Re: Structured Streaming Process Each Records Individually

2024-01-10 Thread Khalid Mammadov
Use foreachBatch or foreach methods:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
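
Since the ask is to handle each record individually, here is a minimal
per-record sketch using the Python foreach sink (function form); the JSON
layout and the write step are illustrative assumptions:

import json

def process_row(row):
    payload = json.loads(row.value)  # assumes value is a JSON string
    table = payload.get("@table")    # route on the tag
    # ... write the record to `table` with your own DB client here ...

(stream_df.writeStream
    .foreach(process_row)
    .option("checkpointLocation", "/path/to/checkpoint")
    .start())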

On Wed, 10 Jan 2024, 17:42 PRASHANT L,  wrote:

> Hi
> I have a use case where I need to process JSON payloads coming from Kafka
> using Structured Streaming, but the JSON can have different formats and
> the schema is not fixed. Each JSON will have a @type tag; based on that
> tag, the JSON has to be parsed and loaded into a table with the tag name,
> and if a JSON has nested sub-tags, those tags should go to different
> tables. So I need to process each JSON record individually and determine
> the destination tables. What would be the best approach?
>
>
>> {
>>   "os": "android",
>>   "type": "mobile",
>>   "device": {
>>     "warranty": "3 years",
>>     "replace": "yes"
>>   },
>>   "zones": [
>>     {
>>       "city": "Bangalore",
>>       "state": "KA",
>>       "pin": "577401"
>>     },
>>     {
>>       "city": "Mumbai",
>>       "state": "MH",
>>       "pin": "576003"
>>     }
>>   ],
>>   "@table": "product"
>> }
>
>
> so for the above JSON, three tables are created:
> 1. product (@type): this is the parent table
> 2. product_zones and product_devices: child tables
>


Structured Streaming Process Each Records Individually

2024-01-10 Thread PRASHANT L
Hi
I have a use case where I need to process JSON payloads coming from Kafka
using Structured Streaming, but the JSON can have different formats and the
schema is not fixed. Each JSON will have a @type tag; based on that tag,
the JSON has to be parsed and loaded into a table with the tag name, and if
a JSON has nested sub-tags, those tags should go to different tables. So I
need to process each JSON record individually and determine the destination
tables. What would be the best approach?


> {
>   "os": "android",
>   "type": "mobile",
>   "device": {
>     "warranty": "3 years",
>     "replace": "yes"
>   },
>   "zones": [
>     {
>       "city": "Bangalore",
>       "state": "KA",
>       "pin": "577401"
>     },
>     {
>       "city": "Mumbai",
>       "state": "MH",
>       "pin": "576003"
>     }
>   ],
>   "@table": "product"
> }


so for the above JSON, three tables are created:
1. product (@type): this is the parent table
2. product_zones and product_devices: child tables


Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Andrzej Zera
Yes, I agree. But apart from maintaining this state internally (in memory,
or in memory+disk as in the case of RocksDB), on every trigger it saves some
information about this state to the checkpoint location. I'm afraid we can't
do much about this checkpointing operation. I'll continue looking for
information on how I can decrease the number of LIST requests (ListBucket
operations) made in this process.

Thank you for your input so far!
Andrzej

On Wed, 10 Jan 2024 at 16:33, Mich Talebzadeh
wrote:

> Hi,
>
> You may have a point on scenario 2.
>
> Caching Streaming DataFrames: In Spark Streaming, each batch of data is
> processed incrementally, and it may not fit the typical caching we
> discussed. Instead, Spark Streaming has its own mechanisms to manage and
> optimize the processing of streaming data. Case in point: for caching
> partial results, one often relies on maintaining state by using stateful
> operations (see below) on Structured Streaming DataFrames. In such
> scenarios, Spark maintains state internally based on the operations
> performed. For example, if you are doing a groupBy followed by an
> aggregation, Spark Streaming will manage the state of the keys and update
> them incrementally.
>
> Just to clarify, in the context of Spark Structured Streaming stateful
> operation refers to an operation that maintains and updates some form of
> state across batches of streaming data. Unlike stateless operations, which
> process each batch independently, stateful operations retain information
> from previous batches and use it to produce results for the current batch.
>
> So, bottom line, while one may not explicitly cache a streaming data
> frame, Spark internally optimizes the processing by maintaining the
> necessary state.
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>
>
>
>
> On Wed, 10 Jan 2024 at 14:20, Andrzej Zera  wrote:
>
>> Hey,
>>
>> Yes, that's how I understood it (scenario 1). However, I'm not sure if
>> scenario 2 is possible. I think cache on a streaming DataFrame is supported
>> only in foreachBatch (in which it's actually no longer a streaming DF).
>>
>> On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh
>> wrote:
>>
>>> Hi,
>>>
>>>  With regard to your point
>>>
>>> - Caching: Can you please explain what you mean by caching? I know that
>>> when you have batch and streaming sources in a streaming query, then you
>>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>>> you mean, and I don't know how to apply what you suggest to streaming data.
>>>
>>> Let us visit this
>>>
>>> The purpose of caching in Structured Streaming is to store frequently
>>> accessed data in memory or on disk for faster retrieval, reducing
>>> repeated reads from sources.
>>>
>>> - Types:
>>>
>>>- Memory Caching: Stores data in memory for extremely fast access.
>>>- Disk Caching: Stores data on disk for larger datasets or
>>>persistence across triggers
>>>
>>>
>>> - Scenarios:
>>>
>>>- Joining Streaming Data with Static Data: Cache static datasets
>>>(e.g., reference tables) to avoid repeated reads for each micro-batch.
>>>- Reusing Intermediate Results: Cache intermediate dataframes that
>>>are expensive to compute and used multiple times within the query.
>>>- Window Operations: Cache data within a window to avoid re-reading
>>>for subsequent aggregations or calculations within that window.
>>>
>>> - Benefits:
>>>
>>>- Performance: Faster query execution by reducing I/O operations and
>>>computation overhead.
>>>- Cost Optimization: Reduced reads from external sources can lower
>>>costs, especially for cloud-based sources.
>>>- Scalability: Helps handle higher data volumes and throughput by
>>>minimizing expensive re-computations.
>>>
>>>
>>> Example code
>>>
>>> scenario 1
>>>
>>> static_data = spark.read.load("path/to/static/data")
>>> static_data.cache()
>>> streaming_data = spark.readStream.format("...").load()
>>> # static data is cached for efficient joins
>>> joined_data = streaming_data.join(static_data, ...)
>>>
>>> scenario 2
>>>
>>> intermediate_df = streaming_data.groupBy(...).count()
>>> intermediate_df.cache()
>>> # Use cached intermediate_df for further transformations or actions
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>

Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Mich Talebzadeh
Hi,

You may have a point on scenario 2.

Caching Streaming DataFrames: In Spark Streaming, each batch of data is
processed incrementally, and it may not fit the typical caching we
discussed. Instead, Spark Streaming has its own mechanisms to manage and
optimize the processing of streaming data. Case in point: for caching
partial results, one often relies on maintaining state by using stateful
operations (see below) on Structured Streaming DataFrames. In such
scenarios, Spark maintains state internally based on the operations
performed. For example, if you are doing a groupBy followed by an
aggregation, Spark Streaming will manage the state of the keys and update
them incrementally.

Just to clarify, in the context of Spark Structured Streaming stateful
operation refers to an operation that maintains and updates some form of
state across batches of streaming data. Unlike stateless operations, which
process each batch independently, stateful operations retain information
from previous batches and use it to produce results for the current batch.

So, bottom line, while one may not explicitly cache a streaming data frame,
Spark internally optimizes the processing by maintaining the necessary
state.
HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom






On Wed, 10 Jan 2024 at 14:20, Andrzej Zera  wrote:

> Hey,
>
> Yes, that's how I understood it (scenario 1). However, I'm not sure if
> scenario 2 is possible. I think cache on a streaming DataFrame is supported
> only in foreachBatch (in which it's actually no longer a streaming DF).
>
> On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh
> wrote:
>
>> Hi,
>>
>>  With regard to your point
>>
>> - Caching: Can you please explain what you mean by caching? I know that
>> when you have batch and streaming sources in a streaming query, then you
>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>> you mean, and I don't know how to apply what you suggest to streaming data.
>>
>> Let us visit this
>>
>> The purpose of caching in Structured Streaming is to store frequently
>> accessed data in memory or on disk for faster retrieval, reducing
>> repeated reads from sources.
>>
>> - Types:
>>
>>- Memory Caching: Stores data in memory for extremely fast access.
>>- Disk Caching: Stores data on disk for larger datasets or
>>persistence across triggers
>>
>>
>> - Scenarios:
>>
>>- Joining Streaming Data with Static Data: Cache static datasets
>>(e.g., reference tables) to avoid repeated reads for each micro-batch.
>>- Reusing Intermediate Results: Cache intermediate dataframes that
>>are expensive to compute and used multiple times within the query.
>>- Window Operations: Cache data within a window to avoid re-reading
>>for subsequent aggregations or calculations within that window.
>>
>> - Benefits:
>>
>>- Performance: Faster query execution by reducing I/O operations and
>>computation overhead.
>>- Cost Optimization: Reduced reads from external sources can lower
>>costs, especially for cloud-based sources.
>>- Scalability: Helps handle higher data volumes and throughput by
>>minimizing expensive re-computations.
>>
>>
>> Example code
>>
>> scenario 1
>>
>> static_data = spark.read.load("path/to/static/data")
>> static_data.cache()
>> streaming_data = spark.readStream.format("...").load()
>> # static data is cached for efficient joins
>> joined_data = streaming_data.join(static_data, ...)
>>
>> scenario 2
>>
>> intermediate_df = streaming_data.groupBy(...).count()
>> intermediate_df.cache()
>> # Use cached intermediate_df for further transformations or actions
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>
>>
>>
>>
>> On Wed, 10 Jan 2024 at 13:10, Andrzej Zera  wrote:
>>
>>> Thank you very much for your suggestions. Yes, my main concern is
>>> checkpointing costs.
>>>
>>> I went through your suggestions and here're my comments:
>>>

[Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-10 Thread Andrzej Zera
I'm struggling with the following issue in Spark >=3.4, related to multiple
stateful operations.

When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark
keeps track of two types of watermarks: eventTimeWatermarkForEviction and
eventTimeWatermarkForLateEvents. Introducing them allowed chaining multiple
stateful operations but also introduced an additional delay for getting the
output out of the streaming query.

I'll show this with an example. Assume we have a stream of click events and
we aggregate it first by 1-min window and then by 5-min window. If we have
a trigger interval of 30s, then in most cases we'll get output 30s later
compared to queries with a single stateful operation. To see why, let's look
at the following examples:

Example 1. Single stateful operation (aggregation by 5-min window, assume
watermark is 0 seconds)

Wall clock (microbatch start) | Max event timestamp (at Kafka read) | Global watermark | Output
14:10:00                      | 14:09:56                            | 0                | -
14:10:30                      | 14:10:26                            | 14:09:56         | -
14:11:00                      | 14:10:56                            | 14:10:26         | window <14:05, 14:10)

Example 2. Multiple stateful operations (aggregation by 1-min window
followed by aggregation by 5-min window, assume watermark is 0 seconds)

Wall clock (microbatch start) | Max event timestamp (at Kafka read) | Late events watermark | Eviction watermark | Output
14:10:00                      | 14:09:56                            | 0                     | 0                  | -
14:10:30                      | 14:10:26                            | 0                     | 14:09:56           | -
14:11:00                      | 14:10:56                            | 14:09:56              | 14:10:26           | -
14:11:30                      | 14:11:26                            | 14:10:26              | 14:10:56           | window <14:05, 14:10)

In Example 2, we need to wait until both watermarks cross the end of the
window to get the output for that window, which happens one iteration later
compared to Example 1.
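
For context, here is a minimal PySpark sketch of the Example 2 query shape
(two chained time-window aggregations, supported in Spark >= 3.4). The
Kafka options and the use of the broker timestamp as event time are
illustrative assumptions:

from pyspark.sql.functions import window, col

spark.conf.set("spark.sql.streaming.statefulOperator.allowMultiple", "true")

events = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")  # illustrative
    .option("subscribe", "clicks")
    .load()
    .selectExpr("CAST(value AS STRING) AS value", "timestamp"))

per_minute = (events
    .withWatermark("timestamp", "0 seconds")
    .groupBy(window(col("timestamp"), "1 minute"))
    .count())

# the second aggregation windows directly on the first window column,
# which Spark >= 3.4 accepts for chained time-window aggregations
per_five_min = (per_minute
    .groupBy(window(col("window"), "5 minutes"))
    .count())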

Now, in use cases that require near-real-time processing, this one
iteration delay can be quite a significant difference.

Do we have any option to make streaming queries with multiple stateful
operations output data without waiting for this extra iteration? One of my
ideas was to force an empty microbatch to run and propagate the late events
watermark without any new data. While this conceptually works, I didn't
find a way to trigger an empty microbatch while being connected to Kafka
that constantly receives new data and while having a constant 30s trigger
interval.

Thanks,
Andrzej


Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Andrzej Zera
Hey,

Yes, that's how I understood it (scenario 1). However, I'm not sure if
scenario 2 is possible. I think cache on a streaming DataFrame is supported
only in foreachBatch (in which it's actually no longer a streaming DF); see
the sketch below.
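
For completeness, a minimal sketch of the foreachBatch variant: inside the
callback, batch_df is a normal (non-streaming) DataFrame, so caching it is
allowed (table names here are illustrative):

def process_batch(batch_df, batch_id):
    batch_df.persist()  # safe here: batch_df is not a streaming DF
    batch_df.groupBy("key").count().write.mode("append").saveAsTable("counts")
    batch_df.groupBy("key").sum("value").write.mode("append").saveAsTable("sums")
    batch_df.unpersist()

(streaming_df.writeStream
    .foreachBatch(process_batch)
    .start())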

On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh
wrote:

> Hi,
>
>  With regard to your point
>
> - Caching: Can you please explain what you mean by caching? I know that
> when you have batch and streaming sources in a streaming query, then you
> can try to cache batch ones to save on reads. But I'm not sure if it's what
> you mean, and I don't know how to apply what you suggest to streaming data.
>
> Let us visit this
>
> The purpose of caching in Structured Streaming is to store frequently
> accessed data in memory or on disk for faster retrieval, reducing
> repeated reads from sources.
>
> - Types:
>
>- Memory Caching: Stores data in memory for extremely fast access.
>- Disk Caching: Stores data on disk for larger datasets or persistence
>across triggers
>
>
> - Scenarios:
>
>- Joining Streaming Data with Static Data: Cache static datasets
>(e.g., reference tables) to avoid repeated reads for each micro-batch.
>- Reusing Intermediate Results: Cache intermediate dataframes that are
>expensive to compute and used multiple times within the query.
>- Window Operations: Cache data within a window to avoid re-reading
>for subsequent aggregations or calculations within that window.
>
> - Benefits:
>
>- Performance: Faster query execution by reducing I/O operations and
>computation overhead.
>- Cost Optimization: Reduced reads from external sources can lower
>costs, especially for cloud-based sources.
>- Scalability: Helps handle higher data volumes and throughput by
>minimizing expensive re-computations.
>
>
> Example code
>
> scenario 1
>
> static_data = spark.read.load("path/to/static/data")
> static_data.cache()
> streaming_data = spark.readStream.format("...").load()
> # static data is cached for efficient joins
> joined_data = streaming_data.join(static_data, ...)
>
> scenario 2
>
> intermediate_df = streaming_data.groupBy(...).count()
> intermediate_df.cache()
> # Use cached intermediate_df for further transformations or actions
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>
>
>
>
> On Wed, 10 Jan 2024 at 13:10, Andrzej Zera  wrote:
>
>> Thank you very much for your suggestions. Yes, my main concern is
>> checkpointing costs.
>>
>> I went through your suggestions and here're my comments:
>>
>> - Caching: Can you please explain what you mean by caching? I know that
>> when you have batch and streaming sources in a streaming query, then you
>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>> you mean, and I don't know how to apply what you suggest to streaming data.
>>
>> - Optimize Checkpointing Frequency: I'm already using changelog
>> checkpointing with RocksDB and increased trigger interval to a maximum
>> acceptable value.
>>
>> - Minimize LIST Requests: That's where I can get the most savings. My LIST
>> requests account for ~70% of checkpointing costs. From what I see, LIST
>> requests are ~2.5x the number of PUT requests. Unfortunately, when I
>> changed the checkpointing location to DBFS, it didn't help with minimizing
>> LIST requests. They are roughly at the same level. From what I see, S3 Optimized
>> Committer is EMR-specific so I can't use it in Databricks. The fact that I
>> don't see a difference between S3 and DBFS checkpoint location suggests
>> that both must implement the same or similar committer.
>>
>> - Optimizing RocksDB: I still need to do this but I don't suspect it will
>> help much. From what I understand, these settings shouldn't have a
>> significant impact on the number of requests to S3.
>>
>> Any other ideas on how to limit the number of LIST requests are appreciated.
>>
>> On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh
>> wrote:
>>
>>> OK I assume that your main concern is checkpointing costs.
>>>
>>> - Caching: If your queries read the same data multiple times, caching
>>> the data might reduce the amount of data that needs to be checkpointed.
>>>
>>>
>>> - Optimize Checkpointing Frequency i.e
>>>
>>>- Consider Changelog Checkpointing with RocksDB.  This can
>>>potentially reduce checkpoint size and duration by only storing state
>>>changes, rather than the entire state.
>>>- Adjust Trigger Interval (if possible): While not 

Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Mich Talebzadeh
Hi,

 With regard to your point

- Caching: Can you please explain what you mean by caching? I know that
when you have batch and streaming sources in a streaming query, then you
can try to cache batch ones to save on reads. But I'm not sure if it's what
you mean, and I don't know how to apply what you suggest to streaming data.

Let us visit this

The purpose of caching in Structured Streaming is to store frequently
accessed data in memory or on disk for faster retrieval, reducing repeated
reads from sources.

- Types:

   - Memory Caching: Stores data in memory for extremely fast access.
   - Disk Caching: Stores data on disk for larger datasets or persistence
   across triggers


- Scenarios:

   - Joining Streaming Data with Static Data: Cache static datasets
   (e.g., reference tables) to avoid repeated reads for each micro-batch.
   - Reusing Intermediate Results: Cache intermediate dataframes that are
   expensive to compute and used multiple times within the query.
   - Window Operations: Cache data within a window to avoid re-reading for
   subsequent aggregations or calculations within that window.

- Benefits:

   - Performance: Faster query execution by reducing I/O operations and
   computation overhead.
   - Cost Optimization: Reduced reads from external sources can lower
   costs, especially for cloud-based sources.
   - Scalability: Helps handle higher data volumes and throughput by
   minimizing expensive re-computations.


Example code

scenario 1

static_data = spark.read.load("path/to/static/data")
static_data.cache()
streaming_data = spark.readStream.format("...").load()
# static data is cached for efficient joins
joined_data = streaming_data.join(static_data, ...)

scenario 2

intermediate_df = streaming_data.groupBy(...).count()
intermediate_df.cache()
# Use cached intermediate_df for further transformations or actions

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom






On Wed, 10 Jan 2024 at 13:10, Andrzej Zera  wrote:

> Thank you very much for your suggestions. Yes, my main concern is
> checkpointing costs.
>
> I went through your suggestions and here're my comments:
>
> - Caching: Can you please explain what you mean by caching? I know that
> when you have batch and streaming sources in a streaming query, then you
> can try to cache batch ones to save on reads. But I'm not sure if it's what
> you mean, and I don't know how to apply what you suggest to streaming data.
>
> - Optimize Checkpointing Frequency: I'm already using changelog
> checkpointing with RocksDB and increased trigger interval to a maximum
> acceptable value.
>
> - Minimize LIST Requests: That's where I can get the most savings. My LIST
> requests account for ~70% of checkpointing costs. From what I see, LIST
> requests are ~2.5x the number of PUT requests. Unfortunately, when I
> changed the checkpointing location to DBFS, it didn't help with minimizing
> LIST requests. They are roughly at the same level. From what I see, S3 Optimized
> Committer is EMR-specific so I can't use it in Databricks. The fact that I
> don't see a difference between S3 and DBFS checkpoint location suggests
> that both must implement the same or similar committer.
>
> - Optimizing RocksDB: I still need to do this but I don't suspect it will
> help much. From what I understand, these settings shouldn't have a
> significant impact on the number of requests to S3.
>
> Any other ideas on how to limit the number of LIST requests are appreciated.
>
> On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh
> wrote:
>
>> OK I assume that your main concern is checkpointing costs.
>>
>> - Caching: If your queries read the same data multiple times, caching
>> the data might reduce the amount of data that needs to be checkpointed.
>>
>> - Optimize Checkpointing Frequency i.e
>>
>>- Consider Changelog Checkpointing with RocksDB.  This can
>>potentially reduce checkpoint size and duration by only storing state
>>changes, rather than the entire state.
>>- Adjust Trigger Interval (if possible): While not ideal for your
>>near-real time requirement, even a slight increase in the trigger interval
>>(e.g., to 7-8 seconds) can reduce checkpoint frequency and costs.
>>
>> - Minimize LIST Requests:
>>
>>- Enable S3 Optimized Committer: or as you stated consider DBFS
>>
>> You can also optimise RocksDB. Set your state backend to RocksDB, if not
>> already. Here are what I use
>>
>>   # Add RocksDB configurations here
>> 

Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Andrzej Zera
Thank you very much for your suggestions. Yes, my main concern is
checkpointing costs.

I went through your suggestions and here're my comments:

- Caching: Can you please explain what you mean by caching? I know that
when you have batch and streaming sources in a streaming query, then you
can try to cache batch ones to save on reads. But I'm not sure if it's what
you mean, and I don't know how to apply what you suggest to streaming data.

- Optimize Checkpointing Frequency: I'm already using changelog
checkpointing with RocksDB and increased trigger interval to a maximum
acceptable value.

- Minimize LIST Requests: That's where I can get the most savings. My LIST
requests account for ~70% of checkpointing costs. From what I see, LIST
requests are ~2.5x the number of PUT requests. Unfortunately, when I
changed the checkpointing location to DBFS, it didn't help with minimizing
LIST requests. They are roughly at the same level. From what I see, S3 Optimized
Committer is EMR-specific so I can't use it in Databricks. The fact that I
don't see a difference between S3 and DBFS checkpoint location suggests
that both must implement the same or similar committer.

- Optimizing RocksDB: I still need to do this but I don't suspect it will
help much. From what I understand, these settings shouldn't have a
significant impact on the number of requests to S3.

Any other ideas on how to limit the number of LIST requests are appreciated.

On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh
wrote:

> OK I assume that your main concern is checkpointing costs.
>
> - Caching: If your queries read the same data multiple times, caching the
> data might reduce the amount of data that needs to be checkpointed.
>
> - Optimize Checkpointing Frequency i.e
>
>- Consider Changelog Checkpointing with RocksDB.  This can
>potentially reduce checkpoint size and duration by only storing state
>changes, rather than the entire state.
>- Adjust Trigger Interval (if possible): While not ideal for your
>near-real time requirement, even a slight increase in the trigger interval
>(e.g., to 7-8 seconds) can reduce checkpoint frequency and costs.
>
> - Minimize LIST Requests:
>
>- Enable S3 Optimized Committer: or as you stated consider DBFS
>
> You can also optimise RocksDB. Set your state backend to RocksDB, if not
> already. Here are what I use
>
> # Add RocksDB configurations here
> spark.conf.set("spark.sql.streaming.stateStore.providerClass",
>                "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelog", "true")
> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")  # example configuration
> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style", "level")
> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase", "67108864")
>
> These configurations provide a starting point for tuning RocksDB.
> Depending on your specific use case and requirements, of course, your
> mileage varies.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>
>
>
>
> On Sun, 7 Jan 2024 at 08:07, Andrzej Zera  wrote:
>
>> Usually one or two topics per query. Each query has its own checkpoint
>> directory. Each topic has a few partitions.
>>
>> Performance-wise I don't experience any bottlenecks in terms of
>> checkpointing. It's all about the number of requests (including a high
>> number of LIST requests) and the associated cost.
>>
>> On Sat, 6 Jan 2024 at 13:30, Mich Talebzadeh
>> wrote:
>>
>>> How many topics and checkpoint directories are you dealing with?
>>>
>>> Does each topic has its own checkpoint  on S3?
>>>
>>> All these checkpoints are sequential writes so even SSD would not really
>>> help
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>

unsubscribe

2024-01-10 Thread Daniel Maangi



[apache-spark] documentation on File Metadata _metadata struct

2024-01-10 Thread Jason Horner
All,

The only documentation about the File Metadata (hidden _metadata struct) I
can seem to find is on the Databricks website:
https://docs.databricks.com/en/ingestion/file-metadata-column.html#file-metadata-column

For reference, here is the struct:

_metadata: struct (nullable = false)
 |-- file_path: string (nullable = false)
 |-- file_name: string (nullable = false)
 |-- file_size: long (nullable = false)
 |-- file_block_start: long (nullable = false)
 |-- file_block_length: long (nullable = false)
 |-- file_modification_time: timestamp (nullable = false)

As far as I can tell, this feature was released as part of Spark 3.2.0,
based on this Stack Overflow post:
https://stackoverflow.com/questions/62846669/can-i-get-metadata-of-files-reading-by-spark/77238087#77238087
Unfortunately I wasn't able to locate it in the release notes, though I may
have missed it somehow. So I have the following questions, and am seeking
guidance from the list on how best to approach this:

1. Is the documentation "missing" from the Spark 3.2.0 site, or am I just
unable to find it?
2. While the struct provides file_modification_time, there doesn't seem to
be a corresponding file_creation_time.
3. Would both of these be issues that should be opened in JIRA?

Both of these seem like simple and useful things to add, but they are above
my ability to submit PRs for without some guidance. I'm happy to help,
especially with a documentation PR, if someone can confirm and get me
started in the right direction. I don't really have the Java/Scala skills
needed to implement the feature.

Thanks for any pointers
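
For reference, a minimal sketch of reading the column (the hidden struct
only appears when explicitly selected; the input path and format are
illustrative):

df = spark.read.format("json").load("/path/to/files")  # any file-based source
df.select("*", "_metadata").show()                     # expose the hidden struct
df.select("_metadata.file_name", "_metadata.file_modification_time").show()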

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org