[ 
https://issues.apache.org/jira/browse/FLINK-40037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18093245#comment-18093245
 ] 

Kartikey Pant commented on FLINK-40037:
---------------------------------------

Thanks [~gliter] for the write-up. The repro matches what I see in the code.

[~Zakelly] [~Yanfei Lei] could one of you assign this to me? I'd like to take 
it.

{{AsyncKeyedStateBackendAdaptor.dispose()}} and {{close()}} are empty
({{AsyncKeyedStateBackendAdaptor.java:133,143}}). When
{{StreamTaskStateInitializerImpl}} wraps a v1 {{RocksDBKeyedStateBackend}} in
the adaptor for async-state-v2 operators, the wrapped backend's {{dispose()}}
is never called, so {{cleanInstanceBasePath()}} doesn't run and the local dir
leaks on every redeploy.

Delegating both methods to the wrapped backend fixes it. Will open a PR.
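
Roughly what I have in mind, as a minimal self-contained sketch rather than the
actual patch. The types below ({{WrappedBackend}}, {{RecordingBackend}},
{{Adaptor}}) are hypothetical stand-ins I made up for illustration, not Flink
classes, and the real methods have different signatures (for example, the real
{{close()}} can throw {{IOException}}). The sketch only shows the delegation
pattern: the adaptor forwards both lifecycle calls instead of leaving the
bodies empty.

```java
import java.util.ArrayList;
import java.util.List;

public class AdaptorDisposeSketch {

    /** Hypothetical stand-in for the lifecycle part of a v1 keyed state backend. */
    interface WrappedBackend {
        void dispose();

        void close();
    }

    /** Hypothetical stand-in for the wrapped backend; it only records the calls it receives. */
    static final class RecordingBackend implements WrappedBackend {
        final List<String> calls = new ArrayList<>();

        @Override
        public void dispose() {
            // In the real backend this is where the local working directory would be cleaned up.
            calls.add("dispose");
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    /** Sketch of the adaptor: both lifecycle methods forward to the wrapped backend. */
    static final class Adaptor implements WrappedBackend {
        private final WrappedBackend delegate;

        Adaptor(WrappedBackend delegate) {
            this.delegate = delegate;
        }

        @Override
        public void dispose() {
            delegate.dispose(); // previously an empty body
        }

        @Override
        public void close() {
            delegate.close(); // previously an empty body
        }
    }

    public static void main(String[] args) {
        RecordingBackend backend = new RecordingBackend();
        Adaptor adaptor = new Adaptor(backend);
        adaptor.close();
        adaptor.dispose();
        System.out.println(backend.calls); // prints [close, dispose]
    }
}
```

With empty bodies in {{Adaptor}}, the recorded list would stay empty, which is
the analogue of the wrapped backend never being disposed.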

> Local RocksDB working directory leaked (with all file handles held open) when 
> an async-state-v2 task is redeployed onto the same TaskManager after a job 
> restart
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-40037
>                 URL: https://issues.apache.org/jira/browse/FLINK-40037
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 2.2.1
>            Reporter: Grzegorz Liter
>            Priority: Major
>         Attachments: image-2026-07-01-11-04-38-426.png
>
>
> When using the State V2 / async state API (AsyncKeyedProcessOperator, 
> StateFuture) on top of the classic embedded RocksDB state backend 
> (state.backend: rocksdb, not ForSt), a full job restart that redeploys a 
> subtask onto a TaskManager it was already running on causes the previous 
> attempt's local RocksDB working directory to leak permanently — both on disk 
> and as open file descriptors — instead of being cleaned up.
> Each task attempt creates a local working directory of the form:
> <taskmanager-tmp-dir>/tmp/job_<jobId>_op_<operatorId>__<subtaskIndex>_<parallelism>__uuid_<uuid>/db
> When the job restarts and the subtask is rescheduled onto the same 
> TaskManager, a new UUID directory is created for the new attempt, but the old 
> directory is never removed, and — more importantly — the old 
> AsyncKeyedStateBackendAdaptor/RocksDBKeyedStateBackend instance backing it is 
> apparently never disposed: every SST/data file in the old directory remains 
> open by the TaskManager JVM indefinitely (confirmed via /proc/<pid>/fd), with 
> no decrease over time. Because state.backend.rocksdb.files.open defaults to 
> -1 (RocksDB keeps every table file open for the life of the DB object, no LRU 
> eviction), none of these leaked handles are ever released.
> This has two compounding effects:
> 1. Disk usage roughly doubles (or worse, with repeated restarts) on every 
> TaskManager pod that survives a job restart but gets a subtask redeployed 
> onto it, since the old directory's disk blocks can't be reclaimed even by 
> manually deleting the files (see reproduction step 6) — Linux won't free 
> blocks for a file while any process still holds it open.
> 2. Because disk fills up faster on surviving TMs, this creates a cascading 
> failure: one TM running low on disk → job-wide restart → every surviving TM 
> leaks another copy of its local state → disk fills faster next time → more 
> restarts.
>  
> !image-2026-07-01-11-04-46-408.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
