phet commented on code in PR #4047:
URL: https://github.com/apache/gobblin/pull/4047#discussion_r1747509115


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -177,9 +177,15 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext ev
 
     try {
       CleanupResult cleanupResult = deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
-          calculateWorkDirsToDelete(jobState.getJobId(), resourcesToCleanUp));
-      for (Map.Entry<String, Long> directoryCleanedResult : cleanupResult.getCleanupSummary().entrySet()) {
-        log.info("Cleaned up directory {} with {} bytes deleted", directoryCleanedResult.getKey(), directoryCleanedResult.getValue());
+          calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
+      if (directoriesToClean.size() != cleanupResult.getAttemptedCleanedDirectories().size()) {
+        log.warn("Expected to clean up {} directories, but only cleaned up {}", directoriesToClean.size(),
+            cleanupResult.getAttemptedCleanedDirectories().size());
+        for (String dir : directoriesToClean) {
+          if (cleanupResult.getAttemptedCleanedDirectories().get(dir)) {

Review Comment:
   don't you want to negate, so if it's false, then log?
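A minimal sketch of the negated check, assuming the map holds `true` for each directory whose deletion succeeded. `FailedDeletionCheck` and `failedDirs` are hypothetical names, and collecting into a list stands in for the `log.warn` call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating the negated check; not code from the PR.
class FailedDeletionCheck {

  // Returns the requested directories whose deletion did not succeed, in the set's iteration order.
  // Boolean.TRUE.equals(...) treats both `false` and a missing entry (null) as a failure, and
  // avoids the NullPointerException that `if (!map.get(dir))` would throw when unboxing null.
  static List<String> failedDirs(Set<String> directoriesToClean, Map<String, Boolean> deletionResults) {
    List<String> failed = new ArrayList<>();
    for (String dir : directoriesToClean) {
      if (!Boolean.TRUE.equals(deletionResults.get(dir))) {
        failed.add(dir); // in the workflow, this is where log.warn(...) would go
      }
    }
    return failed;
  }
}
```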



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -177,9 +177,15 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext ev
 
     try {
       CleanupResult cleanupResult = deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
-          calculateWorkDirsToDelete(jobState.getJobId(), resourcesToCleanUp));
-      for (Map.Entry<String, Long> directoryCleanedResult : cleanupResult.getCleanupSummary().entrySet()) {
-        log.info("Cleaned up directory {} with {} bytes deleted", directoryCleanedResult.getKey(), directoryCleanedResult.getValue());
+          calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
+      if (directoriesToClean.size() != cleanupResult.getAttemptedCleanedDirectories().size()) {

Review Comment:
   won't the size be the same, just that some would hold the value `false`?
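If the activity records an entry for every directory it attempts, the two sizes match even when some deletions fail. A minimal sketch of a check based on the map's values instead, assuming `true` means the deletion succeeded; `DeletionSummary`, `countFailures`, and `warning` are hypothetical names:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not code from the PR.
class DeletionSummary {

  // Counts directories whose deletion did not succeed. When every attempted directory gets an
  // entry, comparing map sizes cannot reveal failures, so this inspects the values instead.
  static long countFailures(Map<String, Boolean> deletionResults) {
    return deletionResults.values().stream()
        .filter(succeeded -> !Boolean.TRUE.equals(succeeded))
        .count();
  }

  // Returns a warning message only when at least one deletion failed.
  static Optional<String> warning(int requested, Map<String, Boolean> deletionResults) {
    long failures = countFailures(deletionResults);
    if (failures == 0) {
      return Optional.empty();
    }
    return Optional.of(String.format(
        "Expected to delete %d directories, but %d deletions failed", requested, failures));
  }
}
```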



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CleanupResult.java:
##########
@@ -26,12 +26,12 @@
 
 
 /**
- * Data structure representing the stats for a cleaned up work directory, where it returns a list of directories it cleaned up and their size
+ * Data structure representing the stats for a cleaned up work directory, where it returns a map of directories the result of their cleanup
  */
 @Data
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
 public class CleanupResult {
 
-  @NonNull private Map<String, Long> cleanupSummary;
+  @NonNull private Map<String, Boolean> attemptedCleanedDirectories;

Review Comment:
   may we just call this deletion?  "cleanup" could mean any number of things (e.g. retaining only "useful" files and getting rid of "junk", for whatever definition of useful/junk, or perhaps getting rid of "partial" files and keeping only complete ones).
   
   also, `attemptedCleanedDirectories`, even if named `attemptedDirDeletions`, does not capture the semantics of that `Boolean`.  how about `deletionSuccessesByDirPath`?
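With both renames applied, call sites read as a question about a specific directory, e.g. `getDeletionSuccessesByDirPath().get(dir)`. A minimal plain-Java sketch, written without Lombok so it stands alone; the class name `DirDeletionResult` is a hypothetical stand-in for a deletion-focused rename of `CleanupResult`:

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical plain-Java rendering of the renamed result type. In the PR, Lombok's @Data,
// @NoArgsConstructor, and @RequiredArgsConstructor generate comparable members; equals,
// hashCode, and toString are omitted here for brevity.
class DirDeletionResult {

  // Key: directory path. Value: true if that directory was deleted successfully.
  private Map<String, Boolean> deletionSuccessesByDirPath;

  // No-arg constructor for Jackson; the field stays null until deserialization sets it.
  public DirDeletionResult() {
  }

  // Mirrors the @NonNull null check that Lombok's @RequiredArgsConstructor would generate.
  public DirDeletionResult(Map<String, Boolean> deletionSuccessesByDirPath) {
    this.deletionSuccessesByDirPath = Objects.requireNonNull(deletionSuccessesByDirPath);
  }

  public Map<String, Boolean> getDeletionSuccessesByDirPath() {
    return deletionSuccessesByDirPath;
  }

  public void setDeletionSuccessesByDirPath(Map<String, Boolean> deletionSuccessesByDirPath) {
    this.deletionSuccessesByDirPath = Objects.requireNonNull(deletionSuccessesByDirPath);
  }
}
```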



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -96,129 +68,26 @@ public CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext ev
     }
   }
 
-  private boolean canCleanStagingData(JobContext jobContext, JobState jobState)
-      throws IOException {
-    return jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE || !jobContext.getCommitSequenceStore()
-        .get().exists(jobState.getJobName());
+  private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, Set<String> resourcesToClean) throws IOException {
+    log.warn("Clean up staging data by task is not supported, will clean up job level data instead");
+    return cleanupStagingDataForEntireJob(jobState, resourcesToClean);

Review Comment:
   I see (NOW) that that is the default... I didn't realize we've been operating w/ such widespread override of the default.
   
   I do agree it's risky.  still, that logging basically announces that the default isn't supported and that an alternative approach will be taken instead.  do you believe users would be more surprised by an exception, or by discovering that an unexpected form of cleanup was silently substituted?
   
   I don't see a clear answer, so I'm curious about your opinion.  to me, the silent switch-a-roo is more surprising.  my preference would be to fail fast w/ an exception, but to define a property that could be used to override that behavior and do job-level deletion like you have here.  the difference is that the switch-up becomes opt-in (and explicit), not silently substituted.
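A minimal sketch of the fail-fast-with-opt-in alternative, assuming job configuration is available as `java.util.Properties`. The property key, `PerTaskCleanupGuard`, and `cleanupPerTask` are hypothetical; the `Supplier` stands in for the job-level deletion, and the checked `IOException` is left out for brevity:

```java
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical guard, not code from the PR: per-task cleanup fails fast unless the job
// explicitly opts in to substituting job-level deletion.
class PerTaskCleanupGuard {

  // Hypothetical property name; the PR does not define one.
  static final String ALLOW_JOB_LEVEL_FALLBACK =
      "gobblin.temporal.cleanup.perTask.allowJobLevelFallback";

  // jobLevelCleanup stands in for cleanupStagingDataForEntireJob(jobState, resourcesToClean).
  static <T> T cleanupPerTask(Properties jobProps, Supplier<T> jobLevelCleanup) {
    boolean optedIn = Boolean.parseBoolean(jobProps.getProperty(ALLOW_JOB_LEVEL_FALLBACK, "false"));
    if (!optedIn) {
      throw new UnsupportedOperationException("Per-task staging cleanup is not supported; set "
          + ALLOW_JOB_LEVEL_FALLBACK + "=true to delete job-level staging data instead");
    }
    return jobLevelCleanup.get();
  }
}
```

Defaulting the property to `false` keeps the substitution explicit: a job only gets job-level deletion after setting the flag.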



-- 
This is an automated message from the Apache Git Service.