Sounds good, thanks.

On Sat, Aug 30, 2025 at 4:33 AM Pedro Miguel Duarte <[email protected]> wrote:
Hi Michael, thanks for your reply!

Yes, I'm still planning to make the change. I'll open a PR against the 3.5 branch and tag you all on it. Will use the oldestZipFileTime; I agree with you all on that approach.

Best,
Pedro

On Sat, Aug 30, 2025 at 2:04 AM B. Micheal Okutubo <[email protected]> wrote:

Hi Pedro,

Sorry, just catching up on this. You mentioned this happened on 636.zip, and that the zip file before it (N-1 in your example) has no SST files. Was the DB empty at that N-1 version? If it wasn't empty at that version, it should have SSTs associated with it.

Anyway, +1 to Siying's suggestion. Instead of using padding, let's just use min(oldestTrackedFileTime, oldestZipFileTime) as the threshold.

Are you still planning to make the change? I'm happy to help review. Thanks.

On Mon, Aug 25, 2025 at 12:57 PM Mich Talebzadeh <[email protected]> wrote:

Hi Pedro,

Glad it helped. A couple of quick hints while you implement:

1) Configurable padding + N manifests

- Add two knobs (defaults shown):
  - stateStore.rocksdb.gc.paddingMs = 120000 (HDFS: 60–120s; S3/GCS: 120–300s)
  - stateStore.rocksdb.gc.protectedVersions = 3 (union of last N manifests)
- Only delete candidates if: mtime(candidate) + paddingMs < min(mtime(referenced)) (or < now - paddingMs)

2) Final recheck before delete

- Just before deletion, re-read the latest V..V-(N-1) manifests and drop any candidate that appears there. This closes the race.

HTH

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

On Mon, 25 Aug 2025 at 19:29, Pedro Miguel Duarte <[email protected]> wrote:

Thanks for your reply!

Yes, this helps. I think adding a time padding will help prevent deleting files that are incorrectly labeled as orphaned in the current implementation. This only happens if two executors run maintenance at nearly the same time. I'll look into implementing a fix.

On Mon, Aug 25, 2025 at 12:46 PM Mich Talebzadeh <[email protected]> wrote:

In your statement:

"Instead of simply older, should there be some padding to allow for maintenance being executed simultaneously on two executors? Something like at least 60s older than the oldest tracked file."

What you need to do is add a time padding before deleting orphans, which is a good solution for concurrent maintenance.

For your RocksDB state-store race, still apply the safety measures:

- Add a time padding window before deleting "orphans" (e.g., 60–120s; padding is cheap insurance).
- Consider the union of the last N manifests (N ≥ 2–3) when deciding deletions. When deciding which .sst files are "orphans" (i.e., safe to delete), don't look only at the latest snapshot/manifest (e.g., V.zip). Instead, build a protected set of files by taking the union of the files referenced by the last N manifests (e.g., versions V, V-1, ..., V-(N-1)), with N ≥ 2 (and often 3 on object stores). Only delete files not in that union. This helps because it re-reads the latest manifests before deleting (a final sanity check). A sketch of this check appears below.

HTH

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
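Putting the padding and protected-set hints together, here is a minimal Scala sketch of the deletion check. It is illustrative only, not the actual RocksDBFileManager code: DfsFile, readManifest, and the delete callback are hypothetical stand-ins, and the two constants mirror the knobs proposed above rather than existing Spark configs.

    // Minimal model of a file in DFS with its modification time.
    final case class DfsFile(name: String, mtime: Long)

    object OrphanGcSketch {
      // Proposed knobs (defaults from the hints above); not existing Spark configs.
      val paddingMs: Long = 120000L      // stateStore.rocksdb.gc.paddingMs
      val protectedVersions: Int = 3     // stateStore.rocksdb.gc.protectedVersions

      // Protected set: union of SST names referenced by manifests V, V-1, ..., V-(N-1).
      def protectedSet(readManifest: Long => Set[String], latestVersion: Long): Set[String] =
        (0 until protectedVersions)
          .map(i => latestVersion - i)
          .filter(_ >= 1L)
          .flatMap(readManifest)
          .toSet

      // Deletable only if unreferenced AND older than every referenced file
      // by at least the padding window.
      def safeToDelete(candidate: DfsFile,
                       referenced: Set[String],
                       oldestReferencedMtime: Long): Boolean =
        !referenced.contains(candidate.name) &&
          candidate.mtime + paddingMs < oldestReferencedMtime

      def deleteOrphans(candidates: Seq[DfsFile],
                        readManifest: Long => Set[String],
                        latestVersion: Long,
                        oldestReferencedMtime: Long,
                        delete: DfsFile => Unit): Unit = {
        val referenced = protectedSet(readManifest, latestVersion)
        val deletable = candidates.filter(safeToDelete(_, referenced, oldestReferencedMtime))
        // Final recheck just before deleting: re-read the manifests and drop any
        // candidate that became referenced in the meantime (in practice you would
        // also re-resolve the latest version, since it may have advanced).
        val recheck = protectedSet(readManifest, latestVersion)
        deletable.filterNot(f => recheck.contains(f.name)).foreach(delete)
      }
    }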
On Mon, 18 Aug 2025 at 17:42, Pedro Miguel Duarte <[email protected]> wrote:

Hello structured streaming experts!

We are getting SST FileNotFound state store corruption errors.

The root cause is a race condition where two different executors are doing cleanup of the state store at the same time. Both write the same version of the state zip file to DFS.

The first executor enters maintenance, writes SST files, and writes the 636.zip file. Concurrently, the second executor enters maintenance, writes its own SST files, and also writes 636.zip.

The SST files in DFS are written almost simultaneously and are:
- 000867-4695ff6e-d69d-4792-bcd6-191b57eadb9d.sst <-- from one executor
- 000867-a71cb8b2-9ed8-4ec1-82e2-a406dd1fb949.sst <-- from the other executor

The problem occurs during orphan file deletion (see this PR <https://github.com/apache/spark/pull/39897/files>). The executor that lost the race to write 636.zip decides to delete the SST file that is actually referenced in 636.zip.

Currently the maintenance task does have some protection for files of ongoing tasks. From the comments in RocksDBFileManager: "only delete orphan files that are older than all tracked files when there are at least 2 versions".

In this case the file being deleted is indeed older, but only by a small amount of time.

Instead of simply older, should there be some padding to allow for maintenance being executed simultaneously on two executors? Something like at least 60s older than the oldest tracked file.

This should help avoid state store corruption at the expense of some storage space in the DFS.

Thanks for any help or recommendations here.
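For reference, a minimal sketch of the threshold the thread converged on at the top (Siying's suggestion, seconded by Micheal): compare against min(oldestTrackedFileTime, oldestZipFileTime) rather than the oldest tracked file alone, so an SST uploaded at roughly the same time as a concurrently committed zip stays protected. The names are illustrative, not the actual RocksDBFileManager API, and assume both timestamps were gathered from the same DFS listing.

    object OrphanThresholdSketch {
      // Threshold below which an untracked SST may be deleted: the older of the
      // oldest tracked (referenced) file and the oldest version zip file.
      def orphanDeletionThreshold(oldestTrackedFileTime: Long,
                                  oldestZipFileTime: Long): Long =
        math.min(oldestTrackedFileTime, oldestZipFileTime)

      // An untracked SST is kept unless it is strictly older than the threshold,
      // so an SST written alongside a zip that lost the commit race survives.
      def isDeletable(sstMtime: Long,
                      oldestTrackedFileTime: Long,
                      oldestZipFileTime: Long): Boolean =
        sstMtime < orphanDeletionThreshold(oldestTrackedFileTime, oldestZipFileTime)
    }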
