Li Guo created FLINK-39874:
------------------------------

             Summary: NativeS3RecoverableFsDataOutputStream leaks a temp file 
when the commit-time part upload fails, and uses non-idempotent deletes
                 Key: FLINK-39874
                 URL: https://issues.apache.org/jira/browse/FLINK-39874
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 2.3.0
            Reporter: Li Guo


h3. Summary

In the native S3 filesystem, {{NativeS3RecoverableFsDataOutputStream}} can 
permanently leak a local temp file when the part upload at commit time fails. 
{{closeForCommit()}} sets {{closed = true}} before it calls 
{{uploadCurrentPart()}}, so if {{uploadPart()}} throws there, the temp file is 
never deleted and the later {{close()}} no-ops on its {{if (!closed)}} guard. 
The file is left behind in the shared {{io.tmp.dirs}} until the TaskManager is 
recycled.

There are two related, smaller issues in the same class. First, the per-part 
cleanup is also missing on the {{write()}} flush path (a robustness gap, not a 
permanent leak). Second, the three {{Files.delete()}} calls are non-idempotent, 
and one of them is a real time-of-check-to-time-of-use (TOCTOU) race within the 
class.

Affects: 2.3.0.

h3. The permanent leak (closeForCommit path)

When a small stream is committed, no part has been flushed yet (a write smaller 
than the minimum part size does not flush during {{write()}}), so the only 
{{uploadPart()}} happens at commit time. {{closeForCommit()}} does this in 
order:

# takes the lock
# sets {{closed = true}}
# if there are pending bytes, calls {{uploadCurrentPart()}}, which calls 
{{uploadPart()}}

In the current, unpatched code, {{uploadCurrentPart()}} deletes the temp file only 
*after* a successful upload. If {{uploadPart()}} throws (for example an S3 5xx 
that survives the SDK's own internal retries), the delete is skipped. Because 
{{closed}} is already {{true}}, a subsequent {{close()}} short-circuits on its 
{{if (!closed)}} guard and never reaches its own delete. The pending part's 
{{s3-part-<uuid>}} file is then orphaned in the shared {{io.tmp.dirs}} (one 
leaked file per affected stream) until the TaskManager is recycled.

Note this is per stream, not per retry. The SDK's retries are internal to a 
single {{uploadPart()}} call and reuse the same temp file, so there is no 
per-retry accumulation.
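
The ordering problem above can be reproduced in isolation. The following is a minimal sketch, not the Flink class: {{LeakSketch}}, its fields, and the {{failUpload}} switch are hypothetical stand-ins, and the lock is omitted because it does not affect the outcome.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical stand-in for the stream; models only the closed flag and the temp file. */
final class LeakSketch {
    private final Path tempFile;
    private final boolean failUpload;
    private boolean closed;

    LeakSketch(Path tempFile, boolean failUpload) {
        this.tempFile = tempFile;
        this.failUpload = failUpload;
    }

    void closeForCommit() throws IOException {
        closed = true;          // flag is set before the upload is attempted
        uploadCurrentPart();
    }

    private void uploadCurrentPart() throws IOException {
        uploadPart();           // may throw
        Files.delete(tempFile); // only reached when the upload succeeded
    }

    private void uploadPart() throws IOException {
        if (failUpload) {
            throw new IOException("simulated S3 5xx");
        }
    }

    void close() throws IOException {
        if (!closed) {          // already true after closeForCommit(), so this is skipped
            closed = true;
            if (Files.exists(tempFile)) {
                Files.delete(tempFile);
            }
        }
    }

    /** Returns true if the temp file survives closeForCommit() followed by close(). */
    static boolean tempFileLeaks(boolean failUpload) {
        try {
            Path dir = Files.createTempDirectory("leak-sketch");
            Path part = Files.createTempFile(dir, "s3-part-", ".tmp");
            LeakSketch stream = new LeakSketch(part, failUpload);
            try {
                stream.closeForCommit();
            } catch (IOException expected) {
                // the simulated upload failure; the caller would now call close()
            }
            stream.close();
            boolean leaked = Files.exists(part);
            Files.deleteIfExists(part); // tidy up after the demonstration
            Files.delete(dir);
            return leaked;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("leak on failed upload: " + tempFileLeaks(true));
        System.out.println("leak on successful upload: " + tempFileLeaks(false));
    }
}
```

In this model the file survives only when the upload fails, which matches the one-file-per-affected-stream behaviour described above.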

h3. The write() flush path (robustness gap, not a leak)

When a part is flushed during {{write()}} (the buffered part reaches the 
minimum part size), {{uploadCurrentPart()}} is also the place that would delete 
the temp file, and on failure it does not. Here, however, the stream is not 
marked closed, so a later {{close()}} still reaches its delete and reclaims the 
file. On this path the missing per-part cleanup is therefore a robustness gap 
rather than a permanent leak. I am fixing it in the same place because it is 
the same missing cleanup.

h3. Non-idempotent deletes

The class has three {{Files.delete()}} call sites (in {{uploadCurrentPart()}}, 
the {{closeForCommit()}} else-branch, and {{close()}}). {{Files.delete()}} 
throws {{NoSuchFileException}} if the file is already gone.

The site in {{close()}} is a real, if minor, intra-class TOCTOU. {{write()}} 
and {{uploadCurrentPart()}} run without the lock and can delete 
{{currentTempFile}}, while {{close()}} can run concurrently during 
cancellation. {{close()}} does a check-then-act ({{currentTempFile.exists()}} 
followed by {{Files.delete()}}), so the file can disappear in that window and 
the delete throws {{NoSuchFileException}}. The other two sites are not 
concurrently reachable in the same way; converting them is accompanying 
hardening so the cleanup is uniformly idempotent.
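
The difference between the two JDK calls can be seen without any concurrency by deleting the same file twice. This is a small sketch using only {{java.nio.file.Files}}; the class and method names are hypothetical, and the first delete stands in for whichever path removed the file first.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

/** Hypothetical demo class; contrasts Files.delete with Files.deleteIfExists. */
final class DeleteSketch {

    /** Returns true if a second Files.delete throws NoSuchFileException. */
    static boolean strictDeleteThrowsWhenMissing() {
        try {
            Path part = Files.createTempFile("s3-part-", ".tmp");
            Files.delete(part);     // first delete, e.g. by the upload path
            try {
                Files.delete(part); // second delete, e.g. by close()
                return false;
            } catch (NoSuchFileException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if a second deleteIfExists completes and reports "nothing deleted". */
    static boolean lenientDeleteToleratesMissing() {
        try {
            Path part = Files.createTempFile("s3-part-", ".tmp");
            Files.delete(part);
            return !Files.deleteIfExists(part); // returns false instead of throwing
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Files.delete throws: " + strictDeleteThrowsWhenMissing());
        System.out.println("deleteIfExists tolerates: " + lenientDeleteToleratesMissing());
    }
}
```

An {{exists()}} check in front of {{Files.delete()}} does not help here, because the file can still disappear between the check and the delete.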

h3. Fix

* Wrap the upload in {{uploadCurrentPart()}} in a {{try/finally}} and delete 
the temp file in the {{finally}}, so it is removed whether or not 
{{uploadPart()}} succeeds. The delete is itself wrapped so an {{IOException}} 
from the delete is logged at WARN and cannot mask or replace the original 
upload {{IOException}}.
* Replace all three {{Files.delete()}} calls with {{Files.deleteIfExists()}} 
(and drop the now-redundant {{exists()}} guard in {{close()}}), making the 
cleanup idempotent and closing the TOCTOU.
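
The shape of the first bullet can be sketched as follows. This is an illustration under assumptions, not the actual patch: {{FixSketch}} and the {{PartUploader}} interface are hypothetical, and {{java.util.logging}} is used only to keep the example self-contained (the real class may use a different logger).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of the try/finally cleanup; not the Flink class. */
final class FixSketch {
    private static final Logger LOG = Logger.getLogger(FixSketch.class.getName());

    /** Hypothetical stand-in for the real uploadPart() call. */
    interface PartUploader {
        void uploadPart(Path part) throws IOException;
    }

    static void uploadCurrentPart(Path tempFile, PartUploader uploader) throws IOException {
        try {
            uploader.uploadPart(tempFile);
        } finally {
            try {
                // Idempotent: a file that is already gone is not an error.
                Files.deleteIfExists(tempFile);
            } catch (IOException deleteFailure) {
                // Log and swallow so the original upload exception is not replaced.
                LOG.log(Level.WARNING, "Failed to delete temp file " + tempFile, deleteFailure);
            }
        }
    }

    /** Returns true if the temp file is gone and the upload outcome was reported unchanged. */
    static boolean cleansUpAfter(boolean failUpload) {
        try {
            Path part = Files.createTempFile("s3-part-", ".tmp");
            boolean sawUploadFailure = false;
            try {
                uploadCurrentPart(part, p -> {
                    if (failUpload) {
                        throw new IOException("simulated S3 5xx");
                    }
                });
            } catch (IOException e) {
                sawUploadFailure = "simulated S3 5xx".equals(e.getMessage());
            }
            return !Files.exists(part) && sawUploadFailure == failUpload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("cleanup after failure: " + cleansUpAfter(true));
        System.out.println("cleanup after success: " + cleansUpAfter(false));
    }
}
```

Catching the delete failure inside the {{finally}} is what keeps the upload {{IOException}} as the exception the caller sees.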

h3. Tests

Four JUnit tests, verified red/green (4/4 pass with the fix; without the fix, 
exactly 3 fail and the happy-path control still passes):

* {{closeForCommitUploadFailureDeletesTempFile}} - commit-time upload failure. 
Writes a sub-part-size buffer (so the only upload is at commit), forces 
{{uploadPart()}} to throw, and asserts no {{s3-part-*}} file remains. Fails 
without the fix with the leftover file present. (The test asserts the visible 
symptom; the {{close()}} no-op that makes the leak permanent is established by 
reading the code, not by the assertion.)
* {{uploadPartFailureFromWriteDeletesTempFile}} - the {{write()}} flush path. 
Writes a full part to force a flush during {{write()}}, forces the upload to 
throw, and asserts cleanup. Fails without the fix.
* {{closeForCommitIsIdempotentWhenTempFileMissing}} - the non-idempotent 
delete. Drives {{closeForCommit()}} down its else-branch with the temp file 
already removed; throws {{NoSuchFileException}} without the fix.
* {{closeForCommitSuccessDeletesTempFile}} - happy-path control. Commits 
successfully and asserts no temp file remains; passes with and without the fix 
(regression guard).
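
The "no {{s3-part-*}} file remains" check could look roughly like the helper below. This is a hypothetical sketch, not the code in the actual tests: {{TempDirCheck}} and {{leftoverParts}} are illustrative names, and the directory passed in is assumed to be the temp directory the stream under test writes to.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for asserting that no part files are left behind. */
final class TempDirCheck {

    /** Lists files directly under tmpDir whose names match the glob s3-part-*. */
    static List<Path> leftoverParts(Path tmpDir) throws IOException {
        List<Path> found = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(tmpDir, "s3-part-*")) {
            for (Path entry : entries) {
                found.add(entry);
            }
        }
        return found;
    }

    /** Exercises the helper against a scratch directory. */
    static boolean selfCheck() {
        try {
            Path dir = Files.createTempDirectory("io-tmp");
            Path part = Files.createFile(dir.resolve("s3-part-1234"));
            Path other = Files.createFile(dir.resolve("unrelated.tmp"));
            boolean seesLeftover = leftoverParts(dir).equals(List.of(part));
            Files.delete(part);
            boolean seesNone = leftoverParts(dir).isEmpty();
            Files.delete(other);
            Files.delete(dir);
            return seesLeftover && seesNone;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("helper behaves as expected: " + selfCheck());
    }
}
```

A test would then assert that {{leftoverParts(tmpDir)}} is empty after the failing or succeeding commit.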

h3. Scope / relation to other issues

This is the local temp-file leak in the native S3 recoverable output stream. It 
is distinct from FLINK-39786 (the remote orphaned multipart upload), which is a 
separate, internally-claimed bug, and from FLINK-39778 (resume tail loss). I 
did not find an existing JIRA or PR covering this local-leak issue.

h3. Next step

The fix and the four regression tests are ready against {{master}}. I am a 
first-time contributor and cannot self-assign; could a committer please assign 
this to me so I can open the PR? GitHub handle: NestDream.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
