phet commented on code in PR #3880: URL: https://github.com/apache/gobblin/pull/3880#discussion_r1501201188
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java: ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity.impl; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import com.google.api.client.util.Lists; +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import io.temporal.failure.ApplicationFailure; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.gobblin.commit.DeliverySemantics; +import org.apache.gobblin.converter.initializer.ConverterInitializerFactory; +import org.apache.gobblin.destination.DestinationDatasetHandlerService; +import org.apache.gobblin.runtime.AbstractJobLauncher; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.source.Source; +import org.apache.gobblin.source.WorkUnitStreamSource; +import org.apache.gobblin.source.workunit.BasicWorkUnitStream; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.source.workunit.WorkUnitStream; +import org.apache.gobblin.temporal.cluster.WorkerConfig; +import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits; +import org.apache.gobblin.temporal.ddm.util.JobStateUtils; +import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; +import org.apache.gobblin.writer.initializer.WriterInitializerFactory; + + +@Slf4j +public class GenerateWorkUnitsImpl implements GenerateWorkUnits { + + @Override + public int generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext) { + // TODO: decide whether to acquire a job lock (as MR did)! + // TODO: provide for job cancellation (unless handling at the temporal-level of parent workflows)! + JobState jobState = new JobState(jobProps); + log.info("Created jobState: {}", jobState.toJsonString(true)); + Optional<Config> thisClassConfig = WorkerConfig.of(this); + log.info("Obtained class config: {}", thisClassConfig.isPresent() ? thisClassConfig.get() : "NO WORKER CONFIG: ERROR!"); + + Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState); + log.info("Using work dir root path for job '{}' - '{}'", jobState.getJobId(), workDirRoot); + + // TODO: determine whether these are actually necessary to do (as MR/AbstractJobLauncher did)! + // SharedResourcesBroker<GobblinScopeTypes> jobBroker = JobStateUtils.getSharedResourcesBroker(jobState); + // jobState.setBroker(jobBroker); + // jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName)); + + try (Closer closer = Closer.create()) { + // before embarking on (potentially expensive) WU creation, first pre-check that the FS is available + FileSystem fs = JobStateUtils.openFileSystem(jobState); + fs.mkdirs(workDirRoot); + + List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState, eventSubmitterContext, closer); + + JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs); + JobStateUtils.writeJobState(jobState, workDirRoot, fs); + + return workUnits.size(); + } catch (ReflectiveOperationException roe) { + String errMsg = "Unable to construct a source for generating workunits for job " + jobState.getJobId(); + log.error(errMsg, roe); + throw ApplicationFailure.newNonRetryableFailureWithCause(errMsg, "Failure: new Source()", roe); + } catch (IOException ioe) { + String errMsg = "Failed to generate workunits for job " + jobState.getJobId(); + log.error(errMsg, ioe); + throw ApplicationFailure.newFailureWithCause(errMsg, "Failure: generating/writing workunits", ioe); + } finally { + // TODO: implement Troubleshooter integration! + } + } + + protected static List<WorkUnit> generateWorkUnitsForJobState(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer) + throws ReflectiveOperationException { + Source<?, ?> source = JobStateUtils.createSource(jobState); + WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource + ? ((WorkUnitStreamSource) source).getWorkunitStream(jobState) + : new BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build(); + + // TODO: report (timer) metrics for workunits creation + if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { // indicates a problem getting the WUs + String errMsg = "Failure in getting work units for job " + jobState.getJobId(); + log.error(errMsg); + // TODO: decide whether a non-retryable failure is too severe... (in most circumstances, it's likely what we want) + throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure: Source.getWorkUnits()"); + } + + if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run: entirely normal result (not a failure) + log.warn("No work units created for job " + jobState.getJobId()); + return Lists.newArrayList(); + } + + // TODO: count total bytes for progress tracking! + + boolean canCleanUp = canCleanStagingData(jobState); + DestinationDatasetHandlerService datasetHandlerService = closer.register( + new DestinationDatasetHandlerService(jobState, canCleanUp, eventSubmitterContext.create())); + WorkUnitStream handledWorkUnitStream = datasetHandlerService.executeHandlers(workUnitStream); + + // initialize writer and converter(s) + // TODO: determine whether registration here is effective, or the lifecycle of this activity is too brief (as is likely!) + closer.register(WriterInitializerFactory.newInstace(jobState, handledWorkUnitStream)).initialize(); + closer.register(ConverterInitializerFactory.newInstance(jobState, handledWorkUnitStream)).initialize(); Review Comment: I just wasn't sure, because these take a `WorkUnitStream` param, but the subsequent "work fulfillment" processing wouldn't have one of those around--only "work discovery" does. could these be adding props to the WUs before they're serialized and persisted? another data point is that the `ProcessWorkUnitImpl` activity seems to work just fine even though it doesn't reference these factories. I haven't had a chance to look into this yet... do you know for instance under what circumstances they're absolutely essential to successful operation? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
