ari created FLINK-38697:
---------------------------
Summary: TaskInformation blobs accumulate without cleanup causing
storage exhaustion
Key: FLINK-38697
URL: https://issues.apache.org/jira/browse/FLINK-38697
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.20.3, 1.19.3, 1.18.1, 1.17.2, 1.16.3, 1.15.4, 1.14.3
Reporter: ari
h1. Summary
When the adaptive scheduler is enabled, TaskInformation blob files accumulate on
persistent storage without ever being cleaned up, eventually leading to storage
exhaustion and job stalling.
h1. Problem Details
On each job deployment and restart (including JobManager failovers),
{{ExecutionJobVertex.getTaskInformationOrBlobKey()}} reconstructs
TaskInformation objects from the JobGraph. These TaskInformation objects can be
extremely large (>200 MB), as they can contain serialized UDFs within the task
configuration.
Specifically, the Configuration field in TaskInformation includes a
{{SERIALIZED_UDF}} entry.
When these objects exceed the {{blob.offload.minsize}} threshold, they are
offloaded as permanent blobs to avoid hitting the RPC framesize limit. However:
# Blob keys are not reused across failures - each restart creates a new blob
with a different key even though the content hash is the same (see the sketch
after this list)
# No cleanup mechanism until global termination - Permanent blobs are only
cleaned up when the job reaches a globally terminated state (a state that
doesn’t get reached during internal restarts)
# JobJar blobs ARE reused - In contrast, job JAR blobs stored in the JobGraph
have their keys persisted and are correctly reused
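For illustration, here is a minimal, self-contained Java sketch of the
accumulation pattern. This is deliberately not Flink's blob key implementation;
the key layout ("content hash + random component") is a simplified assumption
based on the description above. Because every offload attaches a fresh random
component, re-offloading byte-identical TaskInformation after each restart
produces a new blob file:
{code:java}
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Standalone model of the reported behaviour, not Flink code.
public class BlobAccumulationSketch {

    // "blob key" = content hash + random component
    // (simplified stand-in for the FLINK-7140 behaviour described above)
    static String offload(byte[] taskInformationBytes, Random rnd) throws Exception {
        byte[] hash = MessageDigest.getInstance("SHA-1").digest(taskInformationBytes);
        return new BigInteger(1, hash).toString(16) + "-" + Integer.toHexString(rnd.nextInt());
    }

    public static void main(String[] args) throws Exception {
        byte[] sameTaskInfo =
                "serialized TaskInformation incl. SERIALIZED_UDF".getBytes(StandardCharsets.UTF_8);
        Random rnd = new Random();
        Set<String> filesInBlobStore = new HashSet<>();
        for (int restart = 0; restart < 5; restart++) {
            // every restart re-offloads identical content but gets a fresh key,
            // so a new file appears in the blob store each time
            filesInBlobStore.add(offload(sameTaskInfo, rnd));
        }
        // prints 5: one blob file per restart although the content never changed
        System.out.println("blob files for identical content: " + filesInBlobStore.size());
    }
}
{code}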
h1. Impact
* Storage exhaustion in the blob storage directory (particularly a problem for
the high-availability storage directory, which may have hard storage limits)
* Job stalling once the storage limit is reached: on the next restart the
TaskInformation can no longer be offloaded, so it is sent over RPC instead,
hits the framesize limit, and checkpoints never trigger.
* Particularly severe with:
** Complex streaming jobs with large/many serialized UDFs in the task config
** Frequent TM failures requiring restarts
** High parallelism (each parallelized vertex creates its own TaskInformation
blob)
h1. Reproduction Steps
# Enable adaptive scheduler
# Set {{blob.offload.minsize: 0}} to force all TaskInformation objects to be
offloaded (see the example configuration after this list)
# Run {{kubectl delete pod \{task-manager-pod-name}}} to trigger job restart
# Wait for job to restart and process records
# {{kubectl exec -it \{job-manager-pod-name} -- bash}}
# cd to the blob storage directory and run {{ls && stat *}}
# Observe: Every file except the job JAR blob is duplicated after each restart
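For reference, the configuration used in steps 1-2 would look roughly as
follows in {{flink-conf.yaml}} (option names as I understand them; verify
against the deployed Flink version):
{code:yaml}
# enable the adaptive scheduler (step 1)
jobmanager.scheduler: adaptive
# force every TaskInformation object to be offloaded to the blob store (step 2)
blob.offload.minsize: 0
{code}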
h1. Expected vs Actual Behavior
Expected: On a restart, if the content hash is the same, reuse the previously
created TaskInformation blob (sketched below). However, FLINK-7140 appears to
have introduced a random component into the blob key to prevent hash
collisions, so exact reuse may not be possible; in that case, the blobs that
are no longer needed should be deleted when the new ones are generated.
Actual: New TaskInformation blobs are created on every restart, there is no
cleanup until the job reaches a globally terminated state, and blobs therefore
accumulate without bound over time.
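To make the expected behaviour concrete, here is a rough sketch (purely
illustrative, not a proposed patch; the class name
{{TaskInformationBlobTracker}} is hypothetical) of per-vertex bookkeeping that
would bound the number of TaskInformation blobs: reuse the existing blob when
the content hash is unchanged, otherwise delete the superseded blob before
registering the new one.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative sketch of the expected behaviour, not Flink code.
public class TaskInformationBlobTracker {

    /** Content hash of the serialized TaskInformation -> blob key currently stored. */
    private final Map<String, String> keyByContentHash = new HashMap<>();

    /**
     * @param contentHash hash of the serialized TaskInformation
     * @param offload     writes the blob and returns its (possibly randomized) key
     * @param deleteBlob  removes a blob that is no longer referenced
     */
    public synchronized String getOrReplace(
            String contentHash,
            Function<String, String> offload,
            Consumer<String> deleteBlob) {

        String existingKey = keyByContentHash.get(contentHash);
        if (existingKey != null) {
            // same content as before: reuse the existing blob instead of writing a new one
            return existingKey;
        }
        // content changed (or first deployment): drop blobs for stale hashes first
        keyByContentHash.values().forEach(deleteBlob);
        keyByContentHash.clear();

        String newKey = offload.apply(contentHash);
        keyByContentHash.put(contentHash, newKey);
        return newKey;
    }
}
{code}
With bookkeeping along these lines, the blob store would hold at most one
TaskInformation blob per job vertex regardless of how often the job restarts.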