phet commented on code in PR #4047:
URL: https://github.com/apache/gobblin/pull/4047#discussion_r1746290031
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/DeleteWorkDirsActivity.java:
##########
@@ -34,6 +34,6 @@ public interface DeleteWorkDirsActivity {
* TODO: Generalize the input to support multiple platforms outside of just HDFS
*/
@ActivityMethod
- CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set<String> resourcesToClean);
+ CleanupResult delete(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set<String> resourcesToClean);
Review Comment:
The impl names the parameter `Set<String> workDirPaths`; suggest doing the same here.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -77,24 +77,25 @@ public GenerateWorkUnitResult generateWorkUnits(Properties jobProps, EventSubmit
// before embarking on (potentially expensive) WU creation, first pre-check that the FS is available
FileSystem fs = JobStateUtils.openFileSystem(jobState);
fs.mkdirs(workDirRoot);
- boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for cleanup
- DestinationDatasetHandlerService datasetHandlerService = closer.register(
- new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create()));
-
- List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState, datasetHandlerService, closer);
+ List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState, eventSubmitterContext, closer);
Set<String> resourcesToCleanUp = new HashSet<>();
+ // Validate every workunit if they have the temp dir props since some workunits may be commit steps
for (WorkUnit workUnit : workUnits) {
- resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR));
- resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+ if (workUnit.contains(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR))) {
+ resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR));
+ }
+ if (workUnit.contains(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR))) {
+ resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+ }
if (workUnit.getPropAsBoolean(ConfigurationKeys.CLEAN_ERR_DIR, ConfigurationKeys.DEFAULT_CLEAN_ERR_DIR)) {
resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE));
}
Review Comment:
Please extract this into a method:
```
Set<String> calcWorkDirPathsToDelete(List<WorkUnit> wus)
```
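A minimal sketch of the suggested helper, assuming each `WorkUnit` can be modeled as a plain `java.util.Properties` (the real `WorkUnit`/`State` API differs) and using placeholder key strings in place of the `ConfigurationKeys` constants. It checks for the *key* via `containsKey(key)`; the diff passes the property's *value* to `workUnit.contains(...)`, which appears to test the wrong thing. The `"false"` default for the err-dir flag is an assumption standing in for `DEFAULT_CLEAN_ERR_DIR`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

class WorkDirPathsCalc {
  // Placeholder keys, NOT necessarily the real ConfigurationKeys string values
  static final String WRITER_STAGING_DIR = "writer.staging.dir";
  static final String WRITER_OUTPUT_DIR = "writer.output.dir";
  static final String CLEAN_ERR_DIR = "clean.err.dir";
  static final String ROW_LEVEL_ERR_FILE = "row.level.err.file";

  /** Collects the dirs to delete after the job; each work unit is modeled as Properties. */
  static Set<String> calcWorkDirPathsToDelete(List<Properties> wus) {
    Set<String> workDirPaths = new HashSet<>();
    for (Properties wu : wus) {
      // Some work units (e.g. commit steps) may lack writer dirs, so add only keys that are present
      addIfPresent(wu, WRITER_STAGING_DIR, workDirPaths);
      addIfPresent(wu, WRITER_OUTPUT_DIR, workDirPaths);
      // Assumed default of "false" stands in for DEFAULT_CLEAN_ERR_DIR
      if (Boolean.parseBoolean(wu.getProperty(CLEAN_ERR_DIR, "false"))) {
        addIfPresent(wu, ROW_LEVEL_ERR_FILE, workDirPaths);
      }
    }
    return workDirPaths;
  }

  private static void addIfPresent(Properties wu, String key, Set<String> workDirPaths) {
    if (wu.containsKey(key)) { // check the key itself, not its value
      workDirPaths.add(wu.getProperty(key));
    }
  }
}
```

Guarding the err-file key as well keeps a `null` from landing in the set when the flag is on but the path is unset.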
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -96,129 +68,26 @@ public CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext ev
}
}
- private boolean canCleanStagingData(JobContext jobContext, JobState jobState)
- throws IOException {
- return jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE || !jobContext.getCommitSequenceStore()
- .get().exists(jobState.getJobName());
+ private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, Set<String> resourcesToClean) throws IOException {
+ log.warn("Clean up staging data by task is not supported, will clean up job level data instead");
+ return cleanupStagingDataForEntireJob(jobState, resourcesToClean);
}
- // Goes through all the task states and cleans them up individually, there is some overlap with the job level cleanup as the job level cleanup will
- // also clean up the task level staging data
- private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, WUProcessingSpec workSpec) throws IOException {
- Closer closer = Closer.create();
- Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
- FileSystem fs = Help.loadFileSystem(workSpec);
- int numThreads = jobState.getPropAsInt(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY, ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS);
- List<TaskState> taskStateList = Help.loadTaskStates(workSpec, fs, jobState, numThreads);
- Map<String, Long> cleanupMetrics = Maps.newHashMap();
- try {
- for (TaskState taskState : taskStateList) {
- try {
- cleanupMetrics.putAll(cleanTaskStagingData(taskState, log, closer, parallelRunners));
- } catch (IOException e) {
- log.error(String.format("Failed to clean staging data for task %s: %s", taskState.getTaskId(), e), e);
- }
- }
- } finally {
- try {
- closer.close();
- } catch (IOException e) {
- log.error("Failed to clean staging data", e);
- }
- }
- return cleanupMetrics;
- }
-
- private static Map<String, Long> cleanupStagingDataForEntireJob(JobState state, Set<String> resourcesToClean, WUProcessingSpec workSpec) throws IOException {
+ private static Map<String, Long> cleanupStagingDataForEntireJob(JobState state, Set<String> resourcesToClean) throws IOException {
if (!state.contains(ConfigurationKeys.WRITER_STAGING_DIR) || !state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
return Maps.newHashMap();
}
Review Comment:
Leave it to the caller to handle such "skipping".
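A sketch of what leaving the skip to the caller could look like: the delete step acts on exactly the paths it is given, and the caller decides whether to invoke it at all. `WorkDirDeleter` is a hypothetical stand-in for the activity, job props are modeled as `Properties`, and the key strings are placeholders for `ConfigurationKeys.WRITER_STAGING_DIR` / `WRITER_OUTPUT_DIR`.

```java
import java.util.Properties;
import java.util.Set;

class CallerSideSkip {
  // Placeholder keys, NOT necessarily the real ConfigurationKeys string values
  static final String WRITER_STAGING_DIR = "writer.staging.dir";
  static final String WRITER_OUTPUT_DIR = "writer.output.dir";

  /** Hypothetical stand-in for the delete activity: no config checks, it deletes what it is handed. */
  interface WorkDirDeleter {
    int delete(Set<String> workDirPaths);
  }

  /** The "skip" decision lives in the caller rather than inside the delete method. */
  static int deleteIfApplicable(Properties jobProps, Set<String> workDirPaths, WorkDirDeleter deleter) {
    boolean hasWriterDirs = jobProps.containsKey(WRITER_STAGING_DIR) && jobProps.containsKey(WRITER_OUTPUT_DIR);
    if (!hasWriterDirs || workDirPaths.isEmpty()) {
      return 0; // nothing to do, and the deleter is never invoked
    }
    return deleter.delete(workDirPaths);
  }
}
```

This keeps the activity's contract simple ("delete these paths"), so it stays reusable for callers whose skip conditions differ.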
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CleanupResult.java:
##########
Review Comment:
It's looking like we may not need this. If so, perhaps just return a count of how many "top-level" dirs were deleted successfully? The caller could always compare it to the `Set::size`.
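A sketch of the count-based alternative: return how many top-level dirs were deleted, so the caller can compare it to `workDirPaths.size()`. It uses `java.nio.file` on the local filesystem as a stand-in for Hadoop's `FileSystem.delete(path, true)`; treating a missing path as "not deleted" is an assumption about the desired semantics.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Set;
import java.util.stream.Stream;

class WorkDirDeletion {
  /** Recursively deletes each top-level dir and returns how many were fully removed. */
  static int deleteWorkDirs(Set<String> workDirPaths) {
    int numDeleted = 0;
    for (String workDirPath : workDirPaths) {
      // Files.walk throws NoSuchFileException for a missing path, so it is not counted
      try (Stream<Path> walk = Files.walk(Paths.get(workDirPath))) {
        // Reverse lexicographic order visits children before their parent dirs
        walk.sorted(Comparator.reverseOrder()).forEach(WorkDirDeletion::deleteUnchecked);
        numDeleted++;
      } catch (IOException | UncheckedIOException e) {
        // A real impl would log here; keep going with the remaining dirs
      }
    }
    return numDeleted;
  }

  private static void deleteUnchecked(Path path) {
    try {
      Files.delete(path);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A caller can then flag partial cleanup with something like `if (numDeleted < workDirPaths.size())` and log the shortfall.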
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java:
##########
@@ -27,19 +26,14 @@
/**
- * Data structure representing the stats for a committed dataset, and the total number of committed workunits in the Gobblin Temporal job
- * Return type of {@link org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow#process(WUProcessingSpec)}
- * and {@link org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow#commit(WUProcessingSpec)}.
+ * Data structure representing the result of generating work units, where it returns the number of generated work units and
+ * the folders that the parent workflow should clean up as a side effect of generating WUs
Review Comment:
nit: "as a side effect of" seems less accurate than "as a follow-up to"
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closer;
+
+import io.temporal.failure.ApplicationFailure;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CleanupResult;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.WriterUtils;
+
+
+@Slf4j
+public class DeleteWorkDirsActivityImpl implements DeleteWorkDirsActivity {
+ static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
+ @Override
+ public CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set<String> resourcesToClean) {
+ //TODO: Emit timers to measure length of cleanup step
+ Optional<String> optJobName = Optional.empty();
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ optJobName = Optional.ofNullable(jobState.getJobName());
+
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+ if (PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.CLEANUP_STAGING_DATA_BY_INITIALIZER, "false")) {
+ //Clean up will be done by initializer.
+ return CleanupResult.createEmpty();
+ }
+ try {
+ if (!canCleanStagingData(jobContext, jobState)) {
+ log.error("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
+ return CleanupResult.createEmpty();
+ }
+ } catch (IOException e) {
+ throw new JobException("Failed to check unfinished commit sequences", e);
+ }
+ Map<String, Long> cleanupMetrics = jobState.getPropAsBoolean(ConfigurationKeys.CLEANUP_STAGING_DATA_PER_TASK, ConfigurationKeys.DEFAULT_CLEANUP_STAGING_DATA_PER_TASK) ?
+ cleanupStagingDataPerTask(jobState, workSpec) : cleanupStagingDataForEntireJob(jobState, resourcesToClean, workSpec);
+
+ return new CleanupResult(cleanupMetrics);
+ } catch (Exception e) {
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Failed to cleanup temporary folders for job %s", optJobName.orElse(UNDEFINED_JOB_NAME)),
+ IOException.class.toString(),
+ new IOException(e)
+ );
+ }
+ }
+
+ private boolean canCleanStagingData(JobContext jobContext, JobState jobState)
+ throws IOException {
+ return jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE || !jobContext.getCommitSequenceStore()
+ .get().exists(jobState.getJobName());
+ }
+
+ // Goes through all the task states and cleans them up individually, there is some overlap with the job level cleanup as the job level cleanup will
+ // also clean up the task level staging data
+ private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, WUProcessingSpec workSpec) throws IOException {
+ Closer closer = Closer.create();
+ Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ int numThreads = jobState.getPropAsInt(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY, ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS);
+ List<TaskState> taskStateList = Help.loadTaskStates(workSpec, fs, jobState, numThreads);
+ Map<String, Long> cleanupMetrics = Maps.newHashMap();
+ try {
+ for (TaskState taskState : taskStateList) {
+ try {
+ cleanupMetrics.putAll(cleanTaskStagingData(taskState, log, closer, parallelRunners));
+ } catch (IOException e) {
+ log.error(String.format("Failed to clean staging data for task %s: %s", taskState.getTaskId(), e), e);
+ }
+ }
+ } finally {
+ try {
+ closer.close();
+ } catch (IOException e) {
+ log.error("Failed to clean staging data", e);
+ }
+ }
+ return cleanupMetrics;
+ }
+
+ private static Map<String, Long> cleanupStagingDataForEntireJob(JobState state, Set<String> resourcesToClean, WUProcessingSpec workSpec) throws IOException {
+ if (!state.contains(ConfigurationKeys.WRITER_STAGING_DIR) || !state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
+ return Maps.newHashMap();
+ }
+ String writerFsUri = state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI);
+ FileSystem fs = JobLauncherUtils.getFsWithProxy(state, writerFsUri, WriterUtils.getFsConfiguration(state));
+ Map<String, Long> cleanupMetrics = Maps.newHashMap();
+ String jobId = state.getJobId();
+
+ for (String resource : resourcesToClean) {
+ Path pathToClean = new Path(resource);
+ Long filesCleaned = fs.getContentSummary(pathToClean).getLength();
Review Comment:
I feel it might be overkill. As a start, you could always log this value, but I'm not expecting you to always do that.
(Anyway, `.getLength()` is the size in bytes, NOT `filesCleaned`.)
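To illustrate the bytes-vs-files distinction: Hadoop's `ContentSummary` exposes `getLength()` (total bytes) separately from `getFileCount()` and `getDirectoryCount()`. The sketch below computes the local-filesystem equivalent of the first two with `java.nio.file`, e.g. to log both values before deleting; `DirSummary` is a hypothetical name, not a Gobblin or Hadoop class.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.stream.Stream;

final class DirSummary {
  final long lengthBytes; // analogous to ContentSummary#getLength()
  final long fileCount;   // analogous to ContentSummary#getFileCount()

  private DirSummary(long lengthBytes, long fileCount) {
    this.lengthBytes = lengthBytes;
    this.fileCount = fileCount;
  }

  /** Walks the tree under root, summing sizes and counting regular files only. */
  static DirSummary of(Path root) throws IOException {
    long bytes = 0;
    long files = 0;
    try (Stream<Path> walk = Files.walk(root)) {
      Iterator<Path> it = walk.iterator();
      while (it.hasNext()) {
        Path p = it.next();
        if (Files.isRegularFile(p)) {
          files++;
          bytes += Files.size(p);
        }
      }
    }
    return new DirSummary(bytes, files);
  }

  @Override
  public String toString() {
    return lengthBytes + " bytes in " + fileCount + " files";
  }
}
```

Note that walking the whole tree costs a listing per directory, which is part of why computing this on every delete may be overkill.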
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -96,129 +68,26 @@ public CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext ev
}
}
- private boolean canCleanStagingData(JobContext jobContext, JobState jobState)
- throws IOException {
- return jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE || !jobContext.getCommitSequenceStore()
- .get().exists(jobState.getJobName());
+ private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, Set<String> resourcesToClean) throws IOException {
+ log.warn("Clean up staging data by task is not supported, will clean up job level data instead");
+ return cleanupStagingDataForEntireJob(jobState, resourcesToClean);
Review Comment:
I like this sentiment... just want to think about whether it should be stronger, such as throwing an exception rather than silently doing something different.
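One way to make it stronger, sketched with job props modeled as `Properties` and a placeholder key string standing in for `ConfigurationKeys.CLEANUP_STAGING_DATA_PER_TASK` (default assumed `false`): fail fast instead of logging a warning and falling back. If such a guard were called inside the `try` of the new `cleanup` method, its `catch (Exception e)` would already wrap the exception into a non-retryable `ApplicationFailure`, so the misconfiguration would surface rather than silently changing behavior.

```java
import java.util.Properties;

final class PerTaskCleanupGuard {
  // Placeholder key, NOT necessarily the real ConfigurationKeys.CLEANUP_STAGING_DATA_PER_TASK value
  static final String CLEANUP_STAGING_DATA_PER_TASK = "cleanup.staging.data.per.task";

  /** Throws rather than silently switching to job-level cleanup when per-task cleanup is requested. */
  static void checkPerTaskCleanupNotRequested(Properties jobProps) {
    // "false" is an assumed default standing in for DEFAULT_CLEANUP_STAGING_DATA_PER_TASK
    if (Boolean.parseBoolean(jobProps.getProperty(CLEANUP_STAGING_DATA_PER_TASK, "false"))) {
      throw new UnsupportedOperationException("Per-task staging data cleanup is not supported; unset "
          + CLEANUP_STAGING_DATA_PER_TASK + " to use job-level cleanup");
    }
  }
}
```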
--
This is an automated message from the Apache Git Service.