Hey,

Yes, that's how I understood it (scenario 1). However, I'm not sure if
scenario 2 is possible. I think calling cache() on a streaming DataFrame
is supported only inside foreachBatch (in which case it's actually no
longer a streaming DataFrame). See the sketch below.
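To illustrate, here's a minimal sketch of how I understand scenario 2
would have to look, with the caching moved inside foreachBatch. The rate
source, noop sink, grouping column, filter and checkpoint path are
placeholders I made up so the snippet is self-contained, not a
recommendation:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cache-in-foreachbatch").getOrCreate()

# Placeholder streaming source ("rate" just generates timestamp/value rows).
streaming_data = spark.readStream.format("rate").load()

def process_batch(batch_df, batch_id):
    # Inside foreachBatch, batch_df is a plain (non-streaming) DataFrame,
    # so cache() is allowed and pays off when the data is used more than once.
    batch_df.cache()
    try:
        # First use: an aggregation, written to a throwaway noop sink.
        batch_df.groupBy("value").count() \
            .write.format("noop").mode("overwrite").save()
        # Second use: reuses the cached rows instead of recomputing the batch.
        batch_df.filter("value % 2 = 0") \
            .write.format("noop").mode("overwrite").save()
    finally:
        batch_df.unpersist()

query = (
    streaming_data.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "/tmp/checkpoints/example")  # placeholder path
    .start()
)
query.awaitTermination()

Calling cache() directly on streaming_data itself would, as far as I
know, raise an AnalysisException, which is why I doubt scenario 2 works
as written.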
On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> With regard to your point:
>
> - Caching: Can you please explain what you mean by caching? I know that
> when you have batch and streaming sources in a streaming query, then you
> can try to cache batch ones to save on reads. But I'm not sure if it's
> what you mean, and I don't know how to apply what you suggest to
> streaming data.
>
> Let us visit this.
>
> The purpose of caching in Structured Streaming is to store frequently
> accessed data in memory or on disk for faster retrieval, reducing
> repeated reads from sources.
>
> - Types:
>
>    - Memory caching: stores data in memory for extremely fast access.
>    - Disk caching: stores data on disk for larger datasets or
>    persistence across triggers.
>
> - Scenarios:
>
>    - Joining streaming data with static data: cache static datasets
>    (e.g., reference tables) to avoid repeated reads for each micro-batch.
>    - Reusing intermediate results: cache intermediate dataframes that
>    are expensive to compute and used multiple times within the query.
>    - Window operations: cache data within a window to avoid re-reading
>    for subsequent aggregations or calculations within that window.
>
> - Benefits:
>
>    - Performance: faster query execution by reducing I/O operations and
>    computation overhead.
>    - Cost optimization: reduced reads from external sources can lower
>    costs, especially for cloud-based sources.
>    - Scalability: helps handle higher data volumes and throughput by
>    minimizing expensive re-computations.
>
> Example code
>
> Scenario 1:
>
> static_data = spark.read.load("path/to/static/data")
> static_data.cache()
> streaming_data = spark.readStream.format("...").load()
> # Static data is cached for efficient joins
> joined_data = streaming_data.join(static_data, ...)
>
> Scenario 2:
>
> intermediate_df = streaming_data.groupBy(...).count()
> intermediate_df.cache()
> # Use cached intermediate_df for further transformations or actions
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my LinkedIn profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary
> damages arising from such loss, damage or destruction.
>
> On Wed, 10 Jan 2024 at 13:10, Andrzej Zera <andrzejz...@gmail.com> wrote:
>
>> Thank you very much for your suggestions. Yes, my main concern is
>> checkpointing costs.
>>
>> I went through your suggestions, and here are my comments:
>>
>> - Caching: Can you please explain what you mean by caching? I know
>> that when you have batch and streaming sources in a streaming query,
>> then you can try to cache the batch ones to save on reads. But I'm not
>> sure if that's what you mean, and I don't know how to apply what you
>> suggest to streaming data.
>>
>> - Optimize checkpointing frequency: I'm already using changelog
>> checkpointing with RocksDB and have increased the trigger interval to
>> the maximum acceptable value.
>>
>> - Minimize LIST requests: That's where I can get the most savings. My
>> LIST requests account for ~70% of checkpointing costs. From what I
>> see, LIST requests are ~2.5x the number of PUT requests.
>> Unfortunately, when I changed the checkpoint location to DBFS, it
>> didn't help with minimizing LIST requests; they stayed at roughly the
>> same level. From what I see, the S3 Optimized Committer is
>> EMR-specific, so I can't use it in Databricks. The fact that I don't
>> see a difference between S3 and DBFS checkpoint locations suggests
>> that both must implement the same or a similar committer.
>>
>> - Optimizing RocksDB: I still need to do this, but I don't suspect it
>> will help much. From what I understand, these settings shouldn't have
>> a significant impact on the number of requests to S3.
>>
>> Any other ideas on how to limit the number of LIST requests are
>> appreciated.
>>
>> On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh
>> <mich.talebza...@gmail.com> wrote:
>>
>>> OK, I assume that your main concern is checkpointing costs.
>>>
>>> - Caching: If your queries read the same data multiple times, caching
>>> the data might reduce the amount of data that needs to be
>>> checkpointed.
>>>
>>> - Optimize checkpointing frequency, i.e.:
>>>
>>>    - Consider changelog checkpointing with RocksDB. This can
>>>    potentially reduce checkpoint size and duration by only storing
>>>    state changes, rather than the entire state.
>>>    - Adjust the trigger interval (if possible): while not ideal for
>>>    your near-real-time requirement, even a slight increase in the
>>>    trigger interval (e.g., to 7-8 seconds) can reduce checkpoint
>>>    frequency and costs.
>>>
>>> - Minimize LIST requests:
>>>
>>>    - Enable the S3 Optimized Committer, or, as you stated, consider
>>>    DBFS.
>>>
>>> You can also optimise RocksDB. Set your state backend to RocksDB, if
>>> not already. Here is what I use:
>>>
>>> # RocksDB configurations
>>> spark.conf.set("spark.sql.streaming.stateStore.providerClass",
>>>     "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")  # example value
>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style", "level")
>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase", "67108864")
>>>
>>> These configurations provide a starting point for tuning RocksDB.
>>> Depending on your specific use case and requirements, of course, your
>>> mileage varies.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh
>>>
>>> On Sun, 7 Jan 2024 at 08:07, Andrzej Zera <andrzejz...@gmail.com>
>>> wrote:
>>>
>>>> Usually one or two topics per query. Each query has its own
>>>> checkpoint directory. Each topic has a few partitions.
>>>>
>>>> Performance-wise, I don't experience any bottlenecks in terms of
>>>> checkpointing. It's all about the number of requests (including a
>>>> high number of LIST requests) and the associated cost.
>>>>
>>>> On Sat, 6 Jan 2024 at 13:30, Mich Talebzadeh
>>>> <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> How many topics and checkpoint directories are you dealing with?
>>>>>
>>>>> Does each topic have its own checkpoint on S3?
>>>>>
>>>>> All these checkpoints are sequential writes, so even SSDs would not
>>>>> really help.
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh
>>>>>
>>>>> On Sat, 6 Jan 2024 at 08:19, Andrzej Zera <andrzejz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> I'm running a few Structured Streaming jobs (with Spark 3.5.0)
>>>>>> that require near-real-time accuracy, with trigger intervals on
>>>>>> the order of 5-10 seconds. I usually run 3-6 streaming queries as
>>>>>> part of the job, and each query includes at least one stateful
>>>>>> operation (usually two or more). My checkpoint location is an S3
>>>>>> bucket, and I use RocksDB as the state store. Unfortunately,
>>>>>> checkpointing costs are quite high: they are the main cost item
>>>>>> of the system, roughly 4-5 times the cost of compute.
>>>>>>
>>>>>> To save on compute costs, the following things are usually
>>>>>> recommended:
>>>>>>
>>>>>>    - increase the trigger interval (as mentioned, I don't have
>>>>>>    much room here)
>>>>>>    - decrease the number of shuffle partitions (I have 2x the
>>>>>>    number of workers)
>>>>>>
>>>>>> I'm looking for some other recommendations that I can use to save
>>>>>> on checkpointing costs. I saw that most requests are LIST
>>>>>> requests. Can we cut them down somehow? I'm using Databricks. If
>>>>>> I replace the S3 bucket with DBFS, will it help in any way?
>>>>>>
>>>>>> Thank you!
>>>>>> Andrzej