[https://issues.apache.org/jira/browse/FLINK-39874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

ASF GitHub Bot updated FLINK-39874:
-----------------------------------
    Labels: pull-request-available  (was: )

> NativeS3RecoverableFsDataOutputStream leaks a temp file when the commit-time 
> part upload fails, and uses non-idempotent deletes
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39874
>                 URL: https://issues.apache.org/jira/browse/FLINK-39874
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 2.3.0
>            Reporter: Li Guo
>            Priority: Major
>              Labels: pull-request-available
>
> h3. Summary
> In the native S3 filesystem, {{NativeS3RecoverableFsDataOutputStream}} can 
> permanently leak a local temp file when the part upload at commit time fails. 
> {{closeForCommit()}} sets {{closed = true}} before it calls 
> {{uploadCurrentPart()}}, so if {{uploadPart()}} throws there, the temp file 
> is never deleted and the later {{close()}} no-ops on its {{if (!closed)}} 
> guard. The file is left behind in the shared {{io.tmp.dirs}} until the 
> TaskManager is recycled.
> There are two related, smaller issues in the same class: the per-part cleanup 
> is also missing on the {{write()}} flush path (a robustness gap, not a 
> permanent leak), and the three {{Files.delete()}} calls are non-idempotent, 
> one of which is exposed to a real intra-class TOCTOU race.
> Affects: 2.3.0.
> h3. The permanent leak (closeForCommit path)
> When a small stream is committed, no part has been flushed yet (a write 
> smaller than the minimum part size does not flush during {{write()}}), so the 
> only {{uploadPart()}} happens at commit time. {{closeForCommit()}} does this 
> in order:
> # takes the lock
> # sets {{closed = true}}
> # if there are pending bytes, calls {{uploadCurrentPart()}}, which calls 
> {{uploadPart()}}
> In the unpatched code, {{uploadCurrentPart()}} deletes the temp file only 
> *after* a successful upload. If {{uploadPart()}} throws (for example, an S3 
> 5xx error that survives the SDK's own internal retries), the delete is skipped. 
> Because {{closed}} is already {{true}}, a subsequent {{close()}} 
> short-circuits on its {{if (!closed)}} guard and never reaches its own 
> delete. The pending part's {{s3-part-<uuid>}} file is then orphaned in the 
> shared {{io.tmp.dirs}} (one leaked file per affected stream) until the 
> TaskManager is recycled.
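> The ordering problem can be reproduced with a deliberately simplified model. 
> This is a hypothetical sketch, not the Flink class: {{PreFixStreamModel}}, its 
> nested {{PartUploader}} interface and {{tempFileLeftAfterFailedCommit()}} are 
> invented for illustration, the S3 upload is replaced by a lambda that always 
> throws, and locking is omitted. It only mirrors the sequence above: {{closed}} 
> is set first, the delete runs only after a successful upload, and {{close()}} 
> is guarded by {{if (!closed)}}.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical model of the pre-fix closeForCommit()/close() ordering.
// No locking and no real S3 calls; names are invented for illustration.
class PreFixStreamModel {
    // Stand-in for the S3 part upload; not a Flink or AWS SDK type.
    interface PartUploader {
        void uploadPart(Path partFile) throws IOException;
    }

    private final PartUploader uploader;
    private final Path currentTempFile;
    private boolean closed;

    PreFixStreamModel(PartUploader uploader) throws IOException {
        this.uploader = uploader;
        this.currentTempFile = Files.createTempFile("s3-part-", ".tmp");
    }

    Path tempFile() {
        return currentTempFile;
    }

    private void uploadCurrentPart() throws IOException {
        uploader.uploadPart(currentTempFile);
        Files.delete(currentTempFile); // pre-fix: only reached if the upload succeeds
    }

    void closeForCommit() throws IOException {
        closed = true;       // set BEFORE the upload...
        uploadCurrentPart(); // ...so a throw here skips the delete
    }

    void close() throws IOException {
        if (!closed) {       // already true after a failed commit: no-op
            if (Files.exists(currentTempFile)) {
                Files.delete(currentTempFile);
            }
            closed = true;
        }
    }

    // Commits with an always-failing uploader, then calls close(); returns
    // true if the temp file survived both calls (i.e. it leaked).
    static boolean tempFileLeftAfterFailedCommit() {
        try {
            PreFixStreamModel stream = new PreFixStreamModel(
                    f -> { throw new IOException("simulated S3 5xx after SDK retries"); });
            try {
                stream.closeForCommit();
            } catch (IOException expected) {
                // the commit fails, as in the bug report
            }
            stream.close();
            boolean leaked = Files.exists(stream.tempFile());
            Files.deleteIfExists(stream.tempFile()); // tidy up the demo file
            return leaked;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("leaked = " + tempFileLeftAfterFailedCommit()); // leaked = true
    }
}
```

> Running {{main}} should print {{leaked = true}}: the failed commit skips the 
> delete, and {{close()}} then returns without touching the file.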
> Note that the leak is per stream, not per retry. The SDK's retries are internal to a 
> single {{uploadPart()}} call and reuse the same temp file, so there is no 
> per-retry accumulation.
> h3. The write() flush path (robustness gap, not a leak)
> When a part is flushed during {{write()}} (the buffered part reaches the 
> minimum part size), {{uploadCurrentPart()}} is again responsible for deleting 
> the temp file, and again skips the delete when the upload fails. Here, 
> however, the stream is not marked closed, so a later {{close()}} still 
> reaches its delete and reclaims the file. On this path the missing per-part 
> cleanup is therefore a robustness gap rather than a permanent leak. I am 
> fixing it in the same place because it is the same missing cleanup.
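> The same kind of simplified, hypothetical model (invented names 
> {{FlushPathModel}} and {{flushFullPart()}}, no locking, a lambda in place of 
> the S3 call) shows why this path only delays cleanup: because {{write()}} 
> never sets {{closed}}, the pre-fix {{close()}} still reaches its 
> check-then-delete and removes the file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical model of the pre-fix write() flush path: no locking, no real S3.
class FlushPathModel {
    // Stand-in for the S3 part upload; not a Flink or AWS SDK type.
    interface PartUploader {
        void uploadPart(Path partFile) throws IOException;
    }

    private final PartUploader uploader;
    private final Path currentTempFile;
    private boolean closed;

    FlushPathModel(PartUploader uploader) throws IOException {
        this.uploader = uploader;
        this.currentTempFile = Files.createTempFile("s3-part-", ".tmp");
    }

    // Models the flush that write() triggers once a part reaches the minimum size.
    void flushFullPart() throws IOException {
        uploader.uploadPart(currentTempFile);
        Files.delete(currentTempFile); // pre-fix: skipped when the upload throws
    }

    // Pre-fix close(): check-then-act, guarded by the closed flag.
    void close() throws IOException {
        if (!closed) { // still false, because write() never sets it
            if (Files.exists(currentTempFile)) {
                Files.delete(currentTempFile);
            }
            closed = true;
        }
    }

    // Returns {fileExistsAfterFailedFlush, fileExistsAfterClose}.
    static boolean[] failedFlushThenClose() {
        try {
            FlushPathModel stream = new FlushPathModel(
                    f -> { throw new IOException("simulated upload failure"); });
            try {
                stream.flushFullPart();
            } catch (IOException expected) {
                // write() would propagate this to its caller
            }
            boolean afterFlush = Files.exists(stream.currentTempFile);
            stream.close();
            boolean afterClose = Files.exists(stream.currentTempFile);
            return new boolean[] {afterFlush, afterClose};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = failedFlushThenClose();
        // after failed flush: true, after close: false
        System.out.println("after failed flush: " + r[0] + ", after close: " + r[1]);
    }
}
```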
> h3. Non-idempotent deletes
> The class has three {{Files.delete()}} call sites (in 
> {{uploadCurrentPart()}}, the {{closeForCommit()}} else-branch, and 
> {{close()}}). {{Files.delete()}} throws {{NoSuchFileException}} if the file 
> is already gone.
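> The difference between the two JDK calls can be checked directly. This sketch 
> uses only {{java.nio.file.Files}}; the class name {{DeleteSemanticsDemo}} is 
> made up. On the default file system a second {{Files.delete()}} throws 
> {{NoSuchFileException}}, whereas {{Files.deleteIfExists()}} reports a missing 
> file by returning {{false}}.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

class DeleteSemanticsDemo {
    // Returns true if a second Files.delete() on the same path throws NoSuchFileException.
    static boolean deleteThrowsOnMissingFile() {
        try {
            Path p = Files.createTempFile("s3-part-", ".tmp");
            Files.delete(p);     // first delete succeeds
            try {
                Files.delete(p); // the file is already gone
                return false;
            } catch (NoSuchFileException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the results of two consecutive deleteIfExists() calls on the same path.
    static boolean[] deleteIfExistsTwice() {
        try {
            Path p = Files.createTempFile("s3-part-", ".tmp");
            boolean first = Files.deleteIfExists(p);  // true: the file was deleted
            boolean second = Files.deleteIfExists(p); // false: nothing to delete, no exception
            return new boolean[] {first, second};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(deleteThrowsOnMissingFile()); // true
        boolean[] r = deleteIfExistsTwice();
        System.out.println(r[0] + " " + r[1]);           // true false
    }
}
```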
> The call in {{close()}} is exposed to a real, if minor, intra-class TOCTOU 
> race. {{write()}} and {{uploadCurrentPart()}} run without the lock and can 
> delete {{currentTempFile}}, while {{close()}} can run concurrently during 
> cancellation. {{close()}} does a check-then-act ({{currentTempFile.exists()}} 
> followed by {{Files.delete()}}), so the file can disappear in that window and 
> the delete throws {{NoSuchFileException}}. The other two sites are not 
> concurrently reachable in the same way; converting them as well is defensive 
> hardening that makes the cleanup uniformly idempotent.
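> For illustration, the race can be made deterministic by injecting the 
> concurrent delete into the window between the check and the act. In this 
> hypothetical sketch the {{interleaving}} callback stands in for {{write()}} or 
> {{uploadCurrentPart()}} running on another thread; it is not how the Flink 
> code is structured. The pre-fix shape then fails with 
> {{NoSuchFileException}}, while the single {{Files.deleteIfExists()}} call 
> returns quietly when the file is already gone.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;

class CheckThenActDemo {
    // Pre-fix shape of the cleanup in close(): check, then act.
    // `interleaving` simulates another thread deleting the file inside the window.
    static void racyCleanup(File tempFile, Runnable interleaving) throws IOException {
        if (tempFile.exists()) {
            interleaving.run();
            Files.delete(tempFile.toPath()); // throws NoSuchFileException if the file vanished
        }
    }

    // Post-fix shape: one idempotent call, no separate existence check.
    static void idempotentCleanup(File tempFile) throws IOException {
        Files.deleteIfExists(tempFile.toPath());
    }

    // Returns {racyThrew, idempotentThrew} when the file disappears mid-cleanup.
    static boolean[] simulate() {
        try {
            File f = Files.createTempFile("s3-part-", ".tmp").toFile();
            Runnable concurrentDelete = () -> {
                try {
                    Files.deleteIfExists(f.toPath());
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            };
            boolean racyThrew = false;
            try {
                racyCleanup(f, concurrentDelete);
            } catch (NoSuchFileException e) {
                racyThrew = true;
            }
            boolean idempotentThrew = false;
            try {
                idempotentCleanup(f); // file already gone: returns without throwing
            } catch (IOException e) {
                idempotentThrew = true;
            }
            return new boolean[] {racyThrew, idempotentThrew};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = simulate();
        System.out.println("check-then-act threw: " + r[0]); // true
        System.out.println("deleteIfExists threw: " + r[1]); // false
    }
}
```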
> h3. Fix
> * Wrap the upload in {{uploadCurrentPart()}} in a {{try/finally}} and delete 
> the temp file in the {{finally}}, so it is removed whether or not 
> {{uploadPart()}} succeeds. The delete is itself wrapped so an {{IOException}} 
> from the delete is logged at WARN and cannot mask or replace the original 
> upload {{IOException}}.
> * Replace all three {{Files.delete()}} calls with {{Files.deleteIfExists()}} 
> (and drop the now-redundant {{exists()}} guard in {{close()}}), making the 
> cleanup idempotent and closing the TOCTOU.
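> The first bullet can be sketched as follows, assuming a hypothetical 
> {{PartUploader}} callback in place of the real S3 {{uploadPart()}} call and 
> {{java.util.logging}} in place of Flink's SLF4J logger; this is not the 
> actual patch. The {{finally}} block deletes the temp file on both outcomes, 
> and the nested {{try/catch}} confines a failed delete to a WARN-level log 
> line, so the upload's own {{IOException}} is the one that propagates.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Logger;

// Hypothetical sketch of the fixed per-part cleanup; not the actual Flink patch.
class FixedUploadSketch {
    // Stand-in for the S3 part upload; not a Flink or AWS SDK type.
    interface PartUploader {
        void uploadPart(Path partFile) throws IOException;
    }

    // java.util.logging keeps the sketch dependency-free; Flink logs via SLF4J.
    private static final Logger LOG = Logger.getLogger(FixedUploadSketch.class.getName());

    static void uploadCurrentPart(PartUploader uploader, Path tempFile) throws IOException {
        try {
            uploader.uploadPart(tempFile);
        } finally {
            // Runs whether or not the upload threw. A failed delete is only
            // logged, so it cannot replace the upload's IOException.
            try {
                Files.deleteIfExists(tempFile);
            } catch (IOException deleteFailure) {
                LOG.warning("Failed to delete temp file " + tempFile + ": " + deleteFailure);
            }
        }
    }

    // True if the upload's own exception propagated and the temp file is gone.
    static boolean failingUploadRethrowsAndCleansUp() {
        try {
            Path tempFile = Files.createTempFile("s3-part-", ".tmp");
            try {
                uploadCurrentPart(f -> { throw new IOException("upload failed"); }, tempFile);
                return false; // not reached with this uploader
            } catch (IOException e) {
                return "upload failed".equals(e.getMessage()) && !Files.exists(tempFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // True if a successful upload also leaves no temp file behind.
    static boolean successfulUploadCleansUp() {
        try {
            Path tempFile = Files.createTempFile("s3-part-", ".tmp");
            uploadCurrentPart(f -> { }, tempFile);
            return !Files.exists(tempFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failingUploadRethrowsAndCleansUp()); // true
        System.out.println(successfulUploadCleansUp());         // true
    }
}
```

> Using {{deleteIfExists()}} inside the {{finally}} also lines up with the 
> second bullet: a file that is already gone is not treated as an error.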
> h3. Tests
> Four JUnit tests, verified red/green (4/4 pass with the fix; without the fix, 
> exactly 3 fail and the happy-path control still passes):
> * {{closeForCommitUploadFailureDeletesTempFile}} - commit-time upload 
> failure. Writes a sub-part-size buffer (so the only upload is at commit), 
> forces {{uploadPart()}} to throw, and asserts no {{s3-part-*}} file remains. 
> Fails without the fix with the leftover file present. (The test asserts the 
> visible symptom; the {{close()}} no-op that makes the leak permanent is 
> established by reading the code, not by the assertion.)
> * {{uploadPartFailureFromWriteDeletesTempFile}} - the {{write()}} flush path. 
> Writes a full part to force a flush during {{write()}}, forces the upload to 
> throw, and asserts cleanup. Fails without the fix.
> * {{closeForCommitIsIdempotentWhenTempFileMissing}} - the non-idempotent 
> delete. Drives {{closeForCommit()}} down its else-branch with the temp file 
> already removed; without the fix, {{closeForCommit()}} throws 
> {{NoSuchFileException}}.
> * {{closeForCommitSuccessDeletesTempFile}} - happy-path control. Commits 
> successfully and asserts no temp file remains; passes with and without the 
> fix (regression guard).
> h3. Scope / relation to other issues
> This is the local temp-file leak in the native S3 recoverable output stream. 
> It is distinct from FLINK-39786 (the remote orphaned multipart upload), which 
> is a separate, internally-claimed bug, and from FLINK-39778 (resume tail 
> loss). I did not find an existing JIRA or PR covering this local-leak issue.
> h3. Next step
> The fix and the four regression tests are ready against {{master}}. I am a 
> first-time contributor and cannot self-assign; could a committer please 
> assign this to me so I can open the PR? GitHub handle: NestDream.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
