[ https://issues.apache.org/jira/browse/HADOOP-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219223#comment-16219223 ]

ASF GitHub Bot commented on HADOOP-14971:
-----------------------------------------

Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/282#discussion_r146942624
  
    --- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java ---
    @@ -0,0 +1,908 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.hadoop.fs.s3a.commit.staging;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Queue;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +
    +import com.google.common.base.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.PathFilter;
    +import org.apache.hadoop.fs.PathIOException;
    +import org.apache.hadoop.fs.s3a.S3AFileSystem;
    +import org.apache.hadoop.fs.s3a.commit.AbstractS3GuardCommitter;
    +import org.apache.hadoop.fs.s3a.commit.CommitConstants;
    +import org.apache.hadoop.fs.s3a.commit.DurationInfo;
    +import org.apache.hadoop.fs.s3a.commit.Tasks;
    +import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
    +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
    +import org.apache.hadoop.mapreduce.JobContext;
    +import org.apache.hadoop.mapreduce.JobID;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    +
    +import static com.google.common.base.Preconditions.*;
    +import static org.apache.hadoop.fs.s3a.Constants.*;
    +import static org.apache.hadoop.fs.s3a.S3AUtils.*;
    +import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
    +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
    +import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
    +
    +/**
    + * Committer based on the contributed work of the
    + * <a href="https://github.com/rdblue/s3committer">Netflix multipart committers.</a>
    + * <ol>
    + *   <li>
    + *   The working directory of each task is actually under a temporary
    + *   path in the local filesystem; jobs write directly into it.
    + *   </li>
    + *   <li>
    + *     Task Commit: list all files under the task working dir, upload
    + *     each of them but do not commit the final operation.
    + *     Persist the information for each pending commit into the cluster
    + *     for enumeration and commit by the job committer.
    + *   </li>
    + *   <li>Task Abort: recursive delete of task working dir.</li>
    + *   <li>Job Commit: list all pending PUTs to commit; commit them.</li>
    + *   <li>
    + *     Job Abort: list all pending PUTs to commit; abort them.
    + *     Delete all task attempt directories.
    + *   </li>
    + * </ol>
    + */
    +public class StagingCommitter extends AbstractS3GuardCommitter {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(
    +      StagingCommitter.class);
    +  public static final String NAME = "StagingCommitter";
    +  private final Path constructorOutputPath;
    +  private final long uploadPartSize;
    +  private final String uuid;
    +  private final boolean uniqueFilenames;
    +  private final FileOutputCommitter wrappedCommitter;
    +
    +  private ConflictResolution conflictResolution;
    +  private final Path finalOutputPath;
    +  private String s3KeyPrefix = null;
    +
    +  /** The directory in the cluster FS for commits to go to. */
    +  private Path commitsDirectory;
    +
    +  /**
    +   * Committer for a single task attempt.
    +   * @param outputPath final output path
    +   * @param context task context
    +   * @throws IOException on a failure
    +   */
    +  public StagingCommitter(Path outputPath,
    +      TaskAttemptContext context) throws IOException {
    +    super(outputPath, context);
    +    this.constructorOutputPath = checkNotNull(getOutputPath(), "output path");
    +    Configuration conf = getConf();
    +    this.uploadPartSize = conf.getLongBytes(
    +        MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
    +
    +    // Spark will use a fake app ID based on the current minute and job ID 0.
    +    // To avoid collisions, use the YARN application ID for Spark.
    +    this.uuid = getUploadUUID(conf, context.getJobID());
    +    this.uniqueFilenames = conf.getBoolean(
    +        FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
    +        DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
    +    setWorkPath(buildWorkPath(context, uuid));
    +    this.wrappedCommitter = createWrappedCommitter(context, conf);
    +    // forces evaluation and caching of the resolution mode.
    +    ConflictResolution mode = getConflictResolutionMode(getJobContext());
    +    LOG.debug("Conflict resolution mode: {}", mode);
    +    this.finalOutputPath = constructorOutputPath;
    +    Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null");
    +    S3AFileSystem fs = getS3AFileSystem(finalOutputPath,
    +        context.getConfiguration(), false);
    +    s3KeyPrefix = fs.pathToKey(finalOutputPath);
    +    LOG.debug("{}: final output path is {}", getRole(), finalOutputPath);
    +    setOutputPath(finalOutputPath);
    +  }
    +
    +  @Override
    +  public String getName() {
    +    return NAME;
    +  }
    +
    +  /**
    +   * Create the wrapped committer.
    +   * This includes customizing its options, and setting up the destination
    +   * directory.
    +   * @param context job/task context.
    +   * @param conf config
    +   * @return the inner committer
    +   * @throws IOException on a failure
    +   */
    +  protected FileOutputCommitter createWrappedCommitter(JobContext context,
    +      Configuration conf) throws IOException {
    +
    +    // explicitly choose commit algorithm
    +    initFileOutputCommitterOptions(context);
    +    commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, uuid);
    +    FileSystem stagingFS = commitsDirectory.getFileSystem(conf);
    +    Path qualified = stagingFS.makeQualified(commitsDirectory);
    +    if (stagingFS instanceof S3AFileSystem) {
    +      // currently refuse to work with S3a for the working FS; you need
    +      // a consistent FS. This isn't entirely true with s3guard and
    +      // alternative S3 endpoints, but doing it now stops
    +      // accidental use of S3
    +      throw new PathIOException(qualified.toUri().toString(),
    +          "Directory for intermediate work cannot be on S3");
    +    }
    +    return new FileOutputCommitter(qualified, context);
    +  }
    +
    +  /**
    +   * Init the context config with everything needed for the file output
    +   * committer. In particular, this code currently only works with
    +   * commit algorithm 1.
    +   * @param context context to configure.
    +   */
    +  protected void initFileOutputCommitterOptions(JobContext context) {
    +    context.getConfiguration()
    +        .setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1);
    +  }
    +
    +  @Override
    +  public String toString() {
    +    final StringBuilder sb = new StringBuilder("StagingCommitter{");
    +    sb.append(super.toString());
    +    sb.append(", finalOutputPath=").append(finalOutputPath);
    +    sb.append(", conflictResolution=").append(conflictResolution);
    +    if (wrappedCommitter != null) {
    +      sb.append(", wrappedCommitter=").append(wrappedCommitter);
    +    }
    +    sb.append('}');
    +    return sb.toString();
    +  }
    +
    +  /**
    +   * Get the UUID of an upload; may be the job ID.
    +   * @param conf job/task configuration
    +   * @param jobId Job ID
    +   * @return an ID for use in paths.
    +   */
    +  public static String getUploadUUID(Configuration conf, String jobId) {
    +    return conf.getTrimmed(FS_S3A_COMMITTER_STAGING_UUID,
    +        conf.get(SPARK_WRITE_UUID,
    +            conf.getTrimmed(SPARK_APP_ID, jobId)));
    +  }
    +
    +  /**
    +   * Get the UUID of an upload; may be the job ID.
    +   * @param conf job/task configuration
    +   * @param jobId Job ID
    +   * @return an ID for use in paths.
    +   */
    +  public static String getUploadUUID(Configuration conf, JobID jobId) {
    +    return getUploadUUID(conf, jobId.toString());
    +  }
    +
    +  /**
    +   * Get the work path for a task.
    +   * @param context job/task context
    +   * @param uuid UUID
    +   * @return a path or null if the context is not of a task
    +   * @throws IOException failure to build the path
    +   */
    +  private static Path buildWorkPath(JobContext context, String uuid)
    +      throws IOException {
    +    if (context instanceof TaskAttemptContext) {
    +      return taskAttemptWorkingPath((TaskAttemptContext) context, uuid);
    +    } else {
    +      return null;
    +    }
    +  }
    +
    +  /**
    +   * The staging committers do not require "magic" commit support.
    +   * @return false
    +   */
    +  @Override
    +  protected boolean isMagicFileSystemRequired() {
    +    return false;
    +  }
    +
    +  /**
    +   * Is this committer using unique filenames?
    +   * @return true if unique filenames are used.
    +   */
    +  public Boolean useUniqueFilenames() {
    +    return uniqueFilenames;
    +  }
    +
    +  /**
    +   * Get the filesystem for the job attempt.
    +   * @param context the context of the job.  This is used to get the
    +   * application attempt ID.
    +   * @return the FS to store job attempt data.
    +   * @throws IOException failure to create the FS.
    +   */
    +  public FileSystem getJobAttemptFileSystem(JobContext context)
    +      throws IOException {
    +    Path p = getJobAttemptPath(context);
    +    return p.getFileSystem(context.getConfiguration());
    +  }
    +
    +  /**
    +   * Compute the path where the output of a given job attempt will be placed.
    +   * @param context the context of the job.  This is used to get the
    +   * application attempt ID.
    +   * @param out the output path to place these in.
    +   * @return the path to store job attempt data.
    +   */
    +  public static Path getJobAttemptPath(JobContext context, Path out) {
    +    return getJobAttemptPath(getAppAttemptId(context), out);
    +  }
    +
    +  /**
    +   * Compute the path where the output of a given job attempt will be placed.
    +   * @param appAttemptId the ID of the application attempt for this job.
    +   * @return the path to store job attempt data.
    +   */
    +  private static Path getJobAttemptPath(int appAttemptId, Path out) {
    +    return new Path(getPendingJobAttemptsPath(out),
    +        String.valueOf(appAttemptId));
    +  }
    +
    +  @Override
    +  protected Path getJobAttemptPath(int appAttemptId) {
    +    return new Path(getPendingJobAttemptsPath(commitsDirectory),
    +        String.valueOf(appAttemptId));
    +  }
    +
    +  /**
    +   * Compute the path where the output of pending task attempts are stored.
    +   * @param context the context of the job with pending tasks.
    +   * @return the path where the output of pending task attempts are stored.
    +   */
    +  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
    +    return new Path(getJobAttemptPath(context, out),
    +        TEMPORARY);
    +  }
    +
    +  /**
    +   * Compute the path where the output of a task attempt is stored until
    +   * that task is committed.
    +   *
    +   * @param context the context of the task attempt.
    +   * @param out The output path to put things in.
    +   * @return the path where a task attempt should be stored.
    +   */
    +  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
    +    return new Path(getPendingTaskAttemptsPath(context, out),
    +        String.valueOf(context.getTaskAttemptID()));
    +  }
    +
    +  /**
    +   * Get the location of pending job attempts.
    +   * @param out the base output directory.
    +   * @return the location of pending job attempts.
    +   */
    +  private static Path getPendingJobAttemptsPath(Path out) {
    +    Preconditions.checkNotNull(out, "Null 'out' path");
    +    return new Path(out, TEMPORARY);
    +  }
    +
    +  /**
    +   * Compute the path where the output of a committed task is stored until
    +   * the entire job is committed.
    +   * @param context the context of the task attempt
    +   * @return the path where the output of a committed task is stored until
    +   * the entire job is committed.
    +   */
    +  public Path getCommittedTaskPath(TaskAttemptContext context) {
    +    return getCommittedTaskPath(getAppAttemptId(context), context);
    +  }
    +
    +  /**
    +   * Validate the task attempt context; makes sure
    +   * that the task attempt ID data is valid.
    +   * @param context task context
    +   */
    +  private static void validateContext(TaskAttemptContext context) {
    +    Preconditions.checkNotNull(context, "null context");
    +    Preconditions.checkNotNull(context.getTaskAttemptID(),
    +        "null task attempt ID");
    +    Preconditions.checkNotNull(context.getTaskAttemptID().getTaskID(),
    +        "null task ID");
    +    Preconditions.checkNotNull(context.getTaskAttemptID().getJobID(),
    +        "null job ID");
    +  }
    +
    +  /**
    +   * Compute the path where the output of a committed task is stored until the
    +   * entire job is committed for a specific application attempt.
    +   * @param appAttemptId the ID of the application attempt to use
    +   * @param context the context of any task.
    +   * @return the path where the output of a committed task is stored.
    +   */
    +  protected Path getCommittedTaskPath(int appAttemptId,
    +      TaskAttemptContext context) {
    +    validateContext(context);
    +    return new Path(getJobAttemptPath(appAttemptId),
    +        String.valueOf(context.getTaskAttemptID().getTaskID()));
    +  }
    +
    +  @Override
    +  public Path getTempTaskAttemptPath(TaskAttemptContext context) {
    +    throw new UnsupportedOperationException("Unimplemented");
    +  }
    +
    +  /**
    +   * Lists the output of a task under the task attempt path. Subclasses can
    +   * override this method to change how output files are identified.
    +   * <p>
    +   * This implementation lists the files that are direct children of the output
    +   * path and filters hidden files (file names starting with '.' or '_').
    +   * <p>
    +   * The task attempt path is provided by
    +   * {@link #getTaskAttemptPath(TaskAttemptContext)}
    +   *
    +   * @param context this task's {@link TaskAttemptContext}
    +   * @return the output files produced by this task in the task attempt path
    +   * @throws IOException on a failure
    +   */
    +  protected List<FileStatus> getTaskOutput(TaskAttemptContext context)
    +      throws IOException {
    +    PathFilter filter = Paths.HiddenPathFilter.get();
    +
    +    // get files on the local FS in the attempt path
    +    Path attemptPath = getTaskAttemptPath(context);
    +    Preconditions.checkNotNull(attemptPath,
    +        "No attemptPath path in {}", this);
    +
    +    LOG.debug("Scanning {} for files to commit", attemptPath);
    +
    +    return flatmapLocatedFiles(
    +        getTaskAttemptFilesystem(context)
    +            .listFiles(attemptPath, true),
    +        s -> maybe(filter.accept(s.getPath()), s));
    +  }
    +
    +  /**
    +   * Returns the final S3 key for a relative path. Subclasses can override this
    +   * method to upload files to a different S3 location.
    +   * <p>
    +   * This implementation concatenates the relative path with the key prefix
    +   * from the output path.
    +   * If {@link CommitConstants#FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES} is
    +   * set, then the task UUID is also included in the calculation
    +   *
    +   * @param relative the path of a file relative to the task attempt path
    +   * @param context the JobContext or TaskAttemptContext for this job
    +   * @return the S3 key where the file will be uploaded
    +   * @throws IOException on a failure
    +   */
    +  protected String getFinalKey(String relative, JobContext context)
    +      throws IOException {
    +    if (uniqueFilenames) {
    +      return getS3KeyPrefix(context) + "/" + Paths.addUUID(relative, uuid);
    +    } else {
    +      return getS3KeyPrefix(context) + "/" + relative;
    +    }
    +  }
    +
    +  /**
    +   * Returns the final S3 location for a relative path as a Hadoop {@link Path}.
    +   * This is a final method that calls {@link #getFinalKey(String, JobContext)}
    +   * to determine the final location.
    +   *
    +   * @param relative the path of a file relative to the task attempt path
    +   * @param context the JobContext or TaskAttemptContext for this job
    +   * @return the S3 Path where the file will be uploaded
    +   * @throws IOException on a failure
    +   */
    +  protected final Path getFinalPath(String relative, JobContext context)
    +      throws IOException {
    +    return getDestS3AFS().keyToQualifiedPath(getFinalKey(relative, context));
    +  }
    +
    +  /**
    +   * Return the local work path as the destination for writing work.
    +   * @param context the context of the task attempt.
    +   * @return a path in the local filesystem.
    +   */
    +  @Override
    +  public Path getBaseTaskAttemptPath(TaskAttemptContext context) {
    +    // a path on the local FS for files that will be uploaded
    +    return getWorkPath();
    +  }
    +
    +  /**
    +   * For a job attempt path, the staging committer returns that of the
    +   * wrapped committer.
    +   * @param context the context of the job.
    +   * @return a path in HDFS.
    +   */
    +  @Override
    +  public Path getJobAttemptPath(JobContext context) {
    +    return wrappedCommitter.getJobAttemptPath(context);
    +  }
    +
    +  /**
    +   * Set up the job, including calling the same method on the
    +   * wrapped committer.
    +   * @param context job context
    +   * @throws IOException IO failure.
    +   */
    +  @Override
    +  public void setupJob(JobContext context) throws IOException {
    +    LOG.debug("{}, Setting up job {}", getRole(), jobIdString(context));
    +    context.getConfiguration().set(FS_S3A_COMMITTER_STAGING_UUID, uuid);
    +    wrappedCommitter.setupJob(context);
    +  }
    +
    +  /**
    +   * Get the list of pending uploads for this job attempt.
    +   * @param context job context
    +   * @return a list of pending uploads.
    +   * @throws IOException Any IO failure
    +   */
    +  protected List<SinglePendingCommit> getPendingUploadsToCommit(
    +      JobContext context)
    +      throws IOException {
    +    return listPendingUploads(context, false);
    +  }
    +
    +  /**
    +   * Get the list of pending uploads for this job attempt, swallowing
    +   * exceptions.
    +   * @param context job context
    +   * @return a list of pending uploads. If an exception was swallowed,
    +   * then this may not match the actual set of pending operations
    +   * @throws IOException shouldn't be raised, but retained for the compiler
    +   */
    +  protected List<SinglePendingCommit> listPendingUploadsToAbort(
    +      JobContext context) throws IOException {
    +    return listPendingUploads(context, true);
    +  }
    +
    +  /**
    +   * Get the list of pending uploads for this job attempt.
    +   * @param context job context
    +   * @param suppressExceptions should exceptions be swallowed?
    +   * @return a list of pending uploads. If exceptions are being swallowed,
    +   * then this may not match the actual set of pending operations
    +   * @throws IOException Any IO failure which wasn't swallowed.
    +   */
    +  protected List<SinglePendingCommit> listPendingUploads(
    +      JobContext context, boolean suppressExceptions) throws IOException {
    +    Path jobAttemptPath = wrappedCommitter.getJobAttemptPath(context);
    +    final FileSystem attemptFS = jobAttemptPath.getFileSystem(
    +        context.getConfiguration());
    +    FileStatus[] pendingCommitFiles;
    +    try {
    +      pendingCommitFiles = attemptFS.listStatus(
    +          jobAttemptPath, Paths.HiddenPathFilter.get());
    +    } catch (FileNotFoundException e) {
    +      // file is not present, raise without bothering to report
    +      throw e;
    +    } catch (IOException e) {
    +      // unable to work with endpoint, if suppressing errors decide our actions
    +      if (suppressExceptions) {
    +        LOG.info("{} failed to list pending upload dir", getRole(), e);
    +        return new ArrayList<>(0);
    +      } else {
    +        throw e;
    +      }
    +    }
    +    return loadMultiplePendingCommitFiles(context,
    +        suppressExceptions, attemptFS, pendingCommitFiles);
    +  }
    +
    +  /**
    +   * Commit work.
    +   * This consists of two stages: precommit and commit.
    +   * <p>
    +   * Precommit: identify pending uploads, then allow subclasses
    +   * to validate the state of the destination and the pending uploads.
    +   * Any failure here triggers an abort of all pending uploads.
    +   * <p>
    +   * Commit internal: do the final commit sequence.
    +   * <p>
    +   * The final commit action is to build the {@code _SUCCESS} file entry.
    +   * </p>
    +   * @param context job context
    +   * @throws IOException any failure
    +   */
    +  @Override
    +  public void commitJob(JobContext context) throws IOException {
    +    List<SinglePendingCommit> pending = Collections.emptyList();
    +    try (DurationInfo d = new DurationInfo(LOG,
    +        "%s: preparing to commit Job", getRole())) {
    +      pending = getPendingUploadsToCommit(context);
    +      preCommitJob(context, pending);
    +    } catch (IOException e) {
    +      LOG.warn("Precommit failure for job {}", jobIdString(context), e);
    +      abortJobInternal(context, pending, true);
    +      getCommitOperations().jobCompleted(false);
    +      throw e;
    +    }
    +    try (DurationInfo d = new DurationInfo(LOG,
    +        "%s: committing Job %s", getRole(), jobIdString(context))) {
    +      commitJobInternal(context, pending);
    +    } catch (IOException e) {
    +      getCommitOperations().jobCompleted(false);
    +      throw e;
    +    }
    +    getCommitOperations().jobCompleted(true);
    +    maybeCreateSuccessMarkerFromCommits(context, pending);
    +  }
    +
    +  /**
    +   * Subclass-specific pre commit actions.
    +   * @param context job context
    +   * @param pending the pending operations
    +   * @throws IOException any failure
    +   */
    +  protected void preCommitJob(JobContext context,
    +      List<SinglePendingCommit> pending) throws IOException {
    +  }
    +
    +  @Override
    +  public void cleanupStagingDirs() throws IOException {
    +    Path workPath = getWorkPath();
    +    if (workPath != null) {
    +      LOG.debug("Cleaning up work path {}", workPath);
    +      deleteQuietly(workPath.getFileSystem(getConf()),
    +          workPath, true);
    +    }
    +  }
    +
    +  /**
    +   * Cleanup includes: deleting job attempt pending paths,
    +   * local staging directories, and the directory of the wrapped committer.
    +   * @param context job context
    +   * @param suppressExceptions should exceptions be suppressed?
    +   * @throws IOException IO failure.
    +   */
    +  @Override
    +  @SuppressWarnings("deprecation")
    +  protected void cleanup(JobContext context, boolean suppressExceptions)
    +      throws IOException {
    +    try {
    +      wrappedCommitter.cleanupJob(context);
    +      deleteDestinationPaths(context);
    +      cleanupStagingDirs();
    +    } catch (IOException e) {
    +      if (suppressExceptions) {
    +        LOG.error("{}: failed while cleaning up job", getRole(), e);
    +      } else {
    +        throw e;
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Delete the destination paths of a job.
    +   * @param context job context
    +   * @throws IOException IO failure
    +   */
    +  protected void deleteDestinationPaths(JobContext context) throws IOException {
    +    try {
    +      deleteWithWarning(getJobAttemptFileSystem(context),
    +          getJobAttemptPath(context), true);
    +    } catch (IOException e) {
    +      LOG.debug("{}: delete failure", getRole(), e);
    +    }
    +
    +    // delete the _temporary directory. This will cause problems
    +    // if there is >1 task targeting the same dest dir
    +    deleteWithWarning(getDestFS(),
    +        new Path(getFinalOutputPath(), TEMPORARY),
    +        true);
    +    // and the working path
    +    deleteTaskWorkingPathQuietly(context);
    +  }
    +
    +
    +  @Override
    +  public void setupTask(TaskAttemptContext context) throws IOException {
    +    Path taskAttemptPath = getTaskAttemptPath(context);
    +    try (DurationInfo d = new DurationInfo(LOG,
    +        "%s: setup task attempt path %s ", getRole(), taskAttemptPath)) {
    +      // create the local FS
    +      taskAttemptPath.getFileSystem(getConf()).mkdirs(taskAttemptPath);
    +      wrappedCommitter.setupTask(context);
    +    }
    +  }
    +
    +  @Override
    +  public boolean needsTaskCommit(TaskAttemptContext context)
    +      throws IOException {
    +    try (DurationInfo d = new DurationInfo(LOG,
    +        "%s: needsTaskCommit() Task %s", getRole(), 
context.getTaskAttemptID())) {
    +      // check for files on the local FS in the attempt path
    +      Path attemptPath = getTaskAttemptPath(context);
    +      FileSystem fs = getTaskAttemptFilesystem(context);
    +
    +      // This could be made more efficient with a probe "hasChildren(Path)"
    +      // which returns true if there is >1 entry under a given path.
    +      FileStatus[] stats = fs.listStatus(attemptPath);
    +      LOG.debug("{} files to commit under {}", stats.length, attemptPath);
    +      return stats.length > 0;
    +    } catch (FileNotFoundException e) {
    +      // list didn't find a directory, so nothing to commit
    +      // TODO: throw this up as an error?
    +      LOG.info("No files to commit");
    +      throw e;
    +    }
    +  }
    +
    +  @Override
    +  public void commitTask(TaskAttemptContext context) throws IOException {
    +    try (DurationInfo d = new DurationInfo(LOG,
    +        "%s: commit task %s", getRole(), context.getTaskAttemptID())) {
    +      int count = commitTaskInternal(context, getTaskOutput(context));
    +      LOG.info("{}: upload file count: {}", getRole(), count);
    +    } catch (IOException e) {
    +      LOG.error("{}: commit of task {} failed",
    +          getRole(), context.getTaskAttemptID(), e);
    +      getCommitOperations().taskCompleted(false);
    +      throw e;
    +    }
    +    getCommitOperations().taskCompleted(true);
    +  }
    +
    +  /**
    +   * Commit the task by uploading all created files and then
    +   * writing a pending entry for them.
    +   * @param context task context
    +   * @param taskOutput list of files from the output
    +   * @return number of uploads committed.
    +   * @throws IOException IO Failures.
    +   */
    +  protected int commitTaskInternal(final TaskAttemptContext context,
    +      List<FileStatus> taskOutput)
    +      throws IOException {
    +    LOG.debug("{}: commitTaskInternal", getRole());
    +    Configuration conf = context.getConfiguration();
    +
    +    final Path attemptPath = getTaskAttemptPath(context);
    +    FileSystem attemptFS = getTaskAttemptFilesystem(context);
    +    LOG.debug("{}: attempt path is {}", getRole(), attemptPath);
    +
    +    // add the commits file to the wrapped committer's task attempt location;
    +    // it will be committed by the wrapped committer at the end of this method.
    +    Path commitsAttemptPath = wrappedCommitter.getTaskAttemptPath(context);
    +    FileSystem commitsFS = commitsAttemptPath.getFileSystem(conf);
    +
    +    // keep track of unfinished commits in case one fails. if something fails,
    +    // we will try to abort the ones that had already succeeded.
    +    int commitCount = taskOutput.size();
    +    final Queue<SinglePendingCommit> commits = new ConcurrentLinkedQueue<>();
    +    LOG.info("{}: uploading from staging directory to S3", getRole());
    +    LOG.info("{}: Saving pending data information to {}",
    +        getRole(), commitsAttemptPath);
    +    if (taskOutput.isEmpty()) {
    +      // there is nothing to write. needsTaskCommit() should have caught
    +      // this, so warn that there is some kind of problem in the protocol.
    +      LOG.warn("{}: No files to commit", getRole());
    +    } else {
    +      boolean threw = true;
    +      // before the uploads, report some progress
    +      context.progress();
    +
    +      PendingSet pendingCommits = new PendingSet(commitCount);
    +      try {
    +        Tasks.foreach(taskOutput)
    --- End diff --
    
    I am going to steer clear of it for now, as there is retry logic deeper in
    the commit operations and the FS itself which tries to distinguish between
    retryable and unrecoverable failures (ConnectionRefused vs UnknownHost, ...);
    it's happening further down. But I could lift this code (and whatever sets up
    the stopRetryException list) if you think it should all go together.
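    
    A minimal sketch of the kind of retryable-vs-unrecoverable classification
    described above; the class and method names here are hypothetical
    illustrations, not the actual hadoop-aws retry code:
    
        // Hypothetical sketch only: classify a failure as retryable or
        // unrecoverable, in the spirit of the ConnectionRefused-vs-UnknownHost
        // distinction mentioned above.
        import java.io.InterruptedIOException;
        import java.net.ConnectException;
        import java.net.UnknownHostException;
    
        final class RetryClassifierSketch {
          private RetryClassifierSketch() {
          }
    
          /** True if the failure looks transient and is worth retrying. */
          static boolean isRetryable(Exception e) {
            if (e instanceof UnknownHostException
                || e instanceof InterruptedIOException) {
              // DNS failures and interruptions will not fix themselves: fail fast.
              return false;
            }
            if (e instanceof ConnectException) {
              // connection refused: the endpoint may come back, so retry.
              return true;
            }
            // default: treat unknown failures as unrecoverable rather than mask them.
            return false;
          }
        }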


> Merge S3A committers into trunk
> -------------------------------
>
>                 Key: HADOOP-14971
>                 URL: https://issues.apache.org/jira/browse/HADOOP-14971
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.0.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>
> Merge the HADOOP-13786 committer into trunk. This branch is being set up as a 
> github PR for review there & to keep it out of the mailboxes of the watchers on 
> the main JIRA



