[ https://issues.apache.org/jira/browse/HADOOP-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218548#comment-16218548 ]
ASF GitHub Bot commented on HADOOP-14971: ----------------------------------------- Github user steveloughran commented on a diff in the pull request: https://github.com/apache/hadoop/pull/282#discussion_r146845180 --- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Partitioned committer. + * This writes data to specific "partition" subdirectories. 
+ */ +public class PartitionedStagingCommitter extends StagingCommitter { + + private static final Logger LOG = LoggerFactory.getLogger( + PartitionedStagingCommitter.class); + public static final String NAME = "PartitionedStagingCommitter"; + + public PartitionedStagingCommitter(Path outputPath, + TaskAttemptContext context) + throws IOException { + super(outputPath, context); + } + + @Override + public String getName() { + return NAME; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "PartitionedStagingCommitter{"); + sb.append(super.toString()); + sb.append('}'); + return sb.toString(); + } + + @Override + protected int commitTaskInternal(TaskAttemptContext context, + List<FileStatus> taskOutput) throws IOException { + Path attemptPath = getTaskAttemptPath(context); + Set<String> partitions = Paths.getPartitions(attemptPath, taskOutput); + + // enforce conflict resolution, but only if the mode is FAIL. for APPEND, + // it doesn't matter that the partitions are already there, and for REPLACE, + // deletion should be done during task commit. + if (getConflictResolutionMode(context) == ConflictResolution.FAIL) { + FileSystem fs = getDestFS(); + for (String partition : partitions) { + // getFinalPath adds the UUID to the file name. this needs the parent. + Path partitionPath = getFinalPath(partition + "/file", + context).getParent(); + if (fs.exists(partitionPath)) { + throw new PathExistsException(partitionPath.toString()); + } + } + } + return super.commitTaskInternal(context, taskOutput); + } + + /** + * Job-side conflict resolution. 
+ * The partition path conflict resolution assumes that: + * <ol> + * <li>FAIL checking has taken place earlier.</li> + * <li>APPEND is allowed</li> + * <li>REPLACE deletes all existing partitions</li> + * </ol> + * @param context job context + * @param pending the pending operations + * @throws IOException any failure + */ + @Override + protected void preCommitJob(JobContext context, + List<SinglePendingCommit> pending) throws IOException { + + FileSystem fs = getDestFS(); + Set<Path> partitions = Sets.newLinkedHashSet(); + for (SinglePendingCommit commit : pending) { + Path filePath = commit.destinationPath(); + partitions.add(filePath.getParent()); + } + + // enforce conflict resolution + switch (getConflictResolutionMode(context)) { + case FAIL: + // FAIL checking is done on the task side, so this does nothing + break; + case APPEND: + // no check is needed because the output may exist for appending + break; + case REPLACE: + for (Path partitionPath : partitions) { + LOG.info("{}: removing partition path to be replaced: " + + getRole(), partitionPath); + fs.delete(partitionPath, true); --- End diff -- Ryan: what do you think we should do for the `TABLE_ROOT` part here? nonrecursive list of root entries & delete? > Merge S3A committers into trunk > ------------------------------- > > Key: HADOOP-14971 > URL: https://issues.apache.org/jira/browse/HADOOP-14971 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/s3 > Affects Versions: 3.0.0 > Reporter: Steve Loughran > Assignee: Steve Loughran > > Merge the HADOOP-13786 committer into trunk. This branch is being set up as a > github PR for review there & to keep it out the mailboxes of the watchers on > the main JIRA -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org