Hello Micheal and Siying. I have opened up a WIP PR with the proposed
changes here: https://github.com/apache/spark/pull/52209

The first commit adds a test that exposes the issue and the other two
commits are the changes to fix it.

I'm not sure how to go about creating a SPARK jira issue to include in the
PR description. Appreciate any guidance on that.

Thanks!
Pedro

On Tue, Sep 2, 2025 at 6:46 PM B. Micheal Okutubo <
[email protected]> wrote:

> Sounds good, thanks.
>
> On Sat, Aug 30, 2025 at 4:33 AM Pedro Miguel Duarte <[email protected]>
> wrote:
>
>> Hi Michael, thanks for your reply! Yes, I'm still planning to make the
>> change. I'll open up a PR against the 3.5 branch and will tag you all on
>> it. I'll use the oldestZipFileTime; I agree with you all on that
>> approach.
>>
>> Best,
>> Pedro
>>
>> On Sat, Aug 30, 2025 at 2:04 AM B. Micheal Okutubo <
>> [email protected]> wrote:
>>
>>> Hi Pedro,
>>>
>>> Sorry, just catching up on this. You mentioned this happened on 636.zip,
>>> but you said the zip file before it (N-1 in your example) has no SST
>>> files, which is why this happened. Was the db empty at that N-1 version?
>>> Because if it isn't empty at that version, then it should have SSTs
>>> associated with it.
>>>
>>> Anyway, +1 to Siying's suggestion. Instead of using padding, let's just
>>> use min(oldestTrackedFileTime, oldestZipFileTime) as the threshold.
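>>>
>>> For illustration, a rough sketch of that check. All names below (e.g.
>>> FileEntry) are made up, not the actual RocksDBFileManager fields:
>>>
>>> // Rough sketch only; assumes both lists are non-empty.
>>> case class FileEntry(path: String, modificationTime: Long)
>>>
>>> def deletionThreshold(trackedFiles: Seq[FileEntry],
>>>                       zipFiles: Seq[FileEntry]): Long = {
>>>   val oldestTrackedFileTime = trackedFiles.map(_.modificationTime).min
>>>   val oldestZipFileTime     = zipFiles.map(_.modificationTime).min
>>>   math.min(oldestTrackedFileTime, oldestZipFileTime)
>>> }
>>>
>>> // Only delete orphan candidates strictly older than the threshold.
>>> def orphansToDelete(candidates: Seq[FileEntry], threshold: Long): Seq[FileEntry] =
>>>   candidates.filter(_.modificationTime < threshold)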
>>>
>>> Are you still planning to make the change? I'm happy to help review.
>>> Thanks.
>>>
>>>
>>>
>>> On Mon, Aug 25, 2025 at 12:57 PM Mich Talebzadeh <
>>> [email protected]> wrote:
>>>
>>>> Hi Pedro,
>>>>
>>>> Glad it helped.
>>>> A couple of quick hints while you implement:
>>>>
>>>> 1) Configurable padding + N manifests
>>>>
>>>> - Add two knobs (defaults shown):
>>>>
>>>>
>>>>    - stateStore.rocksdb.gc.paddingMs = 120000 (HDFS: 60–120s; S3/GCS:
>>>>    120–300s)
>>>>    - stateStore.rocksdb.gc.protectedVersions = 3 (union of last N
>>>>    manifests)
>>>>
>>>> - Only delete candidates
>>>>
>>>>    - if: mtime(candidate) + paddingMs < min(mtime(referenced)) (or <
>>>>    now - paddingMs)
>>>>
>>>> 2) Final recheck before delete
>>>>
>>>>    - Just before deletion, re-read the latest V..V-(N-1) manifests and
>>>>    drop any candidate that appears there. This will close the race (a
>>>>    rough sketch of both points follows below).
>>>>
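>>>> For illustration, here is a rough sketch of how 1) and 2) could fit
>>>> together. Every name in it (FileEntry, readManifest, even the conf keys
>>>> above) is hypothetical, not an existing Spark API:
>>>>
>>>> case class FileEntry(name: String, modificationTime: Long)
>>>>
>>>> // paddingMs         ~ stateStore.rocksdb.gc.paddingMs (e.g. 120000)
>>>> // protectedVersions ~ stateStore.rocksdb.gc.protectedVersions (e.g. 3)
>>>> // readManifest(v)   ~ files referenced by version v's zip/manifest
>>>> // Assumes trackedFiles is non-empty.
>>>> def orphansSafeToDelete(latestVersion: Long,
>>>>                         protectedVersions: Int,
>>>>                         paddingMs: Long,
>>>>                         readManifest: Long => Seq[FileEntry],
>>>>                         trackedFiles: Seq[FileEntry],
>>>>                         candidates: Seq[FileEntry]): Seq[FileEntry] = {
>>>>   // Protected set: union of files referenced by the last N manifests.
>>>>   val protectedNames: Set[String] =
>>>>     (0 until protectedVersions)
>>>>       .map(latestVersion - _)
>>>>       .filter(_ >= 1)
>>>>       .flatMap(v => readManifest(v).map(_.name))
>>>>       .toSet
>>>>
>>>>   // Padded age check against the oldest tracked/referenced file.
>>>>   val minTrackedMtime = trackedFiles.map(_.modificationTime).min
>>>>
>>>>   candidates.filter { f =>
>>>>     !protectedNames.contains(f.name) &&
>>>>       f.modificationTime + paddingMs < minTrackedMtime
>>>>   }
>>>> }
>>>>
>>>> // Calling this again immediately before the actual delete re-reads the
>>>> // latest manifests and drops any newly referenced file (the final
>>>> // recheck in 2).
>>>>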
>>>> HTH
>>>>
>>>> Dr Mich Talebzadeh,
>>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, 25 Aug 2025 at 19:29, Pedro Miguel Duarte <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks for your reply!
>>>>>
>>>>> Yes this helps. I think adding a time padding will help prevent
>>>>> deleting files that are incorrectly labeled as orphaned in the current
>>>>> implementation. This only happens if two executors run maintenance at
>>>>> nearly the exact same time. I'll look into implementing a fix.
>>>>>
>>>>> On Mon, Aug 25, 2025 at 12:46 PM Mich Talebzadeh <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> In your statement:
>>>>>>
>>>>>> "Instead of simply older, should there be some padding to allow for
>>>>>> maintenance being executed simultaneously on two executors? Something
>>>>>> like at least 60s older than the oldest tracked file."
>>>>>>
>>>>>> What you need to do is add a time padding before deleting orphans,
>>>>>> which is a good solution for concurrent maintenance.
>>>>>>
>>>>>> For your RocksDB state-store race, still apply the safety measures:
>>>>>>
>>>>>>    - Add a time padding window before deleting “orphans” (e.g.,
>>>>>>    60–120s; padding is cheap insurance).
>>>>>>    - Consider the union of the last N manifests (N ≥ 2–3) when
>>>>>>    deciding deletions. When deciding which .sst files are “orphans”
>>>>>>    (i.e. safe to delete), don’t look only at the latest
>>>>>>    snapshot/manifest (e.g., V.zip). Instead, build a protected set of
>>>>>>    files by taking the union of the files referenced by the last N
>>>>>>    manifests (e.g., versions V, V-1, …, V-(N-1)), with N ≥ 2 (and
>>>>>>    often 3 on object stores). Only delete files not in that union,
>>>>>>    and re-read the latest manifests just before deleting as a final
>>>>>>    sanity check (a rough sketch is below).
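>>>>>>
>>>>>> A minimal sketch of that protected set. listManifestFiles here is a
>>>>>> hypothetical helper that returns the file names referenced by a given
>>>>>> version's zip, not an existing Spark API:
>>>>>>
>>>>>> // Union of the files referenced by the last n manifests
>>>>>> // (V, V-1, ..., V-(n-1)).
>>>>>> def protectedSet(latestVersion: Long,
>>>>>>                  n: Int,
>>>>>>                  listManifestFiles: Long => Set[String]): Set[String] =
>>>>>>   (0 until n)
>>>>>>     .map(latestVersion - _)
>>>>>>     .filter(_ >= 1)
>>>>>>     .map(listManifestFiles)
>>>>>>     .foldLeft(Set.empty[String])(_ union _)
>>>>>>
>>>>>> // Only files NOT in this set are even considered deletion candidates.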
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Dr Mich Talebzadeh,
>>>>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>>>>
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, 18 Aug 2025 at 17:42, Pedro Miguel Duarte <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello structured streaming experts!
>>>>>>>
>>>>>>> We are getting SST FileNotFound state store corruption errors.
>>>>>>>
>>>>>>> The root cause is a race condition where two different executors are
>>>>>>> doing cleanup of the state store at the same time. Both write the
>>>>>>> same version of the state zip file to DFS.
>>>>>>>
>>>>>>> The first executor enters maintenance, writes SST files and writes
>>>>>>> the 636.zip file.
>>>>>>>
>>>>>>> Concurrently the second executor enters maintenance, writes SST
>>>>>>> files and writes 636.zip.
>>>>>>>
>>>>>>> The SST files in DFS are written almost simultaneously and are:
>>>>>>>    - 000867-4695ff6e-d69d-4792-bcd6-191b57eadb9d.sst   <-- from one
>>>>>>> executor
>>>>>>>    - 000867-a71cb8b2-9ed8-4ec1-82e2-a406dd1fb949.sst   <-- from
>>>>>>> other executor
>>>>>>>
>>>>>>> The problem occurs during orphan file deletion (see this PR
>>>>>>> <https://github.com/apache/spark/pull/39897/files>). The executor
>>>>>>> that lost the race to write 636.zip decides that it will delete the SST
>>>>>>> file that is actually referenced in 636.zip.
>>>>>>>
>>>>>>> Currently the maintenance task does have some protection for files
>>>>>>> of ongoing tasks. This is from the comments in RocksDBFileManager: "only
>>>>>>> delete orphan files that are older than all tracked files when there 
>>>>>>> are at
>>>>>>> least 2 versions".
>>>>>>>
>>>>>>> In this case the file that is being deleted is indeed older but only
>>>>>>> by a small amount of time.
>>>>>>>
>>>>>>> Instead of simply older, should there be some padding to allow for
>>>>>>> maintenance being executed simultaneously on two executors? Something
>>>>>>> like at least 60s older than the oldest tracked file.
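>>>>>>>
>>>>>>> For illustration, the padded check could look roughly like this (the
>>>>>>> names are made up, not the actual RocksDBFileManager code):
>>>>>>>
>>>>>>> // Today (roughly): delete an orphan candidate if it is strictly older
>>>>>>> // than every tracked file. Proposed: require it to be older by at
>>>>>>> // least paddingMs.
>>>>>>> def safeToDelete(candidateMtime: Long,
>>>>>>>                  oldestTrackedFileMtime: Long,
>>>>>>>                  paddingMs: Long = 60 * 1000L): Boolean =
>>>>>>>   candidateMtime + paddingMs < oldestTrackedFileMtime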
>>>>>>>
>>>>>>> This should help avoid state store corruption at the expense of some
>>>>>>> storage space in the DFS.
>>>>>>>
>>>>>>> Thanks for any help or recommendations here.
>>>>>>>
>>>>>>
