[
https://issues.apache.org/jira/browse/SPARK-51823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jungtaek Lim resolved SPARK-51823.
----------------------------------
Fix Version/s: 4.1.0
Resolution: Fixed
Issue resolved by pull request 50612
[https://github.com/apache/spark/pull/50612]
> Add option to not persist state stores on executors
> ---------------------------------------------------
>
> Key: SPARK-51823
> URL: https://issues.apache.org/jira/browse/SPARK-51823
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.5.5
> Reporter: Adam Binford
> Assignee: Adam Binford
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.1.0
>
>
> Currently, when using stateful streams, state is loaded on executors when
> needed and then kept there on the assumption that it will be needed again
> soon. This makes sense for very low-latency applications or those dealing
> with a small but frequent volume of data, but it does not work well for
> high-latency, high-volume applications that need to optimize resource usage.
>
> Originally there was only the HDFSBackedStateStore, which keeps state in
> memory, and users frequently hit OOM errors trying to use it for large
> queries. Typically in Spark, you can run a query of nearly any size with an
> appropriate number of shuffle partitions, and it will finish eventually
> depending on the amount of compute resources you have. For stateful streams,
> all state data is kept in memory indefinitely on the executors you have, so
> your resources _had_ to scale with the data; there was no option for "fewer
> resources but take longer".
>
> The RocksDBStateStore was created to alleviate these issues, but it does not
> solve them completely. It moves the resource requirement from the Java heap
> to native memory and disk, but it still has the same scaling problem: in its
> default setup it can accumulate unbounded native memory. The bounded memory
> setting was added to address that, but scaling issues remain. Currently L0
> blocks are pinned in the cache, so any RocksDB instance open on an executor
> will keep its L0 blocks in memory even when they are not used, with no
> option to evict them. Additionally, all RocksDB instances share a background
> threadpool (defaulting to 2 threads) for flushing and compaction, so any
> open RocksDB instance, even one not actively being used by an executor, can
> cause resource contention with other open instances.
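>
> For illustration, a query opts into RocksDB with bounded memory roughly like
> the sketch below; the bounded-memory keys follow my reading of the Spark
> 3.5+ configuration docs and should be double-checked against your version:
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> // Use RocksDB for streaming state and cap its native memory per executor.
> // The two bounded-memory keys are assumptions based on the Spark 3.5+ docs.
> val spark = SparkSession.builder()
>   .appName("stateful-stream")
>   .config("spark.sql.streaming.stateStore.providerClass",
>     "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>   .config("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", "true")
>   .config("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", "500")
>   .getOrCreate()
> {code}
>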
> The real solution to scaling stateful queries is to have the option to not
> persist state stores on executors when they are not being used. This has a
> tradeoff, as the state will have to be reloaded from its source every batch,
> but that is acceptable for many use cases. Specifically, for streams with
> long periods between batches or that use dynamic allocation, the loaded
> state stores will likely be unloaded anyway when the executor is
> deallocated, before they are ever used a second time. In that case, keeping
> the state stores on the executors provides no benefit and just wastes
> resources.
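>
> Such an option would presumably be enabled through the session conf; the key
> name below is only a placeholder, not necessarily the one introduced by the
> linked pull request:
> {code:scala}
> // Assumes an existing SparkSession named `spark`.
> // Hypothetical config key: unload the state store after each batch commits
> // instead of caching it on the executor.
> spark.conf.set("spark.sql.streaming.stateStore.unloadOnCommit", "true")
>
> val counts = spark.readStream
>   .format("rate")     // built-in test source emitting (timestamp, value) rows
>   .load()
>   .groupBy("value")
>   .count()            // stateful aggregation backed by the state store
>
> counts.writeStream
>   .format("console")
>   .outputMode("update")
>   .option("checkpointLocation", "/tmp/checkpoints/unload-demo")
>   .start()
> {code}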
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]