[ https://issues.apache.org/jira/browse/FLINK-39874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39874:
-----------------------------------
Labels: pull-request-available (was: )
> NativeS3RecoverableFsDataOutputStream leaks a temp file when the commit-time
> part upload fails, and uses non-idempotent deletes
> -------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39874
> URL: https://issues.apache.org/jira/browse/FLINK-39874
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 2.3.0
> Reporter: Li Guo
> Priority: Major
> Labels: pull-request-available
>
> h3. Summary
> In the native S3 filesystem, {{NativeS3RecoverableFsDataOutputStream}} can
> permanently leak a local temp file when the part upload at commit time fails.
> {{closeForCommit()}} sets {{closed = true}} before it calls
> {{uploadCurrentPart()}}, so if {{uploadPart()}} throws there, the temp file
> is never deleted and the later {{close()}} no-ops on its {{if (!closed)}}
> guard. The file is left behind in the shared {{io.tmp.dirs}} until the
> TaskManager is recycled.
> There are two related, smaller issues in the same class: the per-part cleanup
> is also missing on the {{write()}} flush path (a robustness gap, not a
> permanent leak), and the three {{Files.delete()}} calls are non-idempotent,
> one of which is a real intra-class TOCTOU.
> Affects: 2.3.0.
> h3. The permanent leak (closeForCommit path)
> When a small stream is committed, no part has been flushed yet (a write
> smaller than the minimum part size does not flush during {{write()}}), so the
> only {{uploadPart()}} happens at commit time. {{closeForCommit()}} does this
> in order:
> # takes the lock
> # sets {{closed = true}}
> # if there are pending bytes, calls {{uploadCurrentPart()}}, which calls
> {{uploadPart()}}
> In the unpatched code, {{uploadCurrentPart()}} deletes the temp file only
> *after* a successful upload. If {{uploadPart()}} throws (for example, on an S3
> 5xx error that survives the SDK's own internal retries), the delete is skipped.
> Because {{closed}} is already {{true}}, a subsequent {{close()}}
> short-circuits on its {{if (!closed)}} guard and never reaches its own
> delete. The pending part's {{s3-part-<uuid>}} file is then orphaned in the
> shared {{io.tmp.dirs}} (one leaked file per affected stream) until the
> TaskManager is recycled.
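> The ordering problem can be reproduced outside Flink with a small model. The sketch below is a hypothetical, heavily simplified stand-in: the class {{BuggyPartStream}} and the {{PartUploader}} callback are invented for illustration and are neither the real Flink class nor the S3 SDK API. It assumes there are always pending bytes at commit, so {{closeForCommit()}} always calls {{uploadCurrentPart()}}.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical, simplified model of the pre-fix control flow described in
// FLINK-39874. It is NOT NativeS3RecoverableFsDataOutputStream.
class BuggyPartStream {

    // Hypothetical stand-in for the S3 part upload; may throw like an S3 5xx.
    interface PartUploader {
        void uploadPart(Path partFile) throws IOException;
    }

    private final PartUploader uploader;
    private final Path currentTempFile;
    private boolean closed;

    BuggyPartStream(Path tmpDir, PartUploader uploader) throws IOException {
        this.uploader = uploader;
        // Mirrors the s3-part-<uuid> naming pattern from the issue.
        this.currentTempFile = Files.createTempFile(tmpDir, "s3-part-", ".tmp");
    }

    Path tempFile() {
        return currentTempFile;
    }

    // Pre-fix shape: the delete runs only after a successful upload.
    private void uploadCurrentPart() throws IOException {
        uploader.uploadPart(currentTempFile);
        Files.delete(currentTempFile); // skipped when uploadPart() throws
    }

    // Assumes pending bytes, so the commit always uploads a part.
    synchronized void closeForCommit() throws IOException {
        closed = true;       // set BEFORE the upload
        uploadCurrentPart(); // may throw, leaving the temp file on disk
    }

    synchronized void close() throws IOException {
        if (!closed) {       // already true after a failed commit: no-op
            closed = true;
            if (Files.exists(currentTempFile)) {
                Files.delete(currentTempFile);
            }
        }
    }
}
```

> With an uploader that always throws, {{closeForCommit()}} propagates the exception, the following {{close()}} returns without deleting anything, and the {{s3-part-*}} file is still on disk.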
> Note that this is per stream, not per retry. The SDK's retries are internal to a
> single {{uploadPart()}} call and reuse the same temp file, so there is no
> per-retry accumulation.
> h3. The write() flush path (robustness gap, not a leak)
> When a part is flushed during {{write()}} (the buffered part reaches the
> minimum part size), {{uploadCurrentPart()}} is also the place that would
> delete the temp file, and on failure it does not. But here the stream is not
> marked closed, so a later {{close()}} still reaches its delete and reclaims
> the file. So on this path the missing per-part cleanup is a robustness gap
> rather than a permanent leak. I am fixing it in the same place because it is
> the same missing cleanup.
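> For contrast, a similarly hypothetical model of the flush path shows why this case is recoverable. {{FlushPathModel}}, its {{minPartSize}} threshold and the {{PartUploader}} callback are invented for illustration. The failed flush leaves the file behind, but {{closed}} is still {{false}}, so {{close()}} reaches its delete.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical, simplified model of the pre-fix write() flush path.
class FlushPathModel {

    interface PartUploader {
        void uploadPart(Path partFile) throws IOException;
    }

    private final Path tmpDir;
    private final long minPartSize; // hypothetical flush threshold
    private final PartUploader uploader;
    private Path currentTempFile;
    private long bufferedBytes;
    private boolean closed;

    FlushPathModel(Path tmpDir, long minPartSize, PartUploader uploader) throws IOException {
        this.tmpDir = tmpDir;
        this.minPartSize = minPartSize;
        this.uploader = uploader;
        this.currentTempFile = Files.createTempFile(tmpDir, "s3-part-", ".tmp");
    }

    Path tempFile() {
        return currentTempFile;
    }

    // Buffers into the temp file and flushes once the part is large enough.
    void write(byte[] data) throws IOException {
        Files.write(currentTempFile, data, StandardOpenOption.APPEND);
        bufferedBytes += data.length;
        if (bufferedBytes >= minPartSize) {
            uploadCurrentPart(); // 'closed' is NOT set on this path
        }
    }

    // Pre-fix shape: delete only after success, then roll to a new part file.
    private void uploadCurrentPart() throws IOException {
        uploader.uploadPart(currentTempFile);
        Files.delete(currentTempFile);
        currentTempFile = Files.createTempFile(tmpDir, "s3-part-", ".tmp");
        bufferedBytes = 0;
    }

    synchronized void close() throws IOException {
        if (!closed) { // still false after a failed flush, so cleanup runs
            closed = true;
            if (Files.exists(currentTempFile)) {
                Files.delete(currentTempFile);
            }
        }
    }
}
```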
> h3. Non-idempotent deletes
> The class has three {{Files.delete()}} call sites (in
> {{uploadCurrentPart()}}, the {{closeForCommit()}} else-branch, and
> {{close()}}). {{Files.delete()}} throws {{NoSuchFileException}} if the file
> is already gone.
> The site in {{close()}} is a real, if minor, intra-class TOCTOU. {{write()}}
> and {{uploadCurrentPart()}} run without the lock and can delete
> {{currentTempFile}}, while {{close()}} can run concurrently during
> cancellation. {{close()}} does a check-then-act ({{currentTempFile.exists()}}
> followed by {{Files.delete()}}), so the file can disappear in that window and
> the delete throws {{NoSuchFileException}}. The other two sites are not
> reachable concurrently in the same way; converting them as well is defensive
> hardening that makes the cleanup uniformly idempotent.
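> The difference between the two JDK calls is easy to check directly. The sketch below uses only {{java.nio.file.Files}}; the class and method names are illustrative. {{racyDelete}} mirrors the pre-fix check-then-act in {{close()}}, which can throw {{NoSuchFileException}} if another thread removes the file between the two calls.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Illustrative helpers; names are hypothetical.
class DeleteIdempotencyDemo {

    // Returns true if Files.delete() threw NoSuchFileException for a missing file.
    static boolean deleteThrowsWhenMissing(Path p) throws IOException {
        try {
            Files.delete(p);
            return false;
        } catch (NoSuchFileException e) {
            return true;
        }
    }

    // Check-then-act, as in the pre-fix close(): the file can disappear
    // between exists() and delete(), and delete() then throws.
    static void racyDelete(Path p) throws IOException {
        if (Files.exists(p)) {
            Files.delete(p);
        }
    }

    // Idempotent variant: a single call that returns false instead of
    // throwing NoSuchFileException when the file is already gone.
    static boolean idempotentDelete(Path p) throws IOException {
        return Files.deleteIfExists(p);
    }
}
```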
> h3. Fix
> * Wrap the upload in {{uploadCurrentPart()}} in a {{try/finally}} and delete
> the temp file in the {{finally}}, so it is removed whether or not
> {{uploadPart()}} succeeds. The delete is itself wrapped so an {{IOException}}
> from the delete is logged at WARN and cannot mask or replace the original
> upload {{IOException}}.
> * Replace all three {{Files.delete()}} calls with {{Files.deleteIfExists()}}
> (and drop the now-redundant {{exists()}} guard in {{close()}}), making the
> cleanup idempotent and closing the TOCTOU.
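> A minimal sketch of the try/finally shape described above, assuming a hypothetical {{PartUploader}} callback in place of the real S3 client and using {{java.util.logging}} in place of Flink's SLF4J logger so the example is self-contained. It is not the actual patch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of the fixed cleanup; not the real Flink code.
class FixedPartUpload {

    interface PartUploader {
        void uploadPart(Path partFile) throws IOException;
    }

    private static final Logger LOG = Logger.getLogger(FixedPartUpload.class.getName());

    static void uploadCurrentPart(PartUploader uploader, Path tempFile) throws IOException {
        try {
            uploader.uploadPart(tempFile);
        } finally {
            // Runs whether or not the upload succeeded.
            try {
                Files.deleteIfExists(tempFile); // idempotent delete
            } catch (IOException e) {
                // Log and swallow, so a delete failure cannot mask or
                // replace an upload exception already in flight.
                LOG.log(Level.WARNING, "Could not delete temp file " + tempFile, e);
            }
        }
    }
}
```

> Because the delete's {{IOException}} is caught inside the {{finally}}, an upload failure always propagates unchanged; the trade-off is that a failed delete after a successful upload is only logged.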
> h3. Tests
> Four JUnit tests, verified red/green (4/4 pass with the fix; without the fix,
> exactly 3 fail and the happy-path control still passes):
> * {{closeForCommitUploadFailureDeletesTempFile}} - commit-time upload
> failure. Writes a sub-part-size buffer (so the only upload is at commit),
> forces {{uploadPart()}} to throw, and asserts no {{s3-part-*}} file remains.
> Without the fix, it fails with the leftover file still present. (The test asserts the
> visible symptom; the {{close()}} no-op that makes the leak permanent is
> established by reading the code, not by the assertion.)
> * {{uploadPartFailureFromWriteDeletesTempFile}} - the {{write()}} flush path.
> Writes a full part to force a flush during {{write()}}, forces the upload to
> throw, and asserts cleanup. Fails without the fix.
> * {{closeForCommitIsIdempotentWhenTempFileMissing}} - the non-idempotent
> delete. Drives {{closeForCommit()}} down its else-branch with the temp file
> already removed; throws {{NoSuchFileException}} without the fix.
> * {{closeForCommitSuccessDeletesTempFile}} - happy-path control. Commits
> successfully and asserts no temp file remains; passes with and without the
> fix (regression guard).
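> The "no {{s3-part-*}} file remains" check used by these tests can be expressed as a glob over the temp directory. The helper below is a hypothetical sketch (the actual tests may use JUnit's {{@TempDir}} and a different assertion style); {{TempFileAssertions}} and its methods are invented names.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical test helper; not part of Flink.
class TempFileAssertions {

    // Counts entries in dir whose names match the glob "s3-part-*".
    static long countPartFiles(Path dir) throws IOException {
        long n = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "s3-part-*")) {
            for (Path ignored : stream) {
                n++;
            }
        }
        return n;
    }

    // Fails with a descriptive message if any part file is left behind.
    static void assertNoPartFiles(Path dir) throws IOException {
        long n = countPartFiles(dir);
        if (n != 0) {
            throw new AssertionError(n + " s3-part-* file(s) left in " + dir);
        }
    }
}
```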
> h3. Scope / relation to other issues
> This is the local temp-file leak in the native S3 recoverable output stream.
> It is distinct from FLINK-39786 (the remote orphaned multipart upload), which
> is a separate, internally claimed bug, and from FLINK-39778 (resume tail
> loss). I did not find an existing JIRA or PR covering this local-leak issue.
> h3. Next step
> The fix and the four regression tests are ready against {{master}}. I am a
> first-time contributor and cannot self-assign; could a committer please
> assign this to me so I can open the PR? GitHub handle: NestDream.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)