phet commented on code in PR #4047:
URL: https://github.com/apache/gobblin/pull/4047#discussion_r1746290031


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/DeleteWorkDirsActivity.java:
##########
@@ -34,6 +34,6 @@ public interface DeleteWorkDirsActivity {
    * TODO: Generalize the input to support multiple platforms outside of just HDFS
    */
   @ActivityMethod
-  CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set<String> resourcesToClean);
+  CleanupResult delete(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set<String> resourcesToClean);

Review Comment:
   the impl names the `Set<String>` param `workDirPaths`; suggest doing that here too
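   
   for illustration, the renamed parameter would make the interface signature read (sketch only, mirroring the diff above):
   ```
   @ActivityMethod
   CleanupResult delete(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set<String> workDirPaths);
   ```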



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -77,24 +77,25 @@ public GenerateWorkUnitResult generateWorkUnits(Properties jobProps, EventSubmit
       // before embarking on (potentially expensive) WU creation, first pre-check that the FS is available
       FileSystem fs = JobStateUtils.openFileSystem(jobState);
       fs.mkdirs(workDirRoot);
-      boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for cleanup
-      DestinationDatasetHandlerService datasetHandlerService = closer.register(
-          new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create()));
-
-      List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState, datasetHandlerService, closer);
 
+      List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState, eventSubmitterContext, closer);
       Set<String> resourcesToCleanUp = new HashSet<>();
+      // Validate every workunit if they have the temp dir props since some workunits may be commit steps
       for (WorkUnit workUnit : workUnits) {
-        resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR));
-        resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+        if (workUnit.contains(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR))) {
+          resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR));
+        }
+        if (workUnit.contains(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR))) {
+          resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+        }
         if (workUnit.getPropAsBoolean(ConfigurationKeys.CLEAN_ERR_DIR, ConfigurationKeys.DEFAULT_CLEAN_ERR_DIR)) {
           resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE));
         }

Review Comment:
   please put into a method:
   ```
   Set<String> calcWorkDirPathsToDelete(List<WorkUnit> wus)
   ```
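   
   for example, a minimal sketch of that extraction, reusing the guards from the diff (note: this checks `contains(KEY)` directly, on the assumption that the diff's `contains(getProp(KEY))` was intended as a presence check on the key itself):
   ```
   private static Set<String> calcWorkDirPathsToDelete(List<WorkUnit> workUnits) {
     Set<String> workDirPaths = new HashSet<>();
     for (WorkUnit workUnit : workUnits) {
       // only WUs that actually carry the writer temp dir props contribute paths (some WUs are commit steps)
       if (workUnit.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
         workDirPaths.add(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR));
       }
       if (workUnit.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
         workDirPaths.add(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
       }
       if (workUnit.getPropAsBoolean(ConfigurationKeys.CLEAN_ERR_DIR, ConfigurationKeys.DEFAULT_CLEAN_ERR_DIR)) {
         workDirPaths.add(workUnit.getProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE));
       }
     }
     return workDirPaths;
   }
   ```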



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -96,129 +68,26 @@ public CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext ev
     }
   }
 
-  private boolean canCleanStagingData(JobContext jobContext, JobState jobState)
-      throws IOException {
-    return jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE || !jobContext.getCommitSequenceStore()
-        .get().exists(jobState.getJobName());
+  private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, Set<String> resourcesToClean) throws IOException {
+    log.warn("Clean up staging data by task is not supported, will clean up job level data instead");
+    return cleanupStagingDataForEntireJob(jobState, resourcesToClean);
   }
 
-  // Goes through all the task states and cleans them up individually, there is some overlap with the job level cleanup as the job level cleanup will
-  // also clean up the task level staging data
-  private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, WUProcessingSpec workSpec) throws IOException {
-    Closer closer = Closer.create();
-    Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
-    FileSystem fs = Help.loadFileSystem(workSpec);
-    int numThreads = jobState.getPropAsInt(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY, ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS);
-    List<TaskState> taskStateList = Help.loadTaskStates(workSpec, fs, jobState, numThreads);
-    Map<String, Long> cleanupMetrics = Maps.newHashMap();
-    try {
-      for (TaskState taskState : taskStateList) {
-        try {
-          cleanupMetrics.putAll(cleanTaskStagingData(taskState, log, closer, parallelRunners));
-        } catch (IOException e) {
-          log.error(String.format("Failed to clean staging data for task %s: %s", taskState.getTaskId(), e), e);
-        }
-      }
-    } finally {
-      try {
-        closer.close();
-      } catch (IOException e) {
-        log.error("Failed to clean staging data", e);
-      }
-    }
-    return cleanupMetrics;
-  }
-
-  private static Map<String, Long> cleanupStagingDataForEntireJob(JobState state, Set<String> resourcesToClean, WUProcessingSpec workSpec) throws IOException {
+  private static Map<String, Long> cleanupStagingDataForEntireJob(JobState state, Set<String> resourcesToClean) throws IOException {
     if (!state.contains(ConfigurationKeys.WRITER_STAGING_DIR) || !state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
       return Maps.newHashMap();
     }

Review Comment:
   leave it to the caller to handle such "skipping"
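   
   i.e., roughly this at the call site (a sketch only; names taken from the surrounding diff):
   ```
   // caller decides whether job-level cleanup applies, instead of the method silently returning an empty map
   if (state.contains(ConfigurationKeys.WRITER_STAGING_DIR) && state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
     cleanupMetrics = cleanupStagingDataForEntireJob(state, resourcesToClean);
   }
   ```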



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CleanupResult.java:
##########


Review Comment:
   it's looking like we may not need this.  if so, perhaps just a count of how many "top-level" dirs were deleted successfully?  the caller could always compare it to the `Set::size`
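   
   e.g. (a sketch; the count-only result and its `getSuccessCount()` accessor are hypothetical):
   ```
   int deleted = cleanupResult.getSuccessCount();  // hypothetical count of top-level dirs deleted
   if (deleted != workDirPaths.size()) {
     log.warn("Deleted only {} of {} work dirs", deleted, workDirPaths.size());
   }
   ```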



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java:
##########
@@ -27,19 +26,14 @@
 
 
 /**
- * Data structure representing the stats for a committed dataset, and the total number of committed workunits in the Gobblin Temporal job
- * Return type of {@link org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow#process(WUProcessingSpec)}
- * and {@link org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow#commit(WUProcessingSpec)}.
+ * Data structure representing the result of generating work units, where it returns the number of generated work units and
+ * the folders that the parent workflow should clean up as a side effect of generating WUs

Review Comment:
   nit: "as a side effect of" seems less accurate than "as a follow up to"



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closer;
+
+import io.temporal.failure.ApplicationFailure;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CleanupResult;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.WriterUtils;
+
+
+@Slf4j
+public class DeleteWorkDirsActivityImpl implements DeleteWorkDirsActivity {
+  static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
+  @Override
+  public CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set<String> resourcesToClean) {
+    //TODO: Emit timers to measure length of cleanup step
+    Optional<String> optJobName = Optional.empty();
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      optJobName = Optional.ofNullable(jobState.getJobName());
+
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+      JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+      if (PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.CLEANUP_STAGING_DATA_BY_INITIALIZER, "false")) {
+        //Clean up will be done by initializer.
+        return CleanupResult.createEmpty();
+      }
+      try {
+        if (!canCleanStagingData(jobContext, jobState)) {
+          log.error("Job " + jobState.getJobName() + " has unfinished commit 
sequences. Will not clean up staging data.");
+          return CleanupResult.createEmpty();
+        }
+      } catch (IOException e) {
+        throw new JobException("Failed to check unfinished commit sequences", e);
+      }
+      Map<String, Long> cleanupMetrics = jobState.getPropAsBoolean(ConfigurationKeys.CLEANUP_STAGING_DATA_PER_TASK, ConfigurationKeys.DEFAULT_CLEANUP_STAGING_DATA_PER_TASK) ?
+          cleanupStagingDataPerTask(jobState, workSpec) : cleanupStagingDataForEntireJob(jobState, resourcesToClean, workSpec);
+
+      return new CleanupResult(cleanupMetrics);
+    } catch (Exception e) {
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed to cleanup temporary folders for job %s", 
optJobName.orElse(UNDEFINED_JOB_NAME)),
+          IOException.class.toString(),
+          new IOException(e)
+      );
+    }
+  }
+
+  private boolean canCleanStagingData(JobContext jobContext, JobState jobState)
+      throws IOException {
+    return jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE || !jobContext.getCommitSequenceStore()
+        .get().exists(jobState.getJobName());
+  }
+
+  // Goes through all the task states and cleans them up individually, there is some overlap with the job level cleanup as the job level cleanup will
+  // also clean up the task level staging data
+  private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, WUProcessingSpec workSpec) throws IOException {
+    Closer closer = Closer.create();
+    Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
+    FileSystem fs = Help.loadFileSystem(workSpec);
+    int numThreads = jobState.getPropAsInt(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY, ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS);
+    List<TaskState> taskStateList = Help.loadTaskStates(workSpec, fs, jobState, numThreads);
+    Map<String, Long> cleanupMetrics = Maps.newHashMap();
+    try {
+      for (TaskState taskState : taskStateList) {
+        try {
+          cleanupMetrics.putAll(cleanTaskStagingData(taskState, log, closer, parallelRunners));
+        } catch (IOException e) {
+          log.error(String.format("Failed to clean staging data for task %s: %s", taskState.getTaskId(), e), e);
+        }
+      }
+    } finally {
+      try {
+        closer.close();
+      } catch (IOException e) {
+        log.error("Failed to clean staging data", e);
+      }
+    }
+    return cleanupMetrics;
+  }
+
+  private static Map<String, Long> cleanupStagingDataForEntireJob(JobState state, Set<String> resourcesToClean, WUProcessingSpec workSpec) throws IOException {
+    if (!state.contains(ConfigurationKeys.WRITER_STAGING_DIR) || !state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
+      return Maps.newHashMap();
+    }
+    String writerFsUri = state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI);
+    FileSystem fs = JobLauncherUtils.getFsWithProxy(state, writerFsUri, WriterUtils.getFsConfiguration(state));
+    Map<String, Long> cleanupMetrics = Maps.newHashMap();
+    String jobId = state.getJobId();
+
+    for (String resource : resourcesToClean) {
+      Path pathToClean = new Path(resource);
+      Long filesCleaned = fs.getContentSummary(pathToClean).getLength();

Review Comment:
   I feel it might be overkill.  as a start, you could always log this value, but I'm not expecting you to always do that.
   
   (anyway, `.getLength()` should be the size in bytes, NOT `filesCleaned`.)
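   
   for reference, `ContentSummary` distinguishes the two (sketch):
   ```
   ContentSummary summary = fs.getContentSummary(pathToClean);
   long bytesToDelete = summary.getLength();     // total size in bytes
   long filesToDelete = summary.getFileCount();  // number of files, if a count is what's wanted
   ```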



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -96,129 +68,26 @@ public CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext ev
     }
   }
 
-  private boolean canCleanStagingData(JobContext jobContext, JobState jobState)
-      throws IOException {
-    return jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE || !jobContext.getCommitSequenceStore()
-        .get().exists(jobState.getJobName());
+  private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, Set<String> resourcesToClean) throws IOException {
+    log.warn("Clean up staging data by task is not supported, will clean up job level data instead");
+    return cleanupStagingDataForEntireJob(jobState, resourcesToClean);

Review Comment:
   I like this sentiment... just want to think about whether it should be stronger... such as throwing an exception rather than silently doing something different
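   
   e.g. the stronger form, as a sketch:
   ```
   private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, Set<String> resourcesToClean) {
     // fail loudly rather than silently falling back to job-level cleanup
     throw new UnsupportedOperationException("Cleaning staging data per task is not supported here; use job-level cleanup");
   }
   ```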


