http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java new file mode 100644 index 0000000..b3bcca1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory; + +/** + * Dynamically create the output committer based on subclass type and settings. + */ +public abstract class AbstractS3ACommitterFactory + extends PathOutputCommitterFactory { + public static final Logger LOG = LoggerFactory.getLogger( + AbstractS3ACommitterFactory.class); + + @Override + public PathOutputCommitter createOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + FileSystem fs = getDestinationFileSystem(outputPath, context); + PathOutputCommitter outputCommitter; + if (fs instanceof S3AFileSystem) { + outputCommitter = createTaskCommitter((S3AFileSystem)fs, + outputPath, context); + } else { + throw new PathCommitException(outputPath, + "Filesystem not supported by this committer"); + } + LOG.info("Using Committer {} for {}", + outputCommitter, + outputPath); + return outputCommitter; + } + + /** + * Get the destination filesystem, returning null if there is none. + * Code using this must explicitly or implicitly look for a null value + * in the response. + * @param outputPath output path + * @param context job/task context + * @return the destination filesystem, if it can be determined + * @throws IOException if the FS cannot be instantiated + */ + protected FileSystem getDestinationFileSystem(Path outputPath, + JobContext context) + throws IOException { + return outputPath != null ? + FileSystem.get(outputPath.toUri(), context.getConfiguration()) + : null; + } + + /** + * Implementation point: create a task committer for a specific filesystem. + * @param fileSystem destination FS.
+ * @param outputPath final output path for work + * @param context task context + * @return a committer + * @throws IOException any problem, including the FS not supporting + * the desired committer + */ + public abstract PathOutputCommitter createTaskCommitter( + S3AFileSystem fileSystem, + Path outputPath, + TaskAttemptContext context) throws IOException; +}
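A concrete factory only has to implement createTaskCommitter(). A minimal sketch of such a subclass (hypothetical class name; it binds to the magic committer added elsewhere in this patch, and assumes that committer's (Path, TaskAttemptContext) constructor):

public class DemoS3ACommitterFactory extends AbstractS3ACommitterFactory {
  @Override
  public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
      Path outputPath,
      TaskAttemptContext context) throws IOException {
    // always bind this factory to a single committer implementation
    return new MagicS3GuardCommitter(outputPath, context);
  }
}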
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java new file mode 100644 index 0000000..03cfcba --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME_PATTERN; + +/** + * Constants for working with committers. + */ +@SuppressWarnings("unused") +@InterfaceAudience.Public +@InterfaceStability.Unstable +public final class CommitConstants { + + private CommitConstants() { + } + + /** + * Path for "magic" writes: path and {@link #PENDING_SUFFIX} files: + * {@value}. + */ + public static final String MAGIC = "__magic"; + + /** + * Marker of the start of a directory tree for calculating + * the final path names: {@value}. + */ + public static final String BASE = "__base"; + + /** + * Suffix applied to pending commit metadata: {@value}. + */ + public static final String PENDING_SUFFIX = ".pending"; + + /** + * Suffix applied to multiple pending commit metadata: {@value}. + */ + public static final String PENDINGSET_SUFFIX = ".pendingset"; + + /** + * Prefix for Magic committer configuration options: {@value}. + */ + public static final String MAGIC_COMMITTER_PREFIX + = "fs.s3a.committer.magic"; + + /** + * Flag to indicate whether support for the Magic committer is enabled + * in the filesystem. + * Value: {@value}. + */ + public static final String MAGIC_COMMITTER_ENABLED + = MAGIC_COMMITTER_PREFIX + ".enabled"; + + /** + * Flag to indicate whether a stream is a magic output stream; + * returned in {@code StreamCapabilities}. + * Value: {@value}. + */ + public static final String STREAM_CAPABILITY_MAGIC_OUTPUT + = "s3a:magic.output.stream"; + + /** + * Flag to indicate that a store supports magic committers; + * returned in {@code StreamCapabilities}. + * Value: {@value}. + */ + public static final String STORE_CAPABILITY_MAGIC_COMMITTER + = "s3a:magic.committer"; + + /** + * Is the committer enabled by default? No.
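+ * <p>To enable it for a filesystem (a sketch; {@code conf} is the + * filesystem's Hadoop {@code Configuration}): + * {@code conf.setBoolean(MAGIC_COMMITTER_ENABLED, true)}.</p>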
+ */ + public static final boolean DEFAULT_MAGIC_COMMITTER_ENABLED = false; + + /** + * This is the "Pending" directory of the {@code FileOutputCommitter}; + * data written here is, in that algorithm, renamed into place. + * Value: {@value}. + */ + public static final String TEMPORARY = "_temporary"; + + /** + * Temp data which is not auto-committed: {@value}. + * Uses a different name from normal just to make clear it is different. + */ + public static final String TEMP_DATA = "__temp-data"; + + + /** + * Flag to trigger creation of a marker file on job completion. + */ + public static final String CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER + = "mapreduce.fileoutputcommitter.marksuccessfuljobs"; + + /** + * Marker file to create on success: {@value}. + */ + public static final String _SUCCESS = "_SUCCESS"; + + /** Default job marker option: {@value}. */ + public static final boolean DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER = true; + + /** + * Key to set for the S3A scheme to use the specific committer. + */ + public static final String S3A_COMMITTER_FACTORY_KEY = String.format( + COMMITTER_FACTORY_SCHEME_PATTERN, "s3a"); + + /** + * S3 Committer factory: {@value}. + * This uses the value of {@link #FS_S3A_COMMITTER_NAME} + * to choose the final committer. + */ + public static final String S3A_COMMITTER_FACTORY = + S3ACommitterFactory.CLASSNAME; + + /** + * Option to identify the S3A committer: + * {@value}. + */ + public static final String FS_S3A_COMMITTER_NAME = + "fs.s3a.committer.name"; + + /** + * Option for {@link #FS_S3A_COMMITTER_NAME}: + * classic/file output committer: {@value}. + */ + public static final String COMMITTER_NAME_FILE = "file"; + + /** + * Option for {@link #FS_S3A_COMMITTER_NAME}: + * magic output committer: {@value}. + */ + public static final String COMMITTER_NAME_MAGIC = "magic"; + + /** + * Option for {@link #FS_S3A_COMMITTER_NAME}: + * directory output committer: {@value}. + */ + public static final String COMMITTER_NAME_DIRECTORY = "directory"; + + /** + * Option for {@link #FS_S3A_COMMITTER_NAME}: + * partition output committer: {@value}. + */ + public static final String COMMITTER_NAME_PARTITIONED = "partitioned"; + + /** + * Option for final files to have a unique name through job attempt info, + * falling back to a new UUID if there is no job attempt information to use. + * {@value}. + * When writing data with the "append" conflict option, this guarantees + * that new data will not overwrite any existing data. + */ + public static final String FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES = + "fs.s3a.committer.staging.unique-filenames"; + /** + * Default value for {@link #FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES}: + * {@value}. + */ + public static final boolean DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES = true; + + /** + * Staging committer conflict resolution policy: {@value}. + * Supported: fail, append, replace. + */ + public static final String FS_S3A_COMMITTER_STAGING_CONFLICT_MODE = + "fs.s3a.committer.staging.conflict-mode"; + + /** Conflict mode: {@value}. */ + public static final String CONFLICT_MODE_FAIL = "fail"; + + /** Conflict mode: {@value}. */ + public static final String CONFLICT_MODE_APPEND = "append"; + + /** Conflict mode: {@value}. */ + public static final String CONFLICT_MODE_REPLACE = "replace"; + + /** Default conflict mode: {@value}.
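+ * <p>For example, a job can opt into append mode (a sketch): + * {@code conf.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND)}.</p>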
*/ + public static final String DEFAULT_CONFLICT_MODE = CONFLICT_MODE_FAIL; + + /** + * Number of threads in committers for parallel operations on files + * (upload, commit, abort, delete...): {@value}. + */ + public static final String FS_S3A_COMMITTER_THREADS = + "fs.s3a.committer.threads"; + /** + * Default value for {@link #FS_S3A_COMMITTER_THREADS}: {@value}. + */ + public static final int DEFAULT_COMMITTER_THREADS = 8; + + /** + * Path in the cluster filesystem for temporary data: {@value}. + * This is for HDFS, not the local filesystem. + * It is only for the summary data of each file, not the actual + * data being committed. + */ + public static final String FS_S3A_COMMITTER_STAGING_TMP_PATH = + "fs.s3a.committer.staging.tmp.path"; + + + /** + * Should the staging committers abort all pending uploads to the destination + * directory? Default: true. + * + * Disable this only if more than one partitioned committer is + * writing to the same destination tree simultaneously; otherwise + * the first job to complete will cancel all outstanding uploads from the + * others. However, disabling it may lead to leaked outstanding uploads from + * failed tasks. If disabled, configure the bucket lifecycle to remove uploads + * after a time period, and/or set up a workflow to explicitly delete + * entries. Otherwise there is a risk that uncommitted uploads may run up + * bills. + */ + public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS = + "fs.s3a.committer.staging.abort.pending.uploads"; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java new file mode 100644 index 0000000..9381ef1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java @@ -0,0 +1,596 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.amazonaws.services.s3.model.MultipartUpload; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3AUtils; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.commit.files.PendingSet; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.fs.s3a.commit.files.SuccessData; + +import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.Constants.*; + +/** + * The implementation of the various actions a committer needs. + * This doesn't implement the protocol/binding to a specific execution engine, + * just the operations needed to build one. + * + * When invoking FS operations, it assumes that the underlying FS is + * handling retries and exception translation: it does not attempt to + * duplicate that work. + * + */ +public class CommitOperations { + private static final Logger LOG = LoggerFactory.getLogger( + CommitOperations.class); + + /** + * Destination filesystem. + */ + private final S3AFileSystem fs; + + /** Statistics. */ + private final S3AInstrumentation.CommitterStatistics statistics; + + /** + * Write operations for the destination fs. + */ + private final WriteOperationHelper writeOperations; + + /** + * Filter to find all {@code .pendingset} files. + */ + public static final PathFilter PENDINGSET_FILTER = + path -> path.toString().endsWith(CommitConstants.PENDINGSET_SUFFIX); + + /** + * Filter to find all {@code .pending} files. + */ + public static final PathFilter PENDING_FILTER = + path -> path.toString().endsWith(CommitConstants.PENDING_SUFFIX); + + /** + * Instantiate. + * @param fs FS to bind to + */ + public CommitOperations(S3AFileSystem fs) { + Preconditions.checkArgument(fs != null, "null fs"); + this.fs = fs; + statistics = fs.newCommitterStatistics(); + writeOperations = fs.createWriteOperationHelper(); + } + + /** + * Convert an ordered list of strings to a list of index etag parts. + * @param tagIds list of tags + * @return same list, now in numbered tuples + */ + public static List<PartETag> toPartEtags(List<String> tagIds) { + return IntStream.range(0, tagIds.size()) + .mapToObj(i -> new PartETag(i + 1, tagIds.get(i))) + .collect(Collectors.toList()); + } + + @Override + public String toString() { + return "CommitOperations{" + fs.getUri() + '}'; + } + + /** @return statistics.
*/ + protected S3AInstrumentation.CommitterStatistics getStatistics() { + return statistics; + } + + /** + * Commit the operation, throwing an exception on any failure. + * @param commit commit to execute + * @throws IOException on a failure + */ + public void commitOrFail(SinglePendingCommit commit) throws IOException { + commit(commit, commit.getFilename()).maybeRethrow(); + } + + /** + * Commit a single pending commit; exceptions are caught + * and converted to an outcome. + * @param commit entry to commit + * @param origin origin path/string for outcome text + * @return the outcome + */ + public MaybeIOE commit(SinglePendingCommit commit, String origin) { + LOG.debug("Committing single commit {}", commit); + MaybeIOE outcome; + String destKey = "unknown destination"; + try { + commit.validate(); + destKey = commit.getDestinationKey(); + long l = innerCommit(commit); + LOG.debug("Successful commit of file length {}", l); + outcome = MaybeIOE.NONE; + statistics.commitCompleted(commit.getLength()); + } catch (IOException e) { + String msg = String.format("Failed to commit upload against %s: %s", + destKey, e); + LOG.warn(msg, e); + outcome = new MaybeIOE(e); + statistics.commitFailed(); + } catch (Exception e) { + String msg = String.format("Failed to commit upload against %s," + + " described in %s: %s", destKey, origin, e); + LOG.warn(msg, e); + outcome = new MaybeIOE(new PathCommitException(origin, msg, e)); + statistics.commitFailed(); + } + return outcome; + } + + /** + * Inner commit operation. + * @param commit entry to commit + * @return bytes committed. + * @throws IOException failure + */ + private long innerCommit(SinglePendingCommit commit) throws IOException { + // finalize the commit + writeOperations.completeMPUwithRetries( + commit.getDestinationKey(), + commit.getUploadId(), + toPartEtags(commit.getEtags()), + commit.getLength(), + new AtomicInteger(0)); + return commit.getLength(); + } + + /** + * Locate all files with the pending suffix under a directory. + * @param pendingDir directory + * @param recursive recursive listing? + * @return the list of all located entries + * @throws IOException if there is a problem listing the path. + */ + public List<LocatedFileStatus> locateAllSinglePendingCommits( + Path pendingDir, + boolean recursive) throws IOException { + return listAndFilter(fs, pendingDir, recursive, PENDING_FILTER); + } + + /** + * Load all single pending commits in the directory. + * All load failures are logged and then added to the list of files which + * could not be loaded. + * @param pendingDir directory containing commits + * @param recursive do a recursive scan? + * @return tuple of loaded entries and those pending files which could + * not be loaded/validated. + * @throws IOException on a failure to list the files. + */ + public Pair<PendingSet, + List<Pair<LocatedFileStatus, IOException>>> + loadSinglePendingCommits(Path pendingDir, boolean recursive) + throws IOException { + + List<LocatedFileStatus> statusList = locateAllSinglePendingCommits( + pendingDir, recursive); + PendingSet commits = new PendingSet( + statusList.size()); + List<Pair<LocatedFileStatus, IOException>> failures = new ArrayList<>(1); + for (LocatedFileStatus status : statusList) { + try { + commits.add(SinglePendingCommit.load(fs, status.getPath())); + } catch (IOException e) { + LOG.warn("Failed to load commit file {}", status.getPath(), e); + failures.add(Pair.of(status, e)); + } + } + return Pair.of(commits, failures); + } + + /** + * Convert any exception to an IOE, if needed.
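+ * <p>Example (a sketch with hypothetical values):</p> + * <pre> + *   IOException ioe = makeIOE("s3a://bucket/out", new IllegalStateException("broken")); + * </pre>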
+ * @param key key to use in a path exception + * @param ex exception + * @return an IOE, either the passed in value or a new one wrapping the other + * exception. + */ + public IOException makeIOE(String key, Exception ex) { + return ex instanceof IOException + ? (IOException) ex + : new PathCommitException(key, ex.toString(), ex); + } + + /** + * Abort the multipart commit supplied. This is the lower level operation + * which doesn't generate an outcome, instead raising an exception. + * @param commit pending commit to abort + * @throws FileNotFoundException if the abort ID is unknown + * @throws IOException on any failure + */ + public void abortSingleCommit(SinglePendingCommit commit) + throws IOException { + String destKey = commit.getDestinationKey(); + String origin = commit.getFilename() != null + ? (" defined in " + commit.getFilename()) + : ""; + String uploadId = commit.getUploadId(); + LOG.info("Aborting commit to object {}{}", destKey, origin); + abortMultipartCommit(destKey, uploadId); + } + + /** + * Create an {@code AbortMultipartUpload} request and POST it to S3, + * incrementing statistics afterwards. + * @param destKey destination key + * @param uploadId upload to cancel + * @throws FileNotFoundException if the abort ID is unknown + * @throws IOException on any failure + */ + public void abortMultipartCommit(String destKey, String uploadId) + throws IOException { + try { + writeOperations.abortMultipartCommit(destKey, uploadId); + } finally { + statistics.commitAborted(); + } + } + + /** + * Enumerate all pending files in a dir/tree and abort them. + * @param pendingDir directory of pending operations + * @param recursive recurse? + * @return the outcome of all the abort operations + * @throws IOException if there is a problem listing the path. + */ + public MaybeIOE abortAllSinglePendingCommits(Path pendingDir, + boolean recursive) + throws IOException { + Preconditions.checkArgument(pendingDir != null, "null pendingDir"); + LOG.debug("Aborting all pending commit files under {}" + + " (recursive={})", pendingDir, recursive); + RemoteIterator<LocatedFileStatus> pendingFiles; + try { + pendingFiles = ls(pendingDir, recursive); + } catch (FileNotFoundException fnfe) { + LOG.info("No directory to abort {}", pendingDir); + return MaybeIOE.NONE; + } + MaybeIOE outcome = MaybeIOE.NONE; + if (!pendingFiles.hasNext()) { + LOG.debug("No files to abort under {}", pendingDir); + } + while (pendingFiles.hasNext()) { + Path pendingFile = pendingFiles.next().getPath(); + if (pendingFile.getName().endsWith(CommitConstants.PENDING_SUFFIX)) { + try { + abortSingleCommit(SinglePendingCommit.load(fs, pendingFile)); + } catch (FileNotFoundException e) { + LOG.debug("listed file already deleted: {}", pendingFile); + } catch (IOException | IllegalArgumentException e) { + if (!outcome.hasException()) { + // record the first failure only + outcome = new MaybeIOE(makeIOE(pendingFile.toString(), e)); + } + } finally { + // quietly try to delete the pending file + S3AUtils.deleteQuietly(fs, pendingFile, false); + } + } + } + return outcome; + } + + /** + * List files. + * @param path path + * @param recursive recursive listing? + * @return iterator + * @throws IOException failure + */ + protected RemoteIterator<LocatedFileStatus> ls(Path path, boolean recursive) + throws IOException { + return fs.listFiles(path, recursive); + } + + /** + * List all pending uploads to the destination FS under a path. + * @param dest destination path + * @return A list of the pending uploads to any directory under that path.
+ * @throws IOException IO failure + */ + public List<MultipartUpload> listPendingUploadsUnderPath(Path dest) + throws IOException { + return fs.listMultipartUploads(fs.pathToKey(dest)); + } + + /** + * Abort all pending uploads to the destination FS under a path. + * @param dest destination path + * @return a count of the number of uploads aborted. + * @throws IOException IO failure + */ + public int abortPendingUploadsUnderPath(Path dest) throws IOException { + return writeOperations.abortMultipartUploadsUnderPath(fs.pathToKey(dest)); + } + + /** + * Delete any existing {@code _SUCCESS} file. + * @param outputPath output directory + * @throws IOException IO problem + */ + public void deleteSuccessMarker(Path outputPath) throws IOException { + fs.delete(new Path(outputPath, _SUCCESS), false); + } + + /** + * Save the success data to the {@code _SUCCESS} file. + * @param outputPath output directory + * @param successData success data to save. + * @param addMetrics should the FS metrics be added? + * @throws IOException IO problem + */ + public void createSuccessMarker(Path outputPath, + SuccessData successData, + boolean addMetrics) + throws IOException { + Preconditions.checkArgument(outputPath != null, "null outputPath"); + + if (addMetrics) { + addFileSystemStatistics(successData.getMetrics()); + } + // add any diagnostics + Configuration conf = fs.getConf(); + successData.addDiagnostic(S3_METADATA_STORE_IMPL, + conf.getTrimmed(S3_METADATA_STORE_IMPL, "")); + successData.addDiagnostic(METADATASTORE_AUTHORITATIVE, + conf.getTrimmed(METADATASTORE_AUTHORITATIVE, "false")); + successData.addDiagnostic(MAGIC_COMMITTER_ENABLED, + conf.getTrimmed(MAGIC_COMMITTER_ENABLED, "false")); + + // now write + Path markerPath = new Path(outputPath, _SUCCESS); + LOG.debug("Touching success marker for job {}: {}", markerPath, + successData); + successData.save(fs, markerPath, true); + } + + /** + * Revert a pending commit by deleting the destination. + * @param commit pending commit + * @throws IOException failure + */ + public void revertCommit(SinglePendingCommit commit) throws IOException { + LOG.warn("Revert {}", commit); + try { + writeOperations.revertCommit(commit.getDestinationKey()); + } finally { + statistics.commitReverted(); + } + } + + /** + * Upload all the data in the local file, returning the information + * needed to commit the work. + * @param localFile local file (must be a file, not a directory) + * @param destPath destination path + * @param partition partition/subdir.
Not used in the upload itself; only recorded in the commit data text. + * @param uploadPartSize size of upload + * @return a pending upload entry + * @throws IOException failure + */ + public SinglePendingCommit uploadFileToPendingCommit(File localFile, + Path destPath, + String partition, + long uploadPartSize) + throws IOException { + + LOG.debug("Initiating multipart upload from {} to {}", + localFile, destPath); + Preconditions.checkArgument(destPath != null); + if (!localFile.isFile()) { + throw new FileNotFoundException("Not a file: " + localFile); + } + String destURI = destPath.toString(); + String destKey = fs.pathToKey(destPath); + String uploadId = null; + + boolean threw = true; + try { + statistics.commitCreated(); + uploadId = writeOperations.initiateMultiPartUpload(destKey); + long length = localFile.length(); + + SinglePendingCommit commitData = new SinglePendingCommit(); + commitData.setDestinationKey(destKey); + commitData.setBucket(fs.getBucket()); + commitData.touch(System.currentTimeMillis()); + commitData.setUploadId(uploadId); + commitData.setUri(destURI); + commitData.setText(partition != null ? "partition: " + partition : ""); + commitData.setLength(length); + + long offset = 0; + long numParts = (length / uploadPartSize + + ((length % uploadPartSize) > 0 ? 1 : 0)); + // always write one part, even if it is just an empty one + if (numParts == 0) { + numParts = 1; + } + + List<PartETag> parts = new ArrayList<>((int) numParts); + + LOG.debug("File size is {}, number of parts to upload = {}", + length, numParts); + for (int partNumber = 1; partNumber <= numParts; partNumber += 1) { + long size = Math.min(length - offset, uploadPartSize); + UploadPartRequest part; + part = writeOperations.newUploadPartRequest( + destKey, + uploadId, + partNumber, + (int) size, + null, + localFile, + offset); + part.setLastPart(partNumber == numParts); + UploadPartResult partResult = writeOperations.uploadPart(part); + offset += uploadPartSize; + parts.add(partResult.getPartETag()); + } + + commitData.bindCommitData(parts); + statistics.commitUploaded(length); + threw = false; + return commitData; + } finally { + if (threw && uploadId != null) { + statistics.commitAborted(); + try { + abortMultipartCommit(destKey, uploadId); + } catch (IOException e) { + LOG.error("Failed to abort upload {} to {}", uploadId, destKey, e); + } + } + } + } + + /** + * Add the filesystem statistics to the map; overwriting anything + * with the same name. + * @param dest destination map + */ + public void addFileSystemStatistics(Map<String, Long> dest) { + dest.putAll(fs.getInstrumentation().toMap()); + } + + /** + * Note that a task has completed. + * @param success success flag + */ + public void taskCompleted(boolean success) { + statistics.taskCompleted(success); + } + + /** + * Note that a job has completed. + * @param success success flag + */ + public void jobCompleted(boolean success) { + statistics.jobCompleted(success); + } + + /** + * A holder for a possible IOException; the call {@link #maybeRethrow()} + * will throw any exception passed into the constructor, and is a no-op + * if none was. + * + * Why isn't a Java 8 optional used here? The main benefit would be that + * {@link #maybeRethrow()} could be done as a map(), but because Java doesn't + * allow checked exceptions in a map, the following code is invalid + * <pre> + * exception.map((e) -> {throw e;}) + * </pre> + * As a result, the code to work with exceptions would be almost as convoluted + * as the original.
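+ * <p>Typical use (a sketch; {@code pending} is a loaded + * {@link SinglePendingCommit}):</p> + * <pre> + *   MaybeIOE outcome = commit(pending, pending.getFilename()); + *   outcome.maybeRethrow(); + * </pre>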
+ */ + public static class MaybeIOE { + private final IOException exception; + + public static final MaybeIOE NONE = new MaybeIOE(null); + + /** + * Construct with an exception. + * @param exception exception + */ + public MaybeIOE(IOException exception) { + this.exception = exception; + } + + /** + * Get any exception. + * @return the exception. + */ + public IOException getException() { + return exception; + } + + /** + * Is there an exception in this class? + * @return true if there is an exception + */ + public boolean hasException() { + return exception != null; + } + + /** + * Rethrow any exception. + * @throws IOException the exception field, if non-null. + */ + public void maybeRethrow() throws IOException { + if (exception != null) { + throw exception; + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("MaybeIOE{"); + sb.append(hasException() ? exception : ""); + sb.append('}'); + return sb.toString(); + } + + /** + * Get an instance based on the exception: either a value + * or a reference to {@link #NONE}. + * @param ex exception + * @return an instance. + */ + public static MaybeIOE of(IOException ex) { + return ex != null ? new MaybeIOE(ex) : NONE; + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java new file mode 100644 index 0000000..9c684c7 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; + +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*; +import static org.apache.hadoop.fs.s3a.commit.ValidationFailure.verify; + +/** + * Static utility methods related to S3A commitment processing, both + * staging and magic. + * + * <b>Do not use in any codepath intended to be used from the S3AFS + * except in the committers themselves.</b> + */ +public final class CommitUtils { + private static final Logger LOG = + LoggerFactory.getLogger(CommitUtils.class); + + private CommitUtils() { + } + + /** + * Verify that the path is a magic one. 
+ * @param fs filesystem + * @param path path + * @throws PathCommitException if the path isn't a magic commit path + */ + public static void verifyIsMagicCommitPath(S3AFileSystem fs, + Path path) throws PathCommitException { + verifyIsMagicCommitFS(fs); + if (!fs.isMagicCommitPath(path)) { + throw new PathCommitException(path, E_BAD_PATH); + } + } + + /** + * Verify that an S3A FS instance is a magic commit FS. + * @param fs filesystem + * @throws PathCommitException if the FS isn't a magic commit FS. + */ + public static void verifyIsMagicCommitFS(S3AFileSystem fs) + throws PathCommitException { + if (!fs.isMagicCommitEnabled()) { + // dump out details to console for support diagnostics + String fsUri = fs.getUri().toString(); + LOG.error("{}: {}:\n{}", E_NORMAL_FS, fsUri, fs); + // then fail + throw new PathCommitException(fsUri, E_NORMAL_FS); + } + } + + /** + * Verify that an FS is an S3A FS. + * @param fs filesystem + * @param path path to use in the exception + * @return the typecast FS. + * @throws PathCommitException if the FS is not an S3A FS. + */ + public static S3AFileSystem verifyIsS3AFS(FileSystem fs, Path path) + throws PathCommitException { + if (!(fs instanceof S3AFileSystem)) { + throw new PathCommitException(path, E_WRONG_FS); + } + return (S3AFileSystem) fs; + } + + /** + * Get the S3A FS of a path. + * @param path path to examine + * @param conf config + * @param magicCommitRequired is magic commit required in the FS? + * @return the filesystem + * @throws PathCommitException output path isn't to an S3A FS instance, or, + * if {@code magicCommitRequired} is set, if it doesn't support these commits. + * @throws IOException failure to instantiate the FS + */ + public static S3AFileSystem getS3AFileSystem(Path path, + Configuration conf, + boolean magicCommitRequired) + throws PathCommitException, IOException { + S3AFileSystem s3AFS = verifyIsS3AFS(path.getFileSystem(conf), path); + if (magicCommitRequired) { + verifyIsMagicCommitFS(s3AFS); + } + return s3AFS; + } + + /** + * Verify that all instances in a collection are of the given class. + * @param it iterable of objects to check + * @param classname classname to require + * @throws ValidationFailure on a failure + */ + public static void validateCollectionClass(Iterable it, Class classname) + throws ValidationFailure { + for (Object o : it) { + verify(o.getClass().equals(classname), + "Collection element is not a %s: %s", classname, o.getClass()); + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java new file mode 100644 index 0000000..c6c0da8 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA; + +/** + * These are commit utility methods which import classes from + * hadoop-mapreduce, and so only work when that module is on the + * classpath. + * + * <b>Do not use in any codepath intended to be used from the S3AFS + * except in the committers themselves.</b> + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class CommitUtilsWithMR { + + private CommitUtilsWithMR() { + } + + /** + * Get the location of magic job attempts. + * @param out the base output directory. + * @return the location of magic job attempts. + */ + public static Path getMagicJobAttemptsPath(Path out) { + return new Path(out, MAGIC); + } + + /** + * Get the Application Attempt ID for this job. + * @param context the context to look in + * @return the Application Attempt ID for a given job. + */ + public static int getAppAttemptId(JobContext context) { + return context.getConfiguration().getInt( + MRJobConfig.APPLICATION_ATTEMPT_ID, 0); + } + + /** + * Compute the "magic" path for a job attempt. + * @param appAttemptId the ID of the application attempt for this job. + * @param dest the final output directory + * @return the path to store job attempt data. + */ + public static Path getMagicJobAttemptPath(int appAttemptId, Path dest) { + return new Path(getMagicJobAttemptsPath(dest), + formatAppAttemptDir(appAttemptId)); + } + + /** + * Format the application attempt directory. + * @param attemptId attempt ID + * @return the directory name for the application attempt + */ + public static String formatAppAttemptDir(int attemptId) { + return String.format("app-attempt-%04d", attemptId); + } + + /** + * Compute the path where the output of magic task attempts is stored. + * @param context the context of the job with magic tasks. + * @param dest destination of work + * @return the path where the output of magic task attempts is stored. + */ + public static Path getMagicTaskAttemptsPath(JobContext context, Path dest) { + return new Path(getMagicJobAttemptPath( + getAppAttemptId(context), dest), "tasks"); + } + + /** + * Compute the path where the output of a task attempt is stored until + * that task is committed. + * This path is marked as a base path for relocations, so subdirectory + * information is preserved. + * @param context the context of the task attempt. + * @param dest The output path to commit work into + * @return the path where a task attempt should be stored.
+ */ + public static Path getMagicTaskAttemptPath(TaskAttemptContext context, + Path dest) { + return new Path(getBaseMagicTaskAttemptPath(context, dest), BASE); + } + + /** + * Get the base Magic attempt path, without any annotations to mark relative + * references. + * @param context task context. + * @param dest The output path to commit work into + * @return the path under which all attempts go + */ + public static Path getBaseMagicTaskAttemptPath(TaskAttemptContext context, + Path dest) { + return new Path(getMagicTaskAttemptsPath(context, dest), + String.valueOf(context.getTaskAttemptID())); + } + + /** + * Compute a path for temporary data associated with a job. + * This data is <i>not magic</i>. + * @param appAttemptId the ID of the application attempt for this job. + * @param out output directory of job + * @return the path to store temporary job attempt data. + */ + public static Path getTempJobAttemptPath(int appAttemptId, Path out) { + return new Path(new Path(out, TEMP_DATA), + formatAppAttemptDir(appAttemptId)); + } + + /** + * Compute the path where the output of a given task attempt will be placed. + * @param context task context + * @param out output directory of job + * @return the path to store temporary task attempt data. + */ + public static Path getTempTaskAttemptPath(TaskAttemptContext context, + Path out) { + return new Path(getTempJobAttemptPath(getAppAttemptId(context), out), + String.valueOf(context.getTaskAttemptID())); + } + + /** + * Get a string value of a job ID; returns meaningful text if there is no ID. + * @param context job context + * @return a string for logs + */ + public static String jobIdString(JobContext context) { + JobID jobID = context.getJobID(); + return jobID != null ? jobID.toString() : "(no job ID)"; + } + + /** + * Get a job name; returns meaningful text if there is no name. + * @param context job context + * @return a string for logs + */ + public static String jobName(JobContext context) { + String name = context.getJobName(); + return (name != null && !name.isEmpty()) ? name : "(anonymous)"; + } + + /** + * Get a configuration option, with any value in the job configuration + * taking priority over that in the filesystem. + * This allows for per-job override of FS parameters. + * + * Order is: job context, filesystem config, default value + * + * @param context job/task context + * @param fsConf filesystem configuration. Get this from the FS to guarantee + * per-bucket parameter propagation + * @param key key to look for + * @param defVal default value + * @return the configuration option. + */ + public static String getConfigurationOption( + JobContext context, + Configuration fsConf, + String key, + String defVal) { + return context.getConfiguration().getTrimmed(key, + fsConf.getTrimmed(key, defVal)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Duration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Duration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Duration.java new file mode 100644 index 0000000..c44a90b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Duration.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +/** + * Little duration counter. + */ +public class Duration { + + private final long started; + private long finished; + + public Duration() { + started = time(); + finished = started; + } + + protected long time() { + return System.currentTimeMillis(); + } + + public void finished() { + finished = time(); + } + + public String getDurationString() { + return humanTime(value()); + } + + public static String humanTime(long time) { + long seconds = (time / 1000); + long minutes = (seconds / 60); + return String.format("%d:%02d.%03ds", minutes, seconds % 60, time % 1000); + } + + @Override + public String toString() { + return getDurationString(); + } + + public long value() { + return finished - started; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/DurationInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/DurationInfo.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/DurationInfo.java new file mode 100644 index 0000000..c6617f8 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/DurationInfo.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import org.slf4j.Logger; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A duration with logging of final state at info in the {@code close()} call. + * This allows it to be used in a try-with-resources clause, and have the + * duration automatically logged. + */ +@InterfaceAudience.Private +public class DurationInfo extends Duration + implements AutoCloseable { + private final String text; + + private final Logger log; + + /** + * Create the duration text from a {@code String.format()} code call.
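+ * <p>Example in a try-with-resources clause (a sketch; the job ID + * argument is hypothetical):</p> + * <pre> + *   try (DurationInfo d = new DurationInfo(LOG, "Committing job %s", jobId)) { + *     // the operation being timed + *   } + * </pre>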
+ * @param log log to write to + * @param format format string + * @param args list of arguments + */ + public DurationInfo(Logger log, String format, Object... args) { + this.text = String.format(format, args); + this.log = log; + log.info("Starting: {}", text); + } + + @Override + public String toString() { + return text + ": duration " + super.toString(); + } + + @Override + public void close() { + finished(); + log.info(this.toString()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java new file mode 100644 index 0000000..2821fce --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory; +import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitterFactory; +import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory; +import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; + +/** + * These are internal constants not intended for public use. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class InternalCommitterConstants { + + private InternalCommitterConstants() { + } + + /** + * This is the staging committer base class; only used for testing. + */ + public static final String COMMITTER_NAME_STAGING = "staging"; + + /** + * A unique identifier to use for this work: {@value}. + */ + public static final String FS_S3A_COMMITTER_STAGING_UUID = + "fs.s3a.committer.staging.uuid"; + + /** + * Staging committer factory: {@value}. + */ + public static final String STAGING_COMMITTER_FACTORY = + StagingCommitterFactory.CLASSNAME; + + /** + * Directory committer factory: {@value}. + */ + public static final String DIRECTORY_COMMITTER_FACTORY = + DirectoryStagingCommitterFactory.CLASSNAME; + + /** + * Partitioned committer factory: {@value}.
+ */ + public static final String PARTITION_COMMITTER_FACTORY = + PartitionedStagingCommitterFactory.CLASSNAME; + + /** + * Magic committer factory: {@value}. + */ + public static final String MAGIC_COMMITTER_FACTORY = + MagicS3GuardCommitterFactory.CLASSNAME; + + /** + * Error text when the destination path exists and the committer + * must abort the job/task {@value}. + */ + public static final String E_DEST_EXISTS = + "Destination path exists and committer conflict resolution mode is " + + "\"fail\""; + + /** Error message for bad path: {@value}. */ + public static final String E_BAD_PATH + = "Path does not represent a magic-commit path"; + + /** Error message if filesystem isn't magic: {@value}. */ + public static final String E_NORMAL_FS + = "Filesystem does not have support for 'magic' committer enabled" + + " in configuration option " + MAGIC_COMMITTER_ENABLED; + + /** Error message if the dest FS isn't S3A: {@value}. */ + public static final String E_WRONG_FS + = "Output path is not on an S3A Filesystem"; + + /** Error message for a path without a magic element in the list: {@value}. */ + public static final String E_NO_MAGIC_PATH_ELEMENT + = "No " + MAGIC + " element in path"; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/LocalTempDir.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/LocalTempDir.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/LocalTempDir.java new file mode 100644 index 0000000..849a06d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/LocalTempDir.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Constants; + +import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; + +/** + * A class which manages access to a temporary directory store, uses the + * directories listed in {@link Constants#BUFFER_DIR} for this. + */ +final class LocalTempDir { + + private LocalTempDir() { + } + + private static LocalDirAllocator directoryAllocator; + + private static synchronized LocalDirAllocator getAllocator( + Configuration conf, String key) { + if (directoryAllocator != null) { + String bufferDir = conf.get(key) != null + ? key : Constants.HADOOP_TMP_DIR; + directoryAllocator = new LocalDirAllocator(bufferDir); + } + return directoryAllocator; + } + + /** + * Create a temp file. 
+ * @param conf configuration to use when creating the allocator + * @param prefix filename prefix + * @param size file size, or -1 if not known + * @return the temp file. The file has been created. + * @throws IOException IO failure + */ + public static File tempFile(Configuration conf, String prefix, long size) + throws IOException { + return getAllocator(conf, BUFFER_DIR).createTmpFileForWrite( + prefix, size, conf); + } + + /** + * Get a temporary path. + * @param conf configuration to use when creating the allocator + * @param prefix filename prefix + * @param size file size, or -1 if not known + * @return the temp path. + * @throws IOException IO failure + */ + public static Path tempPath(Configuration conf, String prefix, long size) + throws IOException { + return getAllocator(conf, BUFFER_DIR) + .getLocalPathForWrite(prefix, size, conf); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java new file mode 100644 index 0000000..a07b5c9 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker; + +import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*; + +/** + * Adds the code needed for S3A to support magic committers. + * It's pulled out to keep the S3A FS class slightly less complex. + * This class can be instantiated even when magic commit is disabled; + * in this case: + * <ol> + * <li>{@link #isMagicCommitPath(Path)} will always return false.</li> + * <li>{@link #createTracker(Path, String)} will always return an instance + * of {@link PutTracker}.</li> + * </ol> + * + * <p><b>Important</b>: must not directly or indirectly import a class which + * uses any datatype in hadoop-mapreduce.</p> + */ +public class MagicCommitIntegration { + private static final Logger LOG = + LoggerFactory.getLogger(MagicCommitIntegration.class); + private final S3AFileSystem owner; + private final boolean magicCommitEnabled; + + /** + * Instantiate.
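+ * <p>Example (a sketch of how an owning filesystem might construct it):</p> + * <pre> + *   MagicCommitIntegration integration = + *       new MagicCommitIntegration(this, magicCommitEnabled); + * </pre>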
+   * @param owner owner class
+   * @param magicCommitEnabled is magic commit enabled.
+   */
+  public MagicCommitIntegration(S3AFileSystem owner,
+      boolean magicCommitEnabled) {
+    this.owner = owner;
+    this.magicCommitEnabled = magicCommitEnabled;
+  }
+
+  /**
+   * Given an (elements, key) pair, return the key of the final destination
+   * of the PUT: that is, where the output is ultimately expected to land.
+   * @param elements path split to elements
+   * @param key key
+   * @return key for the final PUT. If this is not a magic commit path,
+   * the key passed in.
+   */
+  public String keyOfFinalDestination(List<String> elements, String key) {
+    if (isMagicCommitPath(elements)) {
+      return elementsToKey(finalDestination(elements));
+    } else {
+      return key;
+    }
+  }
+
+  /**
+   * Given a path and a key to that same path, create a tracker for it.
+   * The specific tracker is chosen based on whether or not
+   * the path is a magic one.
+   * @param path path of nominal write
+   * @param key key of path of nominal write
+   * @return the tracker for this operation.
+   */
+  public PutTracker createTracker(Path path, String key) {
+    final List<String> elements = splitPathToElements(path);
+    PutTracker tracker;
+
+    if (isMagicFile(elements)) {
+      // path is of a magic file
+      if (isMagicCommitPath(elements)) {
+        final String destKey = keyOfFinalDestination(elements, key);
+        String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
+        owner.getInstrumentation()
+            .incrementCounter(Statistic.COMMITTER_MAGIC_FILES_CREATED, 1);
+        tracker = new MagicCommitTracker(path,
+            owner.getBucket(),
+            key,
+            destKey,
+            pendingsetPath,
+            owner.createWriteOperationHelper());
+        LOG.debug("Created {}", tracker);
+      } else {
+        LOG.warn("File being created has a \"magic\" path, but the filesystem"
+            + " has magic file support disabled: {}", path);
+        // downgrade to standard multipart tracking
+        tracker = new PutTracker(key);
+      }
+    } else {
+      // standard multipart tracking
+      tracker = new PutTracker(key);
+    }
+    return tracker;
+  }
+
+  /**
+   * This performs the calculation of the final destination of a set
+   * of elements.
+   *
+   * @param elements original (do not edit after this call)
+   * @return a list of elements, possibly empty
+   */
+  private List<String> finalDestination(List<String> elements) {
+    return magicCommitEnabled ?
+        MagicCommitPaths.finalDestination(elements)
+        : elements;
+  }
+
+  /**
+   * Is magic commit enabled?
+   * @return true if magic commit is turned on.
+   */
+  public boolean isMagicCommitEnabled() {
+    return magicCommitEnabled;
+  }
+
+  /**
+   * Predicate: is a path a magic commit path?
+   * @param path path to examine
+   * @return true if the path is or is under a magic directory
+   */
+  public boolean isMagicCommitPath(Path path) {
+    return isMagicCommitPath(splitPathToElements(path));
+  }
+
+  /**
+   * Is this path a magic commit path in this filesystem?
+   * True if magic commit is enabled, the path is magic
+   * and the path is not actually a commit metadata file.
+   * @param elements element list
+   * @return true if writing path is to be uprated to a magic file write
+   */
+  private boolean isMagicCommitPath(List<String> elements) {
+    return magicCommitEnabled && isMagicFile(elements);
+  }
+
+  /**
+   * Is the file a magic file: this predicate doesn't check
+   * whether the FS actually has magic commit enabled.
+   * @param elements path elements
+   * @return true if the path is one a magic file write expects.
+   */
+  private boolean isMagicFile(List<String> elements) {
+    return isMagicPath(elements) &&
+        !isCommitMetadataFile(elements);
+  }
+
+  /**
+   * Does this file contain all the commit metadata?
+   * @param elements path element list
+   * @return true if this file is one of the commit metadata files.
+   */
+  private boolean isCommitMetadataFile(List<String> elements) {
+    String last = elements.get(elements.size() - 1);
+    return last.endsWith(CommitConstants.PENDING_SUFFIX)
+        || last.endsWith(CommitConstants.PENDINGSET_SUFFIX);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java
new file mode 100644
index 0000000..745b5b2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_MAGIC_PATH_ELEMENT;
+
+/**
+ * Operations on (magic) paths.
+ */
+public final class MagicCommitPaths {
+
+  private MagicCommitPaths() {
+  }
+
+  /**
+   * Take an absolute path, split it into a list of elements.
+   * If empty, the path is the root path.
+   * @param path input path
+   * @return a possibly empty list of elements.
+   * @throws IllegalArgumentException if the path is invalid: relative,
+   * empty, etc.
+   */
+  public static List<String> splitPathToElements(Path path) {
+    checkArgument(path.isAbsolute(), "path is relative");
+    String uriPath = path.toUri().getPath();
+    checkArgument(!uriPath.isEmpty(), "empty path");
+    if ("/".equals(uriPath)) {
+      // special case: empty list
+      return new ArrayList<>(0);
+    }
+    List<String> elements = new ArrayList<>();
+    int len = uriPath.length();
+    int firstElementChar = 1;
+    int endOfElement = uriPath.indexOf('/', firstElementChar);
+    while (endOfElement > 0) {
+      elements.add(uriPath.substring(firstElementChar, endOfElement));
+      firstElementChar = endOfElement + 1;
+      endOfElement = firstElementChar == len ?
+          -1
+          : uriPath.indexOf('/', firstElementChar);
+    }
+    // expect a possible child element here
+    if (firstElementChar != len) {
+      elements.add(uriPath.substring(firstElementChar));
+    }
+    return elements;
+  }
+
+  /**
+   * Is a path in the magic tree?
+   * @param elements element list
+   * @return true if a path is considered magic
+   */
+  public static boolean isMagicPath(List<String> elements) {
+    return elements.contains(MAGIC);
+  }
+
+  /**
+   * Does the list of magic elements contain a base path marker?
+   * @param elements element list, already stripped down to the part
+   * under the magic tree.
+   * @return true if a path has a base directory
+   */
+  public static boolean containsBasePath(List<String> elements) {
+    return elements.contains(BASE);
+  }
+
+  /**
+   * Get the index of the magic path element.
+   * @param elements full path element list
+   * @return the index.
+   * @throws IllegalArgumentException if there is no magic element
+   */
+  public static int magicElementIndex(List<String> elements) {
+    int index = elements.indexOf(MAGIC);
+    checkArgument(index >= 0, E_NO_MAGIC_PATH_ELEMENT);
+    return index;
+  }
+
+  /**
+   * Get the parent path elements of the magic path.
+   * The list may be immutable or may be a view of the underlying list.
+   * Both the parameter list and the returned list MUST NOT be modified.
+   * @param elements full path element list
+   * @return the parent elements; may be empty
+   */
+  public static List<String> magicPathParents(List<String> elements) {
+    return elements.subList(0, magicElementIndex(elements));
+  }
+
+  /**
+   * Get the child path elements under the magic path.
+   * The list may be immutable or may be a view of the underlying list.
+   * Both the parameter list and the returned list MUST NOT be modified.
+   * @param elements full path element list
+   * @return the child elements; may be empty
+   */
+  public static List<String> magicPathChildren(List<String> elements) {
+    int index = magicElementIndex(elements);
+    int len = elements.size();
+    if (index == len - 1) {
+      // the magic element is last: no children
+      return Collections.emptyList();
+    } else {
+      return elements.subList(index + 1, len);
+    }
+  }
+
+  /**
+   * Get any child path elements under any {@code __base} path,
+   * or an empty list if there is either: no {@code __base} path element,
+   * or no child entries under it.
+   * The list may be immutable or may be a view of the underlying list.
+   * Both the parameter list and the returned list MUST NOT be modified.
+   * @param elements full path element list
+   * @return the child elements; may be empty
+   */
+  public static List<String> basePathChildren(List<String> elements) {
+    int index = elements.indexOf(BASE);
+    if (index < 0) {
+      return Collections.emptyList();
+    }
+    int len = elements.size();
+    if (index == len - 1) {
+      // the base element is last: no children
+      return Collections.emptyList();
+    } else {
+      return elements.subList(index + 1, len);
+    }
+  }
+
+  /**
+   * Take a list of elements and create an S3 key by joining them
+   * with "/" between each one.
+   * @param elements path elements
+   * @return a path which can be used in the AWS API
+   */
+  public static String elementsToKey(List<String> elements) {
+    return StringUtils.join("/", elements);
+  }
+
+  /**
+   * Get the filename of a path: the last element.
+   * @param elements element list.
+   * @return the filename; the last element.
+   */
+  public static String filename(List<String> elements) {
+    return lastElement(elements);
+  }
+
+  /**
+   * Last element of a (non-empty) list.
+   * @param strings input list; must not be empty
+   * @return the last element.
+   */
+  public static String lastElement(List<String> strings) {
+    checkArgument(!strings.isEmpty(), "empty list");
+    return strings.get(strings.size() - 1);
+  }
+
+  /**
+   * Get the magic subdirectory of a destination directory.
+   * @param destDir the destination directory
+   * @return a new path.
+   */
+  public static Path magicSubdir(Path destDir) {
+    return new Path(destDir, MAGIC);
+  }
+
+  /**
+   * Calculates the final destination of a file.
+   * This is the parent of any {@code __magic} element, plus the filename
+   * of the path. That is: all intermediate child path elements are
+   * discarded. Why so? Paths under the magic path include job attempt
+   * and task attempt subdirectories, which need to be skipped.
+   *
+   * If there is a {@code __base} directory in the children, then it becomes
+   * a base for unflattened paths, that is: all its children are pulled into
+   * the final destination.
+   * @param elements element list.
+   * @return the path
+   */
+  public static List<String> finalDestination(List<String> elements) {
+    if (isMagicPath(elements)) {
+      List<String> destDir = magicPathParents(elements);
+      List<String> children = magicPathChildren(elements);
+      checkArgument(!children.isEmpty(), "No path found under "
+          + MAGIC);
+      ArrayList<String> dest = new ArrayList<>(destDir);
+      if (containsBasePath(children)) {
+        // there's a base marker in the path
+        List<String> baseChildren = basePathChildren(children);
+        checkArgument(!baseChildren.isEmpty(),
+            "No path found under " + BASE);
+        dest.addAll(baseChildren);
+      } else {
+        dest.add(filename(children));
+      }
+      return dest;
+    } else {
+      return elements;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PathCommitException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PathCommitException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PathCommitException.java
new file mode 100644
index 0000000..4607548
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PathCommitException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * Path exception to use for various commit issues.
+ */
+public class PathCommitException extends PathIOException {
+  public PathCommitException(String path, Throwable cause) {
+    super(path, cause);
+  }
+
+  public PathCommitException(String path, String error) {
+    super(path, error);
+  }
+
+  public PathCommitException(Path path, String error) {
+    super(path != null ? path.toString() : "", error);
+  }
+
+  public PathCommitException(String path, String error, Throwable cause) {
+    super(path, error, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PutTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PutTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PutTracker.java
new file mode 100644
index 0000000..bbffef3
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PutTracker.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.amazonaws.services.s3.model.PartETag;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Multipart put tracker.
+ * The base class does nothing except declare that any
+ * MPU must complete in the {@code close()} operation.
+ */
+@InterfaceAudience.Private
+public class PutTracker {
+
+  /** The destination. */
+  private final String destKey;
+
+  /**
+   * Instantiate.
+   * @param destKey destination key
+   */
+  public PutTracker(String destKey) {
+    this.destKey = destKey;
+  }
+
+  /**
+   * Startup event.
+   * @return true if the multipart upload should start immediately.
+   * @throws IOException any IO problem.
+   */
+  public boolean initialize() throws IOException {
+    return false;
+  }
+
+  /**
+   * Flag to indicate whether the output is visible as soon as the stream
+   * is closed. The base implementation performs a simple PUT, so it
+   * returns true.
+   * @return true if the output is immediately visible on close;
+   * false if visibility is delayed until the commit.
+   */
+  public boolean outputImmediatelyVisible() {
+    return true;
+  }
+
+  /**
+   * Callback when the upload is about to complete.
+   * @param uploadId Upload ID
+   * @param parts list of parts
+   * @param bytesWritten bytes written
+   * @return true if the commit is to be initiated immediately;
+   * false implies the output stream does not need to do anything more.
+   * @throws IOException I/O problem or validation failure.
+   */
+  public boolean aboutToComplete(String uploadId,
+      List<PartETag> parts,
+      long bytesWritten)
+      throws IOException {
+    return true;
+  }
+
+  /**
+   * Get the destination key.
The default implementation returns the
+   * key passed in: there is no adjustment of the destination.
+   * @return the destination to use in PUT requests.
+   */
+  public String getDestKey() {
+    return destKey;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "PutTracker{");
+    sb.append("destKey='").append(destKey).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
new file mode 100644
index 0000000..6b170f9
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitterFactory;
+import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+
+/**
+ * The S3A committer factory which chooses the committer based on the
+ * specific option chosen on a per-bucket basis via the property
+ * {@link CommitConstants#FS_S3A_COMMITTER_NAME}.
+ *
+ * To use: set the job's committer factory to the value of
+ * {@link #CLASSNAME}, then set the filesystem property
+ * {@link CommitConstants#FS_S3A_COMMITTER_NAME} to one of:
+ * <ul>
+ *   <li>{@link CommitConstants#COMMITTER_NAME_FILE}: File committer.</li>
+ *   <li>{@link CommitConstants#COMMITTER_NAME_DIRECTORY}:
+ *   Staging directory committer.</li>
+ *   <li>{@link CommitConstants#COMMITTER_NAME_PARTITIONED}:
+ *   Staging partitioned committer.</li>
+ *   <li>{@link CommitConstants#COMMITTER_NAME_MAGIC}:
+ *   the "Magic" committer.</li>
+ *   <li>{@link InternalCommitterConstants#COMMITTER_NAME_STAGING}:
+ *   the "staging" committer, which isn't intended for use outside tests.</li>
+ * </ul>
+ * There are no checks to verify that the filesystem is compatible with
+ * the committer.
+ */
+public class S3ACommitterFactory extends AbstractS3ACommitterFactory {
+
+  /**
+   * Name of this class: {@value}.
+   */
+  public static final String CLASSNAME
+      = "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory";
+
+  /**
+   * Create a task committer.
+   * @param fileSystem destination FS.
+   * @param outputPath final output path for work
+   * @param context task attempt context
+   * @return a committer
+   * @throws IOException instantiation failure
+   */
+  @Override
+  public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    AbstractS3ACommitterFactory factory = chooseCommitterFactory(fileSystem,
+        outputPath,
+        context.getConfiguration());
+    return factory != null ?
+        factory.createTaskCommitter(fileSystem, outputPath, context)
+        : createFileOutputCommitter(outputPath, context);
+  }
+
+  /**
+   * Choose a committer factory from the FS and task configurations.
+   * Task configuration takes priority, allowing execution engines to
+   * dynamically change the committer on a query-by-query basis.
+   * @param fileSystem FS
+   * @param outputPath destination path
+   * @param taskConf configuration from the task
+   * @return an S3A committer factory if chosen, or null for the classic
+   * file committer
+   * @throws PathCommitException on a failure to identify the committer
+   */
+  private AbstractS3ACommitterFactory chooseCommitterFactory(
+      S3AFileSystem fileSystem,
+      Path outputPath,
+      Configuration taskConf) throws PathCommitException {
+    AbstractS3ACommitterFactory factory;
+
+    // the FS conf will have had its per-bucket values resolved, unlike
+    // job/task configurations.
+    Configuration fsConf = fileSystem.getConf();
+
+    String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
+    name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name);
+    switch (name) {
+    case COMMITTER_NAME_FILE:
+      factory = null;
+      break;
+    case COMMITTER_NAME_DIRECTORY:
+      factory = new DirectoryStagingCommitterFactory();
+      break;
+    case COMMITTER_NAME_PARTITIONED:
+      factory = new PartitionedStagingCommitterFactory();
+      break;
+    case COMMITTER_NAME_MAGIC:
+      factory = new MagicS3GuardCommitterFactory();
+      break;
+    case InternalCommitterConstants.COMMITTER_NAME_STAGING:
+      factory = new StagingCommitterFactory();
+      break;
+    default:
+      throw new PathCommitException(outputPath,
+          "Unknown committer: \"" + name + "\"");
+    }
+    return factory;
+  }
+}
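
Editor's note: as a worked illustration of the path-flattening rules in MagicCommitPaths.finalDestination() above, here is a minimal standalone sketch, not part of the patch. It assumes the MAGIC and BASE constants resolve to the "__magic" and "__base" directory names used in the javadocs, and that hadoop-aws is on the classpath.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;

public class MagicPathFlatteningExample {
  public static void main(String[] args) {
    // job/task attempt directories between __magic and the file are
    // dropped: /dest/__magic/job_01/task_01/part-0000 -> /dest/part-0000
    List<String> simple = Arrays.asList(
        "dest", "__magic", "job_01", "task_01", "part-0000");
    System.out.println(MagicCommitPaths.finalDestination(simple));
    // expected: [dest, part-0000]

    // everything under a __base marker is preserved:
    // /dest/__magic/job_01/task_01/__base/year=2017/part-0000
    //   -> /dest/year=2017/part-0000
    List<String> based = Arrays.asList(
        "dest", "__magic", "job_01", "task_01", "__base",
        "year=2017", "part-0000");
    System.out.println(MagicCommitPaths.finalDestination(based));
    // expected: [dest, year=2017, part-0000]
  }
}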
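
Editor's note: the base PutTracker contract described above (no eager multipart start, output visible on close, close() completes the upload) can be seen directly by exercising the class; a small sketch, again not part of the patch, with an arbitrary illustrative key:

import java.util.Collections;

import org.apache.hadoop.fs.s3a.commit.PutTracker;

public class PutTrackerContractExample {
  public static void main(String[] args) throws Exception {
    PutTracker tracker = new PutTracker("dest/part-0000");
    // false: the base tracker never asks for an eager multipart upload
    System.out.println("start MPU at once? " + tracker.initialize());
    // true: a plain PUT is visible as soon as the stream closes
    System.out.println("visible on close? "
        + tracker.outputImmediatelyVisible());
    // true: the stream itself must complete the upload in close()
    System.out.println("complete in close()? "
        + tracker.aboutToComplete("upload-001",
            Collections.emptyList(), 0L));
  }
}

Magic-committer trackers override aboutToComplete() to return false, saving the pending commit data instead of completing the upload, which is what delays visibility until job commit.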
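
Editor's note: finally, a hedged configuration sketch for the committer selection logic in chooseCommitterFactory() above. The literal keys are assumptions: "fs.s3a.committer.name" is assumed to be the value of FS_S3A_COMMITTER_NAME, and the factory binding key is assumed to follow PathOutputCommitterFactory's per-scheme pattern; the committer names come from the switch statement in the patch.

import org.apache.hadoop.conf.Configuration;

public class CommitterSelectionExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // bind the s3a scheme to the S3A committer factory
    conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
    // select the committer by name; one of:
    // file, directory, partitioned, magic (or staging, for tests)
    conf.set("fs.s3a.committer.name", "magic");
    System.out.println("Selected committer: "
        + conf.get("fs.s3a.committer.name"));
  }
}

Because the task configuration is consulted after the filesystem configuration, an execution engine can override the per-bucket default for a single query by setting the same property on the job.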