[ https://issues.apache.org/jira/browse/HADOOP-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219182#comment-16219182 ]
ASF GitHub Bot commented on HADOOP-14971:
-----------------------------------------

Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/282#discussion_r146936337

--- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java ---
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3GuardCommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.DurationInfo;
+import org.apache.hadoop.fs.s3a.commit.Tasks;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+import static com.google.common.base.Preconditions.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
+
+/**
+ * Committer based on the contributed work of the
+ * <a href="https://github.com/rdblue/s3committer">Netflix multipart committers.</a>
+ * <ol>
+ * <li>
+ * The working directory of each task is actually under a temporary
+ * path in the local filesystem; jobs write directly into it.
+ * </li>
+ * <li>
+ * Task Commit: list all files under the task working dir, upload
+ * each of them but do not commit the final operation.
+ * Persist the information for each pending commit into the cluster
+ * for enumeration and commit by the job committer.
+ * </li>
+ * <li>Task Abort: recursive delete of task working dir.</li>
+ * <li>Job Commit: list all pending PUTs to commit; commit them.</li>
+ * <li>
+ * Job Abort: list all pending PUTs to commit; abort them.
+ * Delete all task attempt directories.
+ * </li>
+ * </ol>
+ */
+public class StagingCommitter extends AbstractS3GuardCommitter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      StagingCommitter.class);
+  public static final String NAME = "StagingCommitter";
+  private final Path constructorOutputPath;
+  private final long uploadPartSize;
+  private final String uuid;
+  private final boolean uniqueFilenames;
+  private final FileOutputCommitter wrappedCommitter;
+
+  private ConflictResolution conflictResolution;
+  private final Path finalOutputPath;
+  private String s3KeyPrefix = null;
+
+  /** The directory in the cluster FS for commits to go to. */
+  private Path commitsDirectory;
+
+  /**
+   * Committer for a single task attempt.
+   * @param outputPath final output path
+   * @param context task context
+   * @throws IOException on a failure
+   */
+  public StagingCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    this.constructorOutputPath = checkNotNull(getOutputPath(), "output path");
+    Configuration conf = getConf();
+    this.uploadPartSize = conf.getLongBytes(
+        MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
+
+    // Spark will use a fake app ID based on the current minute and job ID 0.
+    // To avoid collisions, use the YARN application ID for Spark.
+    this.uuid = getUploadUUID(conf, context.getJobID());
+    this.uniqueFilenames = conf.getBoolean(
+        FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+        DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
+    setWorkPath(buildWorkPath(context, uuid));
+    this.wrappedCommitter = createWrappedCommitter(context, conf);
+    // forces evaluation and caching of the resolution mode.
+    ConflictResolution mode = getConflictResolutionMode(getJobContext());
+    LOG.debug("Conflict resolution mode: {}", mode);
+    this.finalOutputPath = constructorOutputPath;

--- End diff --

Update: next revision will

1. eliminate `finalOutputPath` as a field
1. call `setOutputPath(finalOutputPath)` in the constructor
1. then call `getOutputPath()` and use it in the constructor to set up the destFS, which is always fetched with `getDestFS()`
1. remove all uses of `getOutputPath(JobContext)` (in job setup and commit); expect the committer to be set up with the correct output path.

This should mean that whatever sets the output path will define that output path; it should be overrideable, provided that the constructor-invoked methods don't cause problems before a subclass is inited.

Will that work? (A sketch of this wiring follows below, after the issue summary.)

> Merge S3A committers into trunk
> -------------------------------
>
>                 Key: HADOOP-14971
>                 URL: https://issues.apache.org/jira/browse/HADOOP-14971
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.0.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>
> Merge the HADOOP-13786 committer into trunk. This branch is being set up as a
> github PR for review there & to keep it out the mailboxes of the watchers on
> the main JIRA
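To make the proposal above concrete, here is a minimal sketch of how the four steps might wire together in the constructor. It is an illustration only, not the committed code: `setOutputPath(...)`, `getOutputPath()` and `getDestFS()` are the accessors named in the comment, while `setDestFS(FileSystem)` and the exact call placement are assumptions; the rest of the constructor (part size, UUID, work path, wrapped committer) is elided as unchanged.

```java
// Sketch only. Assumes AbstractS3GuardCommitter exposes
// setOutputPath(Path)/getOutputPath(), plus a hypothetical
// setDestFS(FileSystem) setter to pair with the getDestFS() named above.
public StagingCommitter(Path outputPath,
    TaskAttemptContext context) throws IOException {
  super(outputPath, context);
  // steps 1 & 2: no finalOutputPath field; the base class holds the one copy
  setOutputPath(checkNotNull(outputPath, "output path"));
  // step 3: read the path back through the accessor, then bind the
  // destination filesystem once; all later reads go via getDestFS()
  Path dest = getOutputPath();
  setDestFS(dest.getFileSystem(getConf()));
  // step 4: job setup and commit use getOutputPath()/getDestFS() from here
  // on, never getOutputPath(JobContext)
  // ... remaining setup unchanged from the diff above ...
}
```

On the closing question: the main hazard is the one the comment already flags, namely that `setOutputPath()` runs from this constructor before any subclass constructor body has executed, so an override must not depend on subclass state.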
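Separately, since the class javadoc above leans on it: the "upload each of them but do not commit the final operation" step works because an S3 multipart upload publishes nothing until it is completed. Below is a rough, self-contained illustration of that two-phase pattern against the AWS SDK for Java 1.x (the SDK S3A was built on at the time); the class name, method shape, and single-part upload are illustrative simplifications, not committer code.

```java
import java.io.File;
import java.util.Collections;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

/**
 * Illustration only: the two-phase S3 multipart pattern the staging
 * committer builds on. A real committer persists (key, uploadId, etags)
 * to the cluster FS between the two phases instead of holding them live.
 */
final class PendingCommitDemo {

  static void demo(AmazonS3 s3, String bucket, String key, File data,
      boolean commitIt) {
    // "task commit": push the bytes; the object is NOT yet visible in S3
    String uploadId = s3.initiateMultipartUpload(
        new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
    PartETag etag = s3.uploadPart(new UploadPartRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withUploadId(uploadId)
        .withPartNumber(1)          // a real task splits at uploadPartSize
        .withFile(data))
        .getPartETag();

    if (commitIt) {
      // "job commit": complete with the persisted etags; the object appears
      s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
          bucket, key, uploadId, Collections.singletonList(etag)));
    } else {
      // "job abort": discard the upload; nothing ever becomes visible
      s3.abortMultipartUpload(new AbortMultipartUploadRequest(
          bucket, key, uploadId));
    }
  }
}
```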