Li Guo created FLINK-39874:
------------------------------
Summary: NativeS3RecoverableFsDataOutputStream leaks a temp file
when the commit-time part upload fails, and uses non-idempotent deletes
Key: FLINK-39874
URL: https://issues.apache.org/jira/browse/FLINK-39874
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 2.3.0
Reporter: Li Guo
h3. Summary
In the native S3 filesystem, {{NativeS3RecoverableFsDataOutputStream}} can
permanently leak a local temp file when the part upload at commit time fails.
{{closeForCommit()}} sets {{closed = true}} before it calls
{{uploadCurrentPart()}}, so if {{uploadPart()}} throws there, the temp file is
never deleted and the later {{close()}} no-ops on its {{if (!closed)}} guard.
The file is left behind in the shared {{io.tmp.dirs}} until the TaskManager is
recycled.
There are two related, smaller issues in the same class: the per-part cleanup
is also missing on the {{write()}} flush path (a robustness gap, not a
permanent leak), and the three {{Files.delete()}} calls are non-idempotent, one
of which is a real intra-class TOCTOU (time-of-check to time-of-use) race.
Affects: 2.3.0.
h3. The permanent leak (closeForCommit path)
When a small stream is committed, no part has been flushed yet (a write smaller
than the minimum part size does not flush during {{write()}}), so the only
{{uploadPart()}} happens at commit time. {{closeForCommit()}} does this in
order:
# takes the lock
# sets {{closed = true}}
# if there are pending bytes, calls {{uploadCurrentPart()}}, which calls
{{uploadPart()}}
In the current, unpatched code, {{uploadCurrentPart()}} deletes the temp file only
*after* a successful upload. If {{uploadPart()}} throws (for example an S3 5xx
that survives the SDK's own internal retries), the delete is skipped. Because
{{closed}} is already {{true}}, a subsequent {{close()}} short-circuits on its
{{if (!closed)}} guard and never reaches its own delete. The pending part's
{{s3-part-<uuid>}} file is then orphaned in the shared {{io.tmp.dirs}} (one
leaked file per affected stream) until the TaskManager is recycled.
Note this is per stream, not per retry. The SDK's retries are internal to a
single {{uploadPart()}} call and reuse the same temp file, so there is no
per-retry accumulation.
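The ordering described above can be reproduced in isolation with a minimal sketch. It is not the Flink class: {{LeakSketch}}, its {{Uploader}} interface, and {{leaksAfterFailedCommit()}} are hypothetical names introduced for illustration, and locking and buffering are left out. Only the order of "set {{closed}}, then upload, then delete" and the {{if (!closed)}} guard in {{close()}} are modelled.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

/** Hypothetical stand-in for the stream; only the closed-flag ordering is modelled. */
class LeakSketch {
    /** Hypothetical uploader; not the Flink or AWS SDK API. */
    interface Uploader {
        void uploadPart(File part) throws IOException;
    }

    private final Uploader uploader;
    private final File currentTempFile;
    private boolean closed;

    LeakSketch(Uploader uploader, File currentTempFile) {
        this.uploader = uploader;
        this.currentTempFile = currentTempFile;
    }

    void closeForCommit() throws IOException {
        closed = true; // set before the upload, as described in the issue
        uploadCurrentPart();
    }

    private void uploadCurrentPart() throws IOException {
        uploader.uploadPart(currentTempFile); // if this throws, the delete is skipped
        Files.delete(currentTempFile.toPath());
    }

    void close() throws IOException {
        if (!closed) { // already true after a failed closeForCommit()
            closed = true;
            if (currentTempFile.exists()) {
                Files.delete(currentTempFile.toPath());
            }
        }
    }

    /** Returns true if the temp file survives a failed commit followed by close(). */
    static boolean leaksAfterFailedCommit() throws IOException {
        File tmp = Files.createTempFile("s3-part-", ".tmp").toFile();
        LeakSketch stream = new LeakSketch(
                part -> { throw new IOException("simulated S3 5xx"); }, tmp);
        try {
            stream.closeForCommit();
        } catch (IOException expected) {
            // the upload failure propagates to the caller
        }
        stream.close(); // no-op: closed is already true
        boolean leaked = tmp.exists();
        Files.deleteIfExists(tmp.toPath()); // tidy up after the demonstration
        return leaked;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("leaked = " + leaksAfterFailedCommit());
    }
}
```

In this sketch the file is still present after {{close()}}, which matches the symptom described in this section.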
h3. The write() flush path (robustness gap, not a leak)
When a part is flushed during {{write()}} (the buffered part reaches the
minimum part size), {{uploadCurrentPart()}} is also the place that would delete
the temp file, and on failure it does not. But here the stream is not marked
closed, so a later {{close()}} still reaches its delete and reclaims the file.
So on this path the missing per-part cleanup is a robustness gap rather than a
permanent leak. I am fixing it in the same place because it is the same missing
cleanup.
h3. Non-idempotent deletes
The class has three {{Files.delete()}} call sites (in {{uploadCurrentPart()}},
the {{closeForCommit()}} else-branch, and {{close()}}). {{Files.delete()}}
throws {{NoSuchFileException}} if the file is already gone.
The site in {{close()}} is a real, if minor, intra-class TOCTOU. {{write()}}
and {{uploadCurrentPart()}} run without the lock and can delete
{{currentTempFile}}, while {{close()}} can run concurrently during
cancellation. {{close()}} does a check-then-act ({{currentTempFile.exists()}}
followed by {{Files.delete()}}), so the file can disappear in that window and
the delete throws {{NoSuchFileException}}. The other two sites are not
concurrently reachable in the same way; converting them is accompanying
hardening so the cleanup is uniformly idempotent.
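The difference between the two JDK calls can be seen with a short standalone sketch. The class and method names ({{DeleteIdempotencySketch}} and its two helpers) are hypothetical; only {{Files.delete()}} and {{Files.deleteIfExists()}} are real {{java.nio.file}} APIs. The "file already gone" state is created directly here rather than by a concurrent thread, so it illustrates the failure mode, not the race itself.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

/** Hypothetical demo class; not part of Flink. */
class DeleteIdempotencySketch {

    /** Files.delete() on an already-deleted path throws NoSuchFileException. */
    static boolean deleteThrowsWhenMissing() throws IOException {
        Path p = Files.createTempFile("s3-part-", ".tmp");
        Files.delete(p); // first delete succeeds
        try {
            Files.delete(p); // second delete: the file is already gone
            return false;
        } catch (NoSuchFileException e) {
            return true;
        }
    }

    /** Files.deleteIfExists() returns false instead of throwing in that case. */
    static boolean deleteIfExistsToleratesMissing() throws IOException {
        Path p = Files.createTempFile("s3-part-", ".tmp");
        boolean first = Files.deleteIfExists(p);  // true: the file was removed
        boolean second = Files.deleteIfExists(p); // false: nothing left to remove
        return first && !second;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("delete throws when missing: " + deleteThrowsWhenMissing());
        System.out.println("deleteIfExists tolerates missing: " + deleteIfExistsToleratesMissing());
    }
}
```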
h3. Fix
* Wrap the upload in {{uploadCurrentPart()}} in a {{try/finally}} and delete
the temp file in the {{finally}}, so it is removed whether or not
{{uploadPart()}} succeeds. The delete is itself wrapped so an {{IOException}}
from the delete is logged at WARN and cannot mask or replace the original
upload {{IOException}}.
* Replace all three {{Files.delete()}} calls with {{Files.deleteIfExists()}}
(and drop the now-redundant {{exists()}} guard in {{close()}}), making the
cleanup idempotent and closing the TOCTOU.
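A minimal sketch of the shape of this fix, under stated assumptions: {{FixedUploadSketch}}, its {{Uploader}} interface, and the helper {{failedUploadStillCleansUp()}} are hypothetical, and {{java.util.logging}} stands in for whatever logger the real class uses. It is meant to show the {{try/finally}} structure and the non-masking delete, not the actual patch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of the fixed cleanup; not the actual Flink patch. */
class FixedUploadSketch {
    private static final Logger LOG = Logger.getLogger(FixedUploadSketch.class.getName());

    /** Hypothetical uploader; not the Flink or AWS SDK API. */
    interface Uploader {
        void uploadPart(Path part) throws IOException;
    }

    static void uploadCurrentPart(Uploader uploader, Path tempFile) throws IOException {
        try {
            uploader.uploadPart(tempFile);
        } finally {
            // Runs whether or not the upload succeeded.
            try {
                Files.deleteIfExists(tempFile); // idempotent: no exception if already gone
            } catch (IOException deleteFailure) {
                // Log and swallow so the delete cannot replace the upload exception.
                LOG.log(Level.WARNING, "Could not delete temp file " + tempFile, deleteFailure);
            }
        }
    }

    /** Returns true if a failed upload surfaces its own exception and leaves no temp file. */
    static boolean failedUploadStillCleansUp() throws IOException {
        Path tmp = Files.createTempFile("s3-part-", ".tmp");
        String message = null;
        try {
            uploadCurrentPart(part -> { throw new IOException("simulated S3 5xx"); }, tmp);
        } catch (IOException e) {
            message = e.getMessage();
        }
        return "simulated S3 5xx".equals(message) && !Files.exists(tmp);
    }

    public static void main(String[] args) throws IOException {
        System.out.println("cleaned up after failure: " + failedUploadStillCleansUp());
    }
}
```

Catching the delete failure inside the {{finally}} is what keeps the original upload {{IOException}} as the one the caller sees; letting the delete throw from {{finally}} would replace it.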
h3. Tests
Four JUnit tests, verified red/green (4/4 pass with the fix; without the fix,
exactly 3 fail and the happy-path control still passes):
* {{closeForCommitUploadFailureDeletesTempFile}} - commit-time upload failure.
Writes a sub-part-size buffer (so the only upload is at commit), forces
{{uploadPart()}} to throw, and asserts no {{s3-part-*}} file remains. Fails
without the fix with the leftover file present. (The test asserts the visible
symptom; the {{close()}} no-op that makes the leak permanent is established by
reading the code, not by the assertion.)
* {{uploadPartFailureFromWriteDeletesTempFile}} - the {{write()}} flush path.
Writes a full part to force a flush during {{write()}}, forces the upload to
throw, and asserts cleanup. Fails without the fix.
* {{closeForCommitIsIdempotentWhenTempFileMissing}} - the non-idempotent
delete. Drives {{closeForCommit()}} down its else-branch with the temp file
already removed; throws {{NoSuchFileException}} without the fix.
* {{closeForCommitSuccessDeletesTempFile}} - happy-path control. Commits
successfully and asserts no temp file remains; passes with and without the fix
(regression guard).
h3. Scope / relation to other issues
This is the local temp-file leak in the native S3 recoverable output stream. It
is distinct from FLINK-39786 (the remote orphaned multipart upload), which is a
separate, internally-claimed bug, and from FLINK-39778 (resume tail loss). I
did not find an existing JIRA or PR covering this local-leak issue.
h3. Next step
The fix and the four regression tests are ready against {{master}}. I am a
first-time contributor and cannot self-assign; could a committer please assign
this to me so I can open the PR? GitHub handle: NestDream.