Hey,

Yes, that's how I understood it (scenario 1). However, I'm not sure if
scenario 2 is possible. I think cache() on a streaming DataFrame is supported
only inside foreachBatch (where it's actually no longer a streaming DF).
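
To illustrate, a rough sketch of the foreachBatch variant (the sink format,
paths and column names below are placeholders I made up):

def process_batch(batch_df, batch_id):
    # Inside foreachBatch, batch_df is a regular (non-streaming) DataFrame,
    # so cache() works as usual and pays off when the batch is reused.
    batch_df.cache()
    batch_df.groupBy("key").count() \
        .write.mode("append").parquet("path/to/out1")  # placeholder sink
    batch_df.groupBy("key").avg("value") \
        .write.mode("append").parquet("path/to/out2")  # placeholder sink
    batch_df.unpersist()

(streaming_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "path/to/ckpt")  # placeholder path
    .start())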

On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
>  With regard to your point
>
> - Caching: Can you please explain what you mean by caching? I know that
> when you have batch and streaming sources in a streaming query, you can
> try to cache the batch ones to save on reads. But I'm not sure if that's
> what you mean, and I don't know how to apply what you suggest to streaming
> data.
>
> Let us visit this:
>
> The purpose of caching in Structured Streaming is to store frequently
> accessed data in memory or on disk for faster retrieval, reducing repeated
> reads from the sources.
>
> - Types:
>
>    - Memory Caching: Stores data in memory for extremely fast access.
>    - Disk Caching: Stores data on disk for larger datasets or for
>    persistence across triggers.
>
>
> - Scenarios:
>
>    - Joining Streaming Data with Static Data: Cache static datasets
>    (e.g., reference tables) to avoid repeated reads for each micro-batch.
>    - Reusing Intermediate Results: Cache intermediate DataFrames that are
>    expensive to compute and used multiple times within the query.
>    - Window Operations: Cache data within a window to avoid re-reading
>    for subsequent aggregations or calculations within that window.
>
> - Benefits:
>
>    - Performance: Faster query execution by reducing I/O operations and
>    computation overhead.
>    - Cost Optimization: Reduced reads from external sources can lower
>    costs, especially for cloud-based sources.
>    - Scalability: Helps handle higher data volumes and throughput by
>    minimizing expensive re-computations.
>
>
> Example code:
>
> scenario 1
>
> static_data = spark.read.load("path/to/static/data")
> static_data.cache()
> streaming_data = spark.readStream.format("...").load()
> # Static data is cached for efficient joins
> joined_data = streaming_data.join(static_data, ...)
>
> scenario 2
>
> intermediate_df = streaming_data.groupBy(...).count()
> intermediate_df.cache()
> # Use cached intermediate_df for further transformations or actions
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
> view my LinkedIn profile:
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 10 Jan 2024 at 13:10, Andrzej Zera <andrzejz...@gmail.com> wrote:
>
>> Thank you very much for your suggestions. Yes, my main concern is
>> checkpointing costs.
>>
>> I went through your suggestions and here are my comments:
>>
>> - Caching: Can you please explain what you mean by caching? I know that
>> when you have batch and streaming sources in a streaming query, you can
>> try to cache the batch ones to save on reads. But I'm not sure if that's
>> what you mean, and I don't know how to apply what you suggest to streaming
>> data.
>>
>> - Optimize Checkpointing Frequency: I'm already using changelog
>> checkpointing with RocksDB and have increased the trigger interval to the
>> maximum acceptable value.
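>>
>> For reference, by changelog checkpointing I mean the RocksDB option added
>> in Spark 3.4, if I remember correctly (a minimal sketch; these two
>> settings are the relevant part):
>>
>> spark.conf.set(
>>     "spark.sql.streaming.stateStore.providerClass",
>>     "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>> spark.conf.set(
>>     "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
>>     "true")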
>>
>> - Minimize LIST Requests: That's where I can get the most savings. My LIST
>> requests account for ~70% of checkpointing costs. From what I see, LIST
>> requests are ~2.5x the number of PUT requests. Unfortunately, when I
>> changed the checkpoint location to DBFS, it didn't help with minimizing
>> LIST requests; they stayed at roughly the same level. From what I see, the
>> S3 Optimized Committer is EMR-specific, so I can't use it in Databricks.
>> The fact that I don't see a difference between S3 and DBFS checkpoint
>> locations suggests that both must implement the same or a similar committer.
>>
>> - Optimizing RocksDB: I still need to do this, but I suspect it won't
>> help much. From what I understand, these settings shouldn't have a
>> significant impact on the number of requests to S3.
>>
>> Any other ideas on how to limit the number of LIST requests would be
>> appreciated.
>>
>> On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> OK, I assume that your main concern is checkpointing costs.
>>>
>>> - Caching: If your queries read the same data multiple times, caching
>>> the data might reduce the amount of data that needs to be checkpointed.
>>>
>>>
>>> - Optimize Checkpointing Frequency, i.e.
>>>
>>>    - Consider Changelog Checkpointing with RocksDB. This can
>>>    potentially reduce checkpoint size and duration by only storing state
>>>    changes, rather than the entire state.
>>>    - Adjust Trigger Interval (if possible): While not ideal for your
>>>    near-real-time requirement, even a slight increase in the trigger
>>>    interval (e.g., to 7-8 seconds) can reduce checkpoint frequency and
>>>    costs (see the trigger sketch after the RocksDB settings below).
>>>
>>> - Minimize LIST Requests:
>>>
>>>    - Enable the S3 Optimized Committer or, as you stated, consider DBFS.
>>>
>>> You can also optimise RocksDB. Set your state backend to RocksDB, if
>>> not already. Here is what I use:
>>>
>>> # Add RocksDB configurations here
>>> spark.conf.set(
>>>     "spark.sql.streaming.stateStore.providerClass",
>>>     "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>>> spark.conf.set(
>>>     "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
>>>     "true")
>>> spark.conf.set(
>>>     "spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB",
>>>     "64")  # example configuration
>>> spark.conf.set(
>>>     "spark.sql.streaming.stateStore.rocksdb.compaction.style",
>>>     "level")
>>> spark.conf.set(
>>>     "spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase",
>>>     "67108864")
>>>
>>> These configurations provide a starting point for tuning RocksDB.
>>> Depending on your specific use case and requirements, your mileage may
>>> vary.
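>>>
>>> On the trigger side, the interval from the bullet above is set per
>>> query, roughly like this (the sink format and paths are just
>>> placeholders):
>>>
>>> query = (streaming_df.writeStream
>>>     .format("parquet")  # placeholder sink
>>>     .option("checkpointLocation", "s3://my-bucket/checkpoints/q1")  # placeholder
>>>     .trigger(processingTime="8 seconds")
>>>     .start("s3://my-bucket/output/q1"))  # placeholder output path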
>>>
>>> HTH
>>>
>>>
>>> On Sun, 7 Jan 2024 at 08:07, Andrzej Zera <andrzejz...@gmail.com> wrote:
>>>
>>>> Usually one or two topics per query. Each query has its own checkpoint
>>>> directory. Each topic has a few partitions.
>>>>
>>>> Performance-wise I don't experience any bottlenecks in terms of
>>>> checkpointing. It's all about the number of requests (including a high
>>>> number of LIST requests) and the associated cost.
>>>>
>>>> On Sat, 6 Jan 2024 at 13:30, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> wrote:
>>>>
>>>>> How many topics and checkpoint directories are you dealing with?
>>>>>
>>>>> Does each topic have its own checkpoint on S3?
>>>>>
>>>>> All these checkpoints are sequential writes, so even SSDs would not
>>>>> really help.
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>> On Sat, 6 Jan 2024 at 08:19, Andrzej Zera <andrzejz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that
>>>>>> require near-real-time accuracy, with trigger intervals on the order
>>>>>> of 5-10 seconds. I usually run 3-6 streaming queries as part of the
>>>>>> job, and each query includes at least one stateful operation (and
>>>>>> usually two or more). My checkpoint location is an S3 bucket and I use
>>>>>> RocksDB as the state store. Unfortunately, checkpointing costs are
>>>>>> quite high; they are the main cost item of the system, roughly 4-5
>>>>>> times the cost of compute.
>>>>>>
>>>>>> To save on compute costs, the following things are usually
>>>>>> recommended:
>>>>>>
>>>>>>    - increase trigger interval (as mentioned, I don't have much
>>>>>>    space here)
>>>>>>    - decrease the number of shuffle partitions (I have 2x the number
>>>>>>    of workers; see the sketch after this list)
>>>>>>
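>>>>>> For context, a sketch of the shuffle-partitions setting I mean (the
>>>>>> value 8 is just an example for a 4-worker cluster; as far as I know,
>>>>>> it must be chosen before the first run of a stateful query, since the
>>>>>> state in the checkpoint is partitioned by it):
>>>>>>
>>>>>> spark.conf.set("spark.sql.shuffle.partitions", "8")
>>>>>>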
>>>>>> I'm looking for other recommendations I can use to save on
>>>>>> checkpointing costs. I saw that most requests are LIST requests. Can
>>>>>> we cut them down somehow? I'm using Databricks. If I replace the S3
>>>>>> bucket with DBFS, will it help in any way?
>>>>>>
>>>>>> Thank you!
>>>>>> Andrzej
>>>>>>
>>>>>>
