steveloughran commented on a change in pull request #2971: URL: https://github.com/apache/hadoop/pull/2971#discussion_r813262444
########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java ########## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperationsThroughFileSystem; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbortTaskStage; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitJobStage; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitTaskStage; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageEventCallbacks; +import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter; + +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_FAILED_COUNT; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_ABORT; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.STAGE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.updateCommonContextOnCommitterExit; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.updateCommonContextOnCommitterEntry; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createIOStatisticsStore; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createJobSummaryFilename; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createManifestOutcome; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage.cleanupStageOptionsFromConfig; + +/** + * This is the Intermediate-Manifest committer. + * At every entry point it updates the thread's audit context with + * the current stage info; this is a placeholder for + * adding audit information to stores other than S3A. + * + * This is tagged as public/stable. This is mandatory + * for the classname and PathOutputCommitter implementation + * classes. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class ManifestCommitter extends PathOutputCommitter implements + IOStatisticsSource, StageEventCallbacks { + + public static final Logger LOG = LoggerFactory.getLogger( + ManifestCommitter.class); + + /** + * Role: task committer. + */ + public static final String TASK_COMMITTER = "task committer"; + + /** + * Role: job committer. + */ + public static final String JOB_COMMITTER = "job committer"; + + /** + * Committer Configuration as extracted from + * the job/task context and set in the constructor. + */ + private final ManifestCommitterConfig baseConfig; + + /** + * Destination of the job. + */ + private final Path destinationDir; + + /** + * For tasks, the attempt directory. + * Null for jobs. + */ + private final Path taskAttemptDir; + + /** + * IOStatistics to update. + */ + private final IOStatisticsStore iostatistics; + + /** + * The job Manifest Success data; only valid after a job successfully + * commits. + */ + private ManifestSuccessData successReport; + + /** + * The active stage; is updated by a callback from within the stages. + */ + private String activeStage; + + /** + * The task manifest of the task commit. + * Null unless this is a task attempt and the + * task has successfully been committed. + */ + private TaskManifest taskAttemptCommittedManifest; + + /** + * Create a committer. + * @param outputPath output path + * @param context job/task context + * @throws IOException failure. + */ + ManifestCommitter(final Path outputPath, + final TaskAttemptContext context) throws IOException { + super(outputPath, context); + this.destinationDir = resolveDestinationDirectory(outputPath, + context.getConfiguration()); + this.iostatistics = createIOStatisticsStore().build(); + this.baseConfig = enterCommitter( + context.getTaskAttemptID() != null, + context); + + this.taskAttemptDir = baseConfig.getTaskAttemptDir(); + LOG.info("Created ManifestCommitter with JobID {}," + + " Task Attempt {} and destination {}", + context.getJobID(), context.getTaskAttemptID(), outputPath); + } + + /** + * Committer method invoked; generates a config for it. + * Calls {@code #updateCommonContextOnCommitterEntry()} + * to update the audit context. + * @param isTask is this a task entry point? + * @param context context + * @return committer config + */ + private ManifestCommitterConfig enterCommitter(boolean isTask, + JobContext context) { + ManifestCommitterConfig committerConfig = + new ManifestCommitterConfig( + getOutputPath(), + isTask ? TASK_COMMITTER : JOB_COMMITTER, + context, + iostatistics, + this); + updateCommonContextOnCommitterEntry(committerConfig); + return committerConfig; + } + + /** + * Set up a job through a {@link SetupJobStage}. + * @param jobContext Context of the job whose output is being written. + * @throws IOException IO Failure. + */ + public void setupJob(final JobContext jobContext) throws IOException { + ManifestCommitterConfig committerConfig = enterCommitter(false, + jobContext); + StageConfig stageConfig = + committerConfig + .createJobStageConfig() + .withOperations(createStoreOperations()) + .build(); + // set up the job. + new SetupJobStage(stageConfig) + .apply(committerConfig.getCreateJobMarker()); + logCommitterStatisticsAtDebug(); + } + + /** + * Set up a task through a {@link SetupTaskStage}. + * Classic FileOutputCommitter is a no-op here, relying + * on RecordWriters to create the dir implicitly on file + * create(). + * FileOutputCommitter also uses the existence of that + * file as a flag to indicate task commit is needed. + * @param context task context. + * @throws IOException IO Failure. + */ + public void setupTask(final TaskAttemptContext context) Review comment: noted, will fix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org