Sounds good, thanks.

On Sat, Aug 30, 2025 at 4:33 AM Pedro Miguel Duarte <[email protected]>
wrote:

> Hi Michael, thanks for your reply! Yes, I'm still planning to make the
> change. I'll open up a PR against the 3.5 branch and will tag you all on
> it. Will use the oldestZipFileTime time, agree with you all on that
> approach.
>
> Best,
> Pedro
>
> On Sat, Aug 30, 2025 at 2:04 AM B. Micheal Okutubo <
> [email protected]> wrote:
>
>> Hi Pedro,
>>
>> Sorry just catching up on this. You mentioned this happened on 636.zip
>> but you said the zip file before this (N-1 in your example) has no SST
>> files that is why. Was the db empty at that N-1 version? Because if it
>> isn't empty at that version, then it should have SSTs associated.
>>
>> Anyway, +1 to Siying's suggestion. Instead of using padding, let's just
>> use min(oldestTrackedFileTime, oldestZipFileTime) as the threshold.
>>
>> Are you still planning to make the change? I'm happy to help review.
>> Thanks.
>>
>>
>>
>> On Mon, Aug 25, 2025 at 12:57 PM Mich Talebzadeh <
>> [email protected]> wrote:
>>
>>> Hi Pedro,
>>>
>>>
>>> Hi Pedro,
>>>
>>> Glad it helped
>>> A couple of quick hints while you implement:
>>>
>>> 1) Configurable padding + N manifests
>>>
>>> - Add two knobs (defaults shown):
>>>
>>>
>>>    - stateStore.rocksdb.gc.paddingMs = 120000 (HDFS: 60–120s; S3/GCS:
>>>    120–300s)
>>>    - stateStore.rocksdb.gc.protectedVersions = 3 (union of last N
>>>    manifests)
>>>
>>> - Only delete candidates
>>>
>>>
>>>    -  if:mtime(candidate) + paddingMs < min(mtime(referenced)) (or <
>>>    now - paddingMs)
>>>
>>> 2) Final recheck before delete
>>>
>>>    - Just before deletion, re-read the latest V..V-(N-1) manifests and
>>>    drop any candidate that appears there. This will close the race
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh,
>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, 25 Aug 2025 at 19:29, Pedro Miguel Duarte <[email protected]>
>>> wrote:
>>>
>>>> Thanks for your reply!
>>>>
>>>> Yes this helps. I think adding a time padding will help prevent
>>>> deleting files that are incorrectly labeled as orphaned in the current
>>>> implementation. This only happens if two executors run maintenance at
>>>> nearly the exact same time. I'll look into implementing a fix.
>>>>
>>>> On Mon, Aug 25, 2025 at 12:46 PM Mich Talebzadeh <
>>>> [email protected]> wrote:
>>>>
>>>>> In your statement
>>>>>
>>>>> "*Instead of simply older, should there be some padding to allow for
>>>>> maintenance being executed simultaneously on two executors?  Something 
>>>>> like
>>>>> at least 60s older than the oldest tracked file."*
>>>>> *What you need to do is to add a time padding before deleting orphans
>>>>> which is a good solution for concurrent maintenance*
>>>>>
>>>>> For your RocksDB state-store race, still apply the safety measures:
>>>>>
>>>>>    - Add a time padding window before deleting “orphans” (e.g.,
>>>>>    60–120s; padding is cheap insurance).
>>>>>    - Consider the union of last N manifests (N≥2–3) when deciding
>>>>>    deletions. When deciding which .sst files are “orphans” (i.e.safe to
>>>>>    delete), don’t look only at the latest snapshot/manifest (e.g., V.zip).
>>>>>    Instead, build a protected set of files by taking the union of the 
>>>>> files
>>>>>    referenced by the last N manifests (e.g., versions V, V-1, …, V-(N-1)),
>>>>>    with N ≥ 2 (and often 3 on object stores). Only delete files not in 
>>>>> that
>>>>>    union. This helps as it re reads the latest before delete (final sanity
>>>>>    check).
>>>>>
>>>>> HTH
>>>>>
>>>>> Dr Mich Talebzadeh,
>>>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 18 Aug 2025 at 17:42, Pedro Miguel Duarte <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello structured streaming experts!
>>>>>>
>>>>>> We are getting SST FileNotFound state store corruption errors.
>>>>>>
>>>>>> The root cause is a race condition where two different executors are
>>>>>> doing cleanup of the state store at the same time.  Both write the 
>>>>>> version
>>>>>> of the state zip file to DFS.
>>>>>>
>>>>>> The first executor enters maintenance, writes SST files and writes
>>>>>> the 636.zip file.
>>>>>>
>>>>>> Concurrently the second executor enters maintenance, writes SST files
>>>>>> and writes 636.zip.
>>>>>>
>>>>>> The SST files in dfs are written almost simultaneously and are:
>>>>>>    - 000867-4695ff6e-d69d-4792-bcd6-191b57eadb9d.sst   <-- from one
>>>>>> executor
>>>>>>    - 000867-a71cb8b2-9ed8-4ec1-82e2-a406dd1fb949.sst   <-- from other
>>>>>> executor
>>>>>>
>>>>>> The problem occurs during orphan file deletion (see this PR
>>>>>> <https://github.com/apache/spark/pull/39897/files>). The executor
>>>>>> that lost the race to write 636.zip decides that it will delete the SST
>>>>>> file that is actually referenced in 636.zip.
>>>>>>
>>>>>> Currently the maintenance task does have some protection for files of
>>>>>> ongoing tasks. This is from the comments in RocksDBFileManager: "only
>>>>>> delete orphan files that are older than all tracked files when there are 
>>>>>> at
>>>>>> least 2 versions".
>>>>>>
>>>>>> In this case the file that is being deleted is indeed older but only
>>>>>> by a small amount of time.
>>>>>>
>>>>>> Instead of simply older, should there be some padding to allow for
>>>>>> maintenance being executed simultaneously on two executors?  Something 
>>>>>> like
>>>>>> at least 60s older than the oldest tracked file.
>>>>>>
>>>>>> This should help avoid state store corruption at the expense of some
>>>>>> storage space in the DFS.
>>>>>>
>>>>>> Thanks for any help or recommendations here.
>>>>>>
>>>>>

Reply via email to