Hello Micheal and Siying. I have opened a WIP PR with the proposed changes here: https://github.com/apache/spark/pull/52209
The first commit adds a test that exposes the issue and the other two commits are the changes to fix it. I'm not sure how to go about creating a SPARK Jira issue to include in the PR description. I'd appreciate any guidance on that. Thanks!

Pedro

On Tue, Sep 2, 2025 at 6:46 PM B. Micheal Okutubo <[email protected]> wrote:

> Sounds good, thanks.
>
> On Sat, Aug 30, 2025 at 4:33 AM Pedro Miguel Duarte <[email protected]> wrote:
>
>> Hi Michael, thanks for your reply! Yes, I'm still planning to make the change. I'll open up a PR against the 3.5 branch and will tag you all on it. I'll use the oldestZipFileTime; I agree with you all on that approach.
>>
>> Best,
>> Pedro
>>
>> On Sat, Aug 30, 2025 at 2:04 AM B. Micheal Okutubo <[email protected]> wrote:
>>
>>> Hi Pedro,
>>>
>>> Sorry, just catching up on this. You mentioned this happened on 636.zip, but you said the zip file before it (N-1 in your example) has no SST files, and that is why. Was the DB empty at that N-1 version? Because if it wasn't empty at that version, then it should have SSTs associated with it.
>>>
>>> Anyway, +1 to Siying's suggestion. Instead of using padding, let's just use min(oldestTrackedFileTime, oldestZipFileTime) as the threshold.
>>>
>>> Are you still planning to make the change? I'm happy to help review. Thanks.
>>>
>>> On Mon, Aug 25, 2025 at 12:57 PM Mich Talebzadeh <[email protected]> wrote:
>>>
>>>> Hi Pedro,
>>>>
>>>> Glad it helped. A couple of quick hints while you implement:
>>>>
>>>> 1) Configurable padding + N manifests
>>>>
>>>> - Add two knobs (defaults shown):
>>>>   - stateStore.rocksdb.gc.paddingMs = 120000 (HDFS: 60–120s; S3/GCS: 120–300s)
>>>>   - stateStore.rocksdb.gc.protectedVersions = 3 (union of last N manifests)
>>>> - Only delete candidates if: mtime(candidate) + paddingMs < min(mtime(referenced)) (or < now - paddingMs)
>>>>
>>>> 2) Final recheck before delete
>>>>
>>>> - Just before deletion, re-read the latest V..V-(N-1) manifests and drop any candidate that appears there. This will close the race.
>>>>
>>>> HTH
>>>>
>>>> Dr Mich Talebzadeh,
>>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>>
>>>> view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>> On Mon, 25 Aug 2025 at 19:29, Pedro Miguel Duarte <[email protected]> wrote:
>>>>
>>>>> Thanks for your reply!
>>>>>
>>>>> Yes, this helps. I think adding a time padding will help prevent deleting files that are incorrectly labeled as orphaned in the current implementation. This only happens if two executors run maintenance at nearly the exact same time. I'll look into implementing a fix.
>>>>>
>>>>> On Mon, Aug 25, 2025 at 12:46 PM Mich Talebzadeh <[email protected]> wrote:
>>>>>
>>>>>> In your statement:
>>>>>>
>>>>>> "*Instead of simply older, should there be some padding to allow for maintenance being executed simultaneously on two executors? Something like at least 60s older than the oldest tracked file.*"
>>>>>>
>>>>>> *What you need to do is add a time padding before deleting orphans, which is a good solution for concurrent maintenance.*
>>>>>>
>>>>>> For your RocksDB state-store race, still apply the safety measures:
>>>>>>
>>>>>> - Add a time padding window before deleting “orphans” (e.g., 60–120s; padding is cheap insurance).
>>>>>> - Consider the union of the last N manifests (N ≥ 2–3) when deciding deletions. When deciding which .sst files are “orphans” (i.e., safe to delete), don’t look only at the latest snapshot/manifest (e.g., V.zip). Instead, build a protected set of files by taking the union of the files referenced by the last N manifests (e.g., versions V, V-1, …, V-(N-1)), with N ≥ 2 (and often 3 on object stores). Only delete files not in that union. This helps because it re-reads the latest manifests before the delete (a final sanity check).
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Dr Mich Talebzadeh,
>>>>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>>>>
>>>>>> view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>> On Mon, 18 Aug 2025 at 17:42, Pedro Miguel Duarte <[email protected]> wrote:
>>>>>>
>>>>>>> Hello structured streaming experts!
>>>>>>>
>>>>>>> We are getting SST FileNotFound state store corruption errors.
>>>>>>>
>>>>>>> The root cause is a race condition where two different executors are doing cleanup of the state store at the same time. Both write the same version of the state zip file to DFS.
>>>>>>>
>>>>>>> The first executor enters maintenance, writes SST files, and writes the 636.zip file.
>>>>>>>
>>>>>>> Concurrently, the second executor enters maintenance, writes SST files, and writes 636.zip.
>>>>>>>
>>>>>>> The SST files in DFS are written almost simultaneously and are:
>>>>>>> - 000867-4695ff6e-d69d-4792-bcd6-191b57eadb9d.sst <-- from one executor
>>>>>>> - 000867-a71cb8b2-9ed8-4ec1-82e2-a406dd1fb949.sst <-- from the other executor
>>>>>>>
>>>>>>> The problem occurs during orphan file deletion (see this PR <https://github.com/apache/spark/pull/39897/files>). The executor that lost the race to write 636.zip decides that it will delete the SST file that is actually referenced in 636.zip.
>>>>>>>
>>>>>>> Currently the maintenance task does have some protection for files of ongoing tasks. From the comments in RocksDBFileManager: "only delete orphan files that are older than all tracked files when there are at least 2 versions".
>>>>>>>
>>>>>>> In this case the file that is being deleted is indeed older, but only by a small amount of time.
>>>>>>>
>>>>>>> Instead of simply older, should there be some padding to allow for maintenance being executed simultaneously on two executors? Something like at least 60s older than the oldest tracked file.
>>>>>>>
>>>>>>> This should help avoid state store corruption at the expense of some storage space in the DFS.
>>>>>>>
>>>>>>> Thanks for any help or recommendations here.
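To make the padding + protected-set + final-recheck hints from the thread concrete, below is a minimal Scala sketch. The knob values mirror the suggested defaults, but the names (OrphanGcSketch, Manifest, filesSafeToDelete, paddingMs, protectedVersions) are illustrative assumptions for this sketch only, not Spark's actual RocksDBFileManager API.

// Illustrative sketch only: names and defaults are assumptions, not Spark's API.
object OrphanGcSketch {

  // Assumed knobs, mirroring the defaults suggested in the thread.
  val paddingMs: Long = 120000L      // HDFS: 60-120s; S3/GCS: 120-300s
  val protectedVersions: Int = 3     // union of the last N manifests

  // Stand-in for "the set of SST file names referenced by one version zip".
  type Manifest = Set[String]

  /** Returns the orphan candidates that are safe to delete. */
  def filesSafeToDelete(
      candidates: Map[String, Long],   // file name -> modification time (ms)
      latestManifests: Seq[Manifest],  // manifests for versions V, V-1, ..., V-(N-1)
      oldestTrackedFileMtime: Long,
      nowMs: Long): Set[String] = {
    // Protected set: every file referenced by any of the last N manifests.
    val protectedFiles =
      latestManifests.take(protectedVersions).foldLeft(Set.empty[String])(_ ++ _)
    candidates.collect {
      // Keep a candidate only if it is outside the protected set and older than
      // both the oldest tracked file and "now", each by the padding window.
      case (name, mtime)
          if !protectedFiles.contains(name) &&
             mtime + paddingMs < oldestTrackedFileMtime &&
             mtime + paddingMs < nowMs =>
        name
    }.toSet
  }
}

The final recheck from hint 2 would then re-list the last N manifests immediately before the actual delete call and drop any candidate that has reappeared in them.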

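For the direction the thread ultimately settled on, the deletion threshold becomes min(oldestTrackedFileTime, oldestZipFileTime) rather than a padding window. A hedged sketch of that idea follows; again the object and method names are illustrative, not the real RocksDBFileManager internals.

// Sketch of the agreed threshold: delete an orphan only if it is older than
// both the oldest tracked SST file and the oldest retained version zip.
object OrphanThresholdSketch {
  def orphansOlderThanThreshold(
      orphanCandidates: Map[String, Long],  // file name -> modification time (ms)
      oldestTrackedFileTime: Long,          // mtime of the oldest SST referenced by a retained version
      oldestZipFileTime: Long               // mtime of the oldest retained version zip
  ): Set[String] = {
    val threshold = math.min(oldestTrackedFileTime, oldestZipFileTime)
    // An SST uploaded during the 636.zip race cannot be older than the zip that
    // references it, so the executor that lost the race no longer treats the
    // winner's SST file as an orphan.
    orphanCandidates.collect { case (name, mtime) if mtime < threshold => name }.toSet
  }
}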