Will-Lo commented on code in PR #3880:
URL: https://github.com/apache/gobblin/pull/3880#discussion_r1501164524
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java:
##########
@@ -71,23 +104,69 @@ public static StateStore<TaskState>
openTaskStateStoreUncached(JobState jobState
return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(),
TaskState.class);
}
+ /**
+ * ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+ * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+ * @return "base" dir root path for work dir (parent of inputs, output task
states, etc.)
+ */
+ public static Path getWorkDirRoot(JobState jobState) {
+ return new Path(
+ new Path(jobState.getProp(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
jobState.getJobName()),
+ jobState.getJobId());
+ }
+
Review Comment:
It might make sense in the future for either the MR job launcher or an MR job
launcher util to own this, since it's deeply tied to the MR implementation.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java:
##########
@@ -71,23 +104,69 @@ public static StateStore<TaskState>
openTaskStateStoreUncached(JobState jobState
return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(),
TaskState.class);
}
+ /**
+ * ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+ * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+ * @return "base" dir root path for work dir (parent of inputs, output task
states, etc.)
+ */
+ public static Path getWorkDirRoot(JobState jobState) {
+ return new Path(
+ new Path(jobState.getProp(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
jobState.getJobName()),
+ jobState.getJobId());
+ }
+
/**
* ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
* {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
* @return path to {@link FsStateStore<TaskState>} backing dir
*/
public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
- Properties jobProps = jobState.getProperties();
- Path jobOutputPath = new Path(
- new Path(
- new Path(
- jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
- JobState.getJobNameFromProps(jobProps)),
- JobState.getJobIdFromProps(jobProps)),
- OUTPUT_DIR_NAME);
+ Path jobOutputPath = new Path(getWorkDirRoot(jobState), OUTPUT_DIR_NAME);
return fs.makeQualified(jobOutputPath);
}
+ /** write serialized {@link WorkUnit}s in parallel into files named after
the jobID and task IDs */
+ public static void writeWorkUnits(List<WorkUnit> workUnits, Path
workDirRootPath, JobState jobState, FileSystem fs)
+ throws IOException {
+ String jobId = jobState.getJobId();
+ Path targetDirPath = new Path(workDirRootPath, INPUT_DIR_NAME);
+
+ int numThreads =
ParallelRunner.getNumThreadsConfig(jobState.getProperties());
+ Closer closer = Closer.create(); // (NOTE: try-with-resources syntax
wouldn't allow `catch { closer.rethrow(t) }`)
+ try {
+ ParallelRunner parallelRunner = closer.register(new
ParallelRunner(numThreads, fs));
+
+ JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new
JobLauncherUtils.WorkUnitPathCalculator();
+ int i = 0;
+ for (WorkUnit workUnit : workUnits) {
+ Path workUnitFile = pathCalculator.calcNextPath(workUnit, jobId,
targetDirPath);
+ if (i++ == 0) {
+ log.info("Writing work unit file [{}]: '{}'", i - 1, i == 1 ?
workUnitFile : ("./" + workUnitFile.getName()));
+ }
Review Comment:
Does this only print out the first work unit? If so, we don't need the ternary:
`i++ == 0` holds only on the first iteration, where `i == 1`, so it always evaluates to `workUnitFile`.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import com.google.api.client.util.Lists;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import io.temporal.failure.ApplicationFailure;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
+import org.apache.gobblin.destination.DestinationDatasetHandlerService;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.WorkUnitStreamSource;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
+
+
+@Slf4j
+public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
+
+ @Override
+ public int generateWorkUnits(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
+ // TODO: decide whether to acquire a job lock (as MR did)!
+ // TODO: provide for job cancellation (unless handling at the
temporal-level of parent workflows)!
+ JobState jobState = new JobState(jobProps);
+ log.info("Created jobState: {}", jobState.toJsonString(true));
+ Optional<Config> thisClassConfig = WorkerConfig.of(this);
+ log.info("Obtained class config: {}", thisClassConfig.isPresent() ?
thisClassConfig.get() : "NO WORKER CONFIG: ERROR!");
+
+ Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
+ log.info("Using work dir root path for job '{}' - '{}'",
jobState.getJobId(), workDirRoot);
+
+ // TODO: determine whether these are actually necessary to do (as
MR/AbstractJobLauncher did)!
+ // SharedResourcesBroker<GobblinScopeTypes> jobBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ // jobState.setBroker(jobBroker);
+ // jobState.setWorkUnitAndDatasetStateFunctional(new
CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
+
+ try (Closer closer = Closer.create()) {
+ // before embarking on (potentially expensive) WU creation, first
pre-check that the FS is available
+ FileSystem fs = JobStateUtils.openFileSystem(jobState);
+ fs.mkdirs(workDirRoot);
+
+ List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState,
eventSubmitterContext, closer);
+
+ JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
+ JobStateUtils.writeJobState(jobState, workDirRoot, fs);
+
+ return workUnits.size();
+ } catch (ReflectiveOperationException roe) {
+ String errMsg = "Unable to construct a source for generating workunits
for job " + jobState.getJobId();
+ log.error(errMsg, roe);
+ throw ApplicationFailure.newNonRetryableFailureWithCause(errMsg,
"Failure: new Source()", roe);
+ } catch (IOException ioe) {
+ String errMsg = "Failed to generate workunits for job " +
jobState.getJobId();
+ log.error(errMsg, ioe);
+ throw ApplicationFailure.newFailureWithCause(errMsg, "Failure:
generating/writing workunits", ioe);
+ } finally {
+ // TODO: implement Troubleshooter integration!
+ }
+ }
+
+ protected static List<WorkUnit> generateWorkUnitsForJobState(JobState
jobState, EventSubmitterContext eventSubmitterContext, Closer closer)
+ throws ReflectiveOperationException {
+ Source<?, ?> source = JobStateUtils.createSource(jobState);
+ WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
+ ? ((WorkUnitStreamSource) source).getWorkunitStream(jobState)
+ : new
BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
+
+ // TODO: report (timer) metrics for workunits creation
+ if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { //
indicates a problem getting the WUs
+ String errMsg = "Failure in getting work units for job " +
jobState.getJobId();
+ log.error(errMsg);
+ // TODO: decide whether a non-retryable failure is too severe... (in
most circumstances, it's likely what we want)
+ throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure:
Source.getWorkUnits()");
+ }
+
+ if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run:
entirely normal result (not a failure)
+ log.warn("No work units created for job " + jobState.getJobId());
+ return Lists.newArrayList();
+ }
+
+ // TODO: count total bytes for progress tracking!
+
+ boolean canCleanUp = canCleanStagingData(jobState);
+ DestinationDatasetHandlerService datasetHandlerService = closer.register(
+ new DestinationDatasetHandlerService(jobState, canCleanUp,
eventSubmitterContext.create()));
+ WorkUnitStream handledWorkUnitStream =
datasetHandlerService.executeHandlers(workUnitStream);
+
+ // initialize writer and converter(s)
+ // TODO: determine whether registration here is effective, or the
lifecycle of this activity is too brief (as is likely!)
+ closer.register(WriterInitializerFactory.newInstace(jobState,
handledWorkUnitStream)).initialize();
+ closer.register(ConverterInitializerFactory.newInstance(jobState,
handledWorkUnitStream)).initialize();
Review Comment:
I don't think these would be useful here, since the processWorkUnit activity should be
handling the writer and converter initializers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]