[ https://issues.apache.org/jira/browse/HADOOP-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219167#comment-16219167 ]
ASF GitHub Bot commented on HADOOP-14971: ----------------------------------------- Github user steveloughran commented on a diff in the pull request: https://github.com/apache/hadoop/pull/282#discussion_r146933118 --- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java --- @@ -0,0 +1,908 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.AbstractS3GuardCommitter; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.DurationInfo; +import org.apache.hadoop.fs.s3a.commit.Tasks; +import org.apache.hadoop.fs.s3a.commit.files.PendingSet; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +import static com.google.common.base.Preconditions.*; +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; + +/** + * Committer based on the contributed work of the + * <a href="https://github.com/rdblue/s3committer">Netflix multipart committers.</a> + * <ol> + * <li> + * The working directory of each task is actually under a temporary + * path in the local filesystem; jobs write directly into it. 
+ * </li> + * <li> + * Task Commit: list all files under the task working dir, upload + * each of them but do not commit the final operation. + * Persist the information for each pending commit into the cluster + * for enumeration and commit by the job committer. + * </li> + * <li>Task Abort: recursive delete of task working dir.</li> + * <li>Job Commit: list all pending PUTs to commit; commit them.</li> + * <li> + * Job Abort: list all pending PUTs to commit; abort them. + * Delete all task attempt directories. + * </li> + * </ol> + */ +public class StagingCommitter extends AbstractS3GuardCommitter { + + private static final Logger LOG = LoggerFactory.getLogger( + StagingCommitter.class); + public static final String NAME = "StagingCommitter"; + private final Path constructorOutputPath; + private final long uploadPartSize; + private final String uuid; + private final boolean uniqueFilenames; + private final FileOutputCommitter wrappedCommitter; + + private ConflictResolution conflictResolution; + private final Path finalOutputPath; + private String s3KeyPrefix = null; + + /** The directory in the cluster FS for commits to go to. */ + private Path commitsDirectory; + + /** + * Committer for a single task attempt. + * @param outputPath final output path + * @param context task context + * @throws IOException on a failure + */ + public StagingCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + this.constructorOutputPath = checkNotNull(getOutputPath(), "output path"); + Configuration conf = getConf(); + this.uploadPartSize = conf.getLongBytes( + MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); + + // Spark will use a fake app ID based on the current minute and job ID 0. + // To avoid collisions, use the YARN application ID for Spark. 
+ this.uuid = getUploadUUID(conf, context.getJobID()); + this.uniqueFilenames = conf.getBoolean( + FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, + DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES); + setWorkPath(buildWorkPath(context, uuid)); + this.wrappedCommitter = createWrappedCommitter(context, conf); + // forces evaluation and caching of the resolution mode. + ConflictResolution mode = getConflictResolutionMode(getJobContext()); + LOG.debug("Conflict resolution mode: {}", mode); + this.finalOutputPath = constructorOutputPath; + Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null"); + S3AFileSystem fs = getS3AFileSystem(finalOutputPath, + context.getConfiguration(), false); + s3KeyPrefix = fs.pathToKey(finalOutputPath); + LOG.debug("{}: final output path is {}", getRole(), finalOutputPath); + setOutputPath(finalOutputPath); + } + + @Override + public String getName() { + return NAME; + } + + /** + * Create the wrapped committer. + * This includes customizing its options, and setting up the destination + * directory. + * @param context job/task context. + * @param conf config + * @return the inner committer + * @throws IOException on a failure + */ + protected FileOutputCommitter createWrappedCommitter(JobContext context, + Configuration conf) throws IOException { + + // explicitly choose commit algorithm + initFileOutputCommitterOptions(context); + commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, uuid); + FileSystem stagingFS = commitsDirectory.getFileSystem(conf); + Path qualified = stagingFS.makeQualified(commitsDirectory); + if (stagingFS instanceof S3AFileSystem) { + // currently refuse to work with S3a for the working FS; you need + // a consistent FS. 
This isn't entirely true with s3guard and + // alternative S3 endpoints, but doing it now stops + // accidental use of S3 + throw new PathIOException(qualified.toUri().toString(), + "Directory for intermediate work cannot be on S3"); + } + return new FileOutputCommitter(qualified, context); + } + + /** + * Init the context config with everything needed for the file output + * committer. In particular, this code currently only works with + * commit algorithm 1. + * @param context context to configure. + */ + protected void initFileOutputCommitterOptions(JobContext context) { + context.getConfiguration() + .setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("StagingCommitter{"); + sb.append(super.toString()); + sb.append(", finalOutputPath=").append(finalOutputPath); + sb.append(", conflictResolution=").append(conflictResolution); + if (wrappedCommitter != null) { + sb.append(", wrappedCommitter=").append(wrappedCommitter); + } + sb.append('}'); + return sb.toString(); + } + + /** + * Get the UUID of an upload; may be the job ID. + * @param conf job/task configuration + * @param jobId Job ID + * @return an ID for use in paths. + */ + public static String getUploadUUID(Configuration conf, String jobId) { + return conf.getTrimmed(FS_S3A_COMMITTER_STAGING_UUID, + conf.get(SPARK_WRITE_UUID, + conf.getTrimmed(SPARK_APP_ID, jobId))); + } + + /** + * Get the UUID of an upload; may be the job ID. + * @param conf job/task configuration + * @param jobId Job ID + * @return an ID for use in paths. + */ + public static String getUploadUUID(Configuration conf, JobID jobId) { + return getUploadUUID(conf, jobId.toString()); + } + + /** + * Get the work path for a task. 
+ * @param context job/task complex + * @param uuid UUID + * @return a path or null if the context is not of a task + * @throws IOException failure to build the path + */ + private static Path buildWorkPath(JobContext context, String uuid) + throws IOException { + if (context instanceof TaskAttemptContext) { + return taskAttemptWorkingPath((TaskAttemptContext) context, uuid); + } else { + return null; + } + } + + /** + * The staging committers do not require "magic" commit support. + * @return false + */ + @Override + protected boolean isMagicFileSystemRequired() { --- End diff -- I was trying really hard to think up a good name here. I will change it in this committer but use it elsewhere, as "supportsPendingWritesToSpecialPaths" is just too nonintuitive. > Merge S3A committers into trunk > ------------------------------- > > Key: HADOOP-14971 > URL: https://issues.apache.org/jira/browse/HADOOP-14971 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/s3 > Affects Versions: 3.0.0 > Reporter: Steve Loughran > Assignee: Steve Loughran > > Merge the HADOOP-13786 committer into trunk. This branch is being set up as a > GitHub PR for review there & to keep it out of the mailboxes of the watchers on > the main JIRA. -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org