[ https://issues.apache.org/jira/browse/FLINK-40037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18093245#comment-18093245 ]
Kartikey Pant commented on FLINK-40037:
---------------------------------------
Thanks [~gliter] for the write-up, the repro matches what I see in the code.
[~Zakelly] [~Yanfei Lei] could one of you assign this to me? I'd like to take
it.
{{AsyncKeyedStateBackendAdaptor.dispose()}} and {{close()}} are empty
({{AsyncKeyedStateBackendAdaptor.java:133,143}}). When
{{StreamTaskStateInitializerImpl}} wraps a v1
{{RocksDBKeyedStateBackend}} in the adaptor for async-state-v2 operators, the
wrapped backend's {{dispose()}} is never called, so
{{cleanInstanceBasePath()}} doesn't run and the local dir leaks on every
redeploy.
Delegating both methods to the wrapped backend fixes it. Will open a PR.
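For reference, a minimal sketch of the delegation idea. It uses hypothetical stand-in types ({{KeyedBackend}}, {{LocalDirBackend}}, {{Adaptor}}) rather than the real Flink classes, so names and signatures here are assumptions and not the actual patch. The directory cleanup in the fake backend only imitates what {{cleanInstanceBasePath()}} is described as doing above.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class DelegatingAdaptorSketch {

    /** Hypothetical stand-in for the lifecycle part of a v1 keyed state backend. */
    interface KeyedBackend extends Closeable {
        void dispose();
    }

    /** Hypothetical backend that owns a local working directory (assumption, not Flink code). */
    static final class LocalDirBackend implements KeyedBackend {
        private final Path instanceBasePath;
        boolean closed;
        boolean disposed;

        LocalDirBackend(Path instanceBasePath) {
            this.instanceBasePath = instanceBasePath;
        }

        @Override
        public void close() {
            closed = true;
        }

        @Override
        public void dispose() {
            if (disposed) {
                return; // idempotent: a second dispose() is a no-op
            }
            disposed = true;
            if (!Files.exists(instanceBasePath)) {
                return;
            }
            // Delete children before parents by walking in reverse order.
            try (Stream<Path> walk = Files.walk(instanceBasePath)) {
                walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                    try {
                        Files.deleteIfExists(p);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Adaptor whose lifecycle methods forward to the wrapped backend instead of being empty. */
    static final class Adaptor implements KeyedBackend {
        private final KeyedBackend wrapped;

        Adaptor(KeyedBackend wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void close() throws IOException {
            wrapped.close();
        }

        @Override
        public void dispose() {
            wrapped.dispose();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("job_op_uuid_");
        Files.createFile(dir.resolve("000001.sst"));

        LocalDirBackend backend = new LocalDirBackend(dir);
        Adaptor adaptor = new Adaptor(backend);

        adaptor.close();
        adaptor.dispose();

        System.out.println("directory still exists: " + Files.exists(dir));
    }
}
```

With empty {{close()}}/{{dispose()}} bodies in {{Adaptor}}, the same {{main}} would leave the temporary directory in place, which is the leak in miniature.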
> Local RocksDB working directory leaked (with all file handles held open) when
> an async-state-v2 task is redeployed onto the same TaskManager after a job
> restart
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-40037
> URL: https://issues.apache.org/jira/browse/FLINK-40037
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 2.2.1
> Reporter: Grzegorz Liter
> Priority: Major
> Attachments: image-2026-07-01-11-04-38-426.png
>
>
> When using the State V2 / async state API (AsyncKeyedProcessOperator,
> StateFuture) on top of the classic embedded RocksDB state backend
> (state.backend: rocksdb, not ForSt), a full job restart that redeploys a
> subtask onto a TaskManager it was already running on causes the previous
> attempt's local RocksDB working directory to leak permanently — both on disk
> and as open file descriptors — instead of being cleaned up.
> Each task attempt creates a local working directory of the form:
> <taskmanager-tmp-dir>/tmp/job_<jobId>_op_<operatorId>__<subtaskIndex>_<parallelism>__uuid_<uuid>/db
> When the job restarts and the subtask is rescheduled onto the same
> TaskManager, a new UUID directory is created for the new attempt, but the old
> directory is never removed, and — more importantly — the old
> AsyncKeyedStateBackendAdaptor/RocksDBKeyedStateBackend instance backing it is
> apparently never disposed: every SST/data file in the old directory remains
> open by the TaskManager JVM indefinitely (confirmed via /proc/<pid>/fd), with
> no decrease over time. Because state.backend.rocksdb.files.open defaults to
> -1 (RocksDB keeps every table file open for the life of the DB object, no LRU
> eviction), none of these leaked handles are ever released.
> This has two compounding effects:
> 1. Disk usage roughly doubles (or worse, with repeated restarts) on every
> TaskManager pod that survives a job restart but gets a subtask redeployed
> onto it, since the old directory's disk blocks can't be reclaimed even by
> manually deleting the files (see reproduction step 6) — Linux won't free
> blocks for a file while any process still holds it open.
> 2. Because disk fills up faster on surviving TMs, this creates a cascading
> failure: one TM running low on disk → job-wide restart → every surviving TM
> leaks another copy of its local state → disk fills faster next time → more
> restarts.
>
> !image-2026-07-01-11-04-46-408.png!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)