llllssss94 commented on PR #27423:
URL: https://github.com/apache/flink/pull/27423#issuecomment-4067117141

   Hi, @gaborgsomogyi .
   
   Thanks for the detailed feedback — these are exactly the right questions to 
ask for a change like this.
   
   Let me walk through each concern:
   
   1. "External system created random files is not business justification"
   
   Totally agree. The webhook-created default.txt was the trigger that exposed 
the bug, but the fix itself doesn't rely on that justification. The real issue 
is that Flink's own checkpoint files (state files, metadata, etc.) are nested 
inside chk-{id}/, and recursive=false fails to clean them up on object storage 
like S3/MinIO.
   
   2. "Maybe caller side(s) must be changed" / "safety belt" concern
   
   I traced all callers end-to-end:
   
   disposeStorageLocation() is only called from
   CompletedCheckpointDiscardObject.discard(), which is invoked in
   two cases:
   - CheckpointsCleaner: when CheckpointProperties.discardOnSubsumed() = true
   - StandaloneCompletedCheckpointStore: on job shutdown when
     shouldBeDiscardedOnShutdown(jobStatus) = true
   
   In both cases, the target path is always a Flink-created chk-{id} directory.
   
     In NO_CLAIM mode → forUnclaimedSnapshot() → discardSubsumed=false
     In LEGACY mode → forSavepoint() → discardSubsumed=false
     
     In fact, both forUnclaimedSnapshot() and forSavepoint() set ALL
     discard flags to false (discardSubsumed, discardFinished,
     discardCancelled, discardFailed, discardSuspended), so the original
     snapshot is protected across both CheckpointsCleaner and shutdown paths.
     
     In CLAIM mode → Flink takes ownership, deletion is expected
     So recursive=false is not acting as a safety belt here...
   
   
   3. "Exact proof that only Flink-created paths are deleted"
   
   The exclusiveCheckpointDir passed into disposeStorageLocation() originates 
exclusively from FsCheckpointStorageLocation or 
PersistentMetadataCheckpointStorageLocation constructors — both of which 
receive a chk-{id} directory that Flink itself created via 
createMetadataOutputStream().
   
   There is no code path where an externally-created or user-owned directory 
ends up as the target of disposeStorageLocation().
   
   4. Consistency point
   
   Interestingly, disposeOnFailure() in the same codebase already uses 
recursive=true:
   
   // FsCheckpointStorageLocation.java
   fileSystem.delete(checkpointDirectory, true);  // already recursive!
   So disposeStorageLocation() using recursive=false was always an 
inconsistency.
   
   On test reproducibility:
   
   Flink's LocalFileSystem.delete(path, false) actually throws 
IOException("Directory is not empty") for non-empty directories — same behavior 
as S3AFileSystem. So the unit test FsCompletedCheckpointStorageLocationTest 
directly reproduces the bug without needing a MinIO setup, and fails when 
recursive=false is used.
   
   The IT test covers all 3 RecoveryClaimModes (CLAIM, NO_CLAIM, LEGACY) to 
verify the full checkpoint lifecycle.
   
   Hope this addresses the concerns — happy to dig deeper into any specific 
part!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to