http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java new file mode 100644 index 0000000..b446f22 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; + +/** + * Factory for the {@link PartitionedStagingCommitter}. + */ +public class PartitionedStagingCommitterFactory + extends AbstractS3ACommitterFactory { + + /** + * Name of this class: {@value}. + */ + public static final String CLASSNAME + = "org.apache.hadoop.fs.s3a.commit.staging" + + ".PartitionedStagingCommitterFactory"; + + public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem, + Path outputPath, + TaskAttemptContext context) throws IOException { + return new PartitionedStagingCommitter(outputPath, context); + } + +}
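The factory above is not normally instantiated directly; a job binds it to the s3a scheme through configuration. A minimal usage sketch (not part of this patch) follows. It assumes the standard PathOutputCommitterFactory binding-key pattern ("mapreduce.outputcommitter.factory.scheme.<scheme>"); the class name, job name and bucket used here are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PartitionedStagingExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed binding key: the PathOutputCommitterFactory scheme pattern
    // "mapreduce.outputcommitter.factory.scheme.<scheme>".
    conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
        PartitionedStagingCommitterFactory.CLASSNAME);
    Job job = Job.getInstance(conf, "partitioned-staging-example");
    // Hypothetical destination; any s3a:// path would do.
    FileOutputFormat.setOutputPath(job,
        new Path("s3a://example-bucket/tables/events"));
  }
}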
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java new file mode 100644 index 0000000..a4d39d7 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIsDirectoryException; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.security.UserGroupInformation; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*; + +/** + * Path operations for the staging committers. + */ +public final class Paths { + + private Paths() { + } + + /** + * Insert the UUID to a path if it is not there already. + * If there is a trailing "." in the prefix after the last slash, the + * UUID is inserted before it with a "-" prefix; otherwise appended. + * + * Examples: + * <pre> + * /example/part-0000 ==> /example/part-0000-0ab34 + * /example/part-0001.gz.csv ==> /example/part-0001-0ab34.gz.csv + * /example/part-0002-0abc3.gz.csv ==> /example/part-0002-0abc3.gz.csv + * /example0abc3/part-0002.gz.csv ==> /example0abc3/part-0002.gz.csv + * </pre> + * + * + * @param pathStr path as a string; must not have a trailing "/". + * @param uuid UUID to append; must not be empty + * @return new path. 
+ */ + public static String addUUID(String pathStr, String uuid) { + Preconditions.checkArgument(StringUtils.isNotEmpty(pathStr), "empty path"); + Preconditions.checkArgument(StringUtils.isNotEmpty(uuid), "empty uuid"); + // In some cases, Spark will add the UUID to the filename itself. + if (pathStr.contains(uuid)) { + return pathStr; + } + + int dot; // location of the first '.' in the file name + int lastSlash = pathStr.lastIndexOf('/'); + if (lastSlash >= 0) { + Preconditions.checkState(lastSlash + 1 < pathStr.length(), + "Bad path: " + pathStr); + dot = pathStr.indexOf('.', lastSlash); + } else { + dot = pathStr.indexOf('.'); + } + + if (dot >= 0) { + return pathStr.substring(0, dot) + "-" + uuid + pathStr.substring(dot); + } else { + return pathStr + "-" + uuid; + } + } + + /** + * Get the parent path of a string path: everything up to but excluding + * the last "/" in the path. + * @param pathStr path as a string + * @return the parent or null if there is no parent. + */ + public static String getParent(String pathStr) { + int lastSlash = pathStr.lastIndexOf('/'); + if (lastSlash >= 0) { + return pathStr.substring(0, lastSlash); + } + return null; + } + + /** + * Using {@code URI#relativize()}, build the relative path from the + * base path to the full path. + * If {@code fullPath} is not a child of {@code basePath} the outcome + * is undefined. + * @param basePath base path + * @param fullPath full path under the base path. + * @return the relative path + */ + public static String getRelativePath(Path basePath, + Path fullPath) { + return basePath.toUri().relativize(fullPath.toUri()).getPath(); + } + + /** + * Varargs constructor of paths. Not very efficient. + * @param parent parent path + * @param child child entries. "" elements are skipped. + * @return the full child path. + */ + public static Path path(Path parent, String... child) { + Path p = parent; + for (String c : child) { + if (!c.isEmpty()) { + p = new Path(p, c); + } + } + return p; + } + + /** + * A cache of temporary folders. There's a risk here that the cache + * gets too big. + */ + private static Cache<TaskAttemptID, Path> tempFolders = CacheBuilder + .newBuilder().build(); + + /** + * Get the task attempt temporary directory in the local filesystem. + * @param conf configuration + * @param uuid some UUID, such as a job UUID + * @param attemptID attempt ID + * @return a local task attempt directory. + * @throws IOException IO problem. + */ + public static Path getLocalTaskAttemptTempDir(final Configuration conf, + final String uuid, + final TaskAttemptID attemptID) + throws IOException { + try { + final LocalDirAllocator allocator = + new LocalDirAllocator(Constants.BUFFER_DIR); + return tempFolders.get(attemptID, + () -> { + return FileSystem.getLocal(conf).makeQualified( + allocator.getLocalPathForWrite(uuid, conf)); + }); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } catch (UncheckedExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } + throw new RuntimeException(e); + } + } + + /** + * Remove all information held about a task attempt. + * @param attemptID attempt ID. + */ + public static void clearTempFolderInfo(final TaskAttemptID attemptID) { + tempFolders.invalidate(attemptID); + } + + /** + * Reset the temp folder cache; useful in tests.
+ */ + @VisibleForTesting + public static void resetTempFolderCache() { + tempFolders.invalidateAll(); + } + + /** + * Try to come up with a good temp directory for different filesystems. + * @param fs filesystem + * @param conf configuration + * @return a qualified path under which temporary work can go. + */ + public static Path tempDirForStaging(FileSystem fs, + Configuration conf) { + String fallbackPath = fs.getScheme().equals("file") + ? System.getProperty(JAVA_IO_TMPDIR) + : FILESYSTEM_TEMP_PATH; + + return fs.makeQualified(new Path(conf.getTrimmed( + FS_S3A_COMMITTER_STAGING_TMP_PATH, fallbackPath))); + } + + /** + * Get the Application Attempt ID for this job. + * @param conf the config to look in + * @return the Application Attempt ID for a given job. + */ + private static int getAppAttemptId(Configuration conf) { + return conf.getInt( + MRJobConfig.APPLICATION_ATTEMPT_ID, 0); + } + + /** + * Build a qualified temporary path for the multipart upload commit + * information in the cluster filesystem. + * Path is built by + * {@link #getMultipartUploadCommitsDirectory(FileSystem, Configuration, String)} + * @param conf configuration defining default FS. + * @param uuid uuid of job + * @return a path which can be used for temporary work + * @throws IOException on an IO failure. + */ + public static Path getMultipartUploadCommitsDirectory(Configuration conf, + String uuid) throws IOException { + return getMultipartUploadCommitsDirectory(FileSystem.get(conf), conf, uuid); + } + + /** + * Build a qualified temporary path for the multipart upload commit + * information in the supplied filesystem + * (which is expected to be the cluster FS). + * Currently {code $tempDir/$user/$uuid/staging-uploads} where + * {@code tempDir} is from + * {@link #tempDirForStaging(FileSystem, Configuration)}. + * @param fs target FS + * @param conf configuration + * @param uuid uuid of job + * @return a path which can be used for temporary work + * @throws IOException on an IO failure. + */ + @VisibleForTesting + static Path getMultipartUploadCommitsDirectory(FileSystem fs, + Configuration conf, String uuid) throws IOException { + return path( + tempDirForStaging(fs, conf), + UserGroupInformation.getCurrentUser().getShortUserName(), + uuid, + STAGING_UPLOADS); + } + + /** + * Returns the partition of a relative file path, or null if the path is a + * file name with no relative directory. + * + * @param relative a relative file path + * @return the partition of the relative file path + */ + protected static String getPartition(String relative) { + return getParent(relative); + } + + /** + * Get the set of partitions from the list of files being staged. + * This is all immediate parents of those files. If a file is in the root + * dir, the partition is declared to be + * {@link StagingCommitterConstants#TABLE_ROOT}. + * @param attemptPath path for the attempt + * @param taskOutput list of output files. + * @return list of partitions. + * @throws IOException IO failure + */ + public static Set<String> getPartitions(Path attemptPath, + List<? 
extends FileStatus> taskOutput) + throws IOException { + // get a list of partition directories + Set<String> partitions = Sets.newLinkedHashSet(); + for (FileStatus fileStatus : taskOutput) { + // sanity check the output paths + Path outputFile = fileStatus.getPath(); + if (!fileStatus.isFile()) { + throw new PathIsDirectoryException(outputFile.toString()); + } + String partition = getPartition( + getRelativePath(attemptPath, outputFile)); + partitions.add(partition != null ? partition : TABLE_ROOT); + } + + return partitions; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java new file mode 100644 index 0000000..922d1ad --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -0,0 +1,851 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.DurationInfo; +import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; +import org.apache.hadoop.fs.s3a.commit.Tasks; +import org.apache.hadoop.fs.s3a.commit.files.PendingSet; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +import static com.google.common.base.Preconditions.*; +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.Invoker.*; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; +import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*; + +/** + * Committer based on the contributed work of the + * <a href="https://github.com/rdblue/s3committer">Netflix multipart committers.</a> + * <ol> + * <li> + * The working directory of each task is actually under a temporary + * path in the local filesystem; jobs write directly into it. + * </li> + * <li> + * Task Commit: list all files under the task working dir, upload + * each of them but do not commit the final operation. + * Persist the information for each pending commit into the cluster + * for enumeration and commit by the job committer. + * </li> + * <li>Task Abort: recursive delete of task working dir.</li> + * <li>Job Commit: list all pending PUTs to commit; commit them.</li> + * <li> + * Job Abort: list all pending PUTs to commit; abort them. + * Delete all task attempt directories. + * </li> + * </ol> + * + * This is the base class of the Partitioned and Directory committers. + * It does not do any conflict resolution, and is made non-abstract + * primarily for test purposes. It is not expected to be used in production. + */ +public class StagingCommitter extends AbstractS3ACommitter { + + private static final Logger LOG = LoggerFactory.getLogger( + StagingCommitter.class); + + /** Name: {@value}. */ + public static final String NAME = "staging"; + private final Path constructorOutputPath; + private final long uploadPartSize; + private final String uuid; + private final boolean uniqueFilenames; + private final FileOutputCommitter wrappedCommitter; + + private ConflictResolution conflictResolution; + private String s3KeyPrefix; + + /** The directory in the cluster FS for commits to go to. */ + private Path commitsDirectory; + + /** + * Committer for a single task attempt. 
+ * @param outputPath final output path + * @param context task context + * @throws IOException on a failure + */ + public StagingCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + this.constructorOutputPath = checkNotNull(getOutputPath(), "output path"); + Configuration conf = getConf(); + this.uploadPartSize = conf.getLongBytes( + MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); + this.uuid = getUploadUUID(conf, context.getJobID()); + this.uniqueFilenames = conf.getBoolean( + FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, + DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES); + setWorkPath(buildWorkPath(context, uuid)); + this.wrappedCommitter = createWrappedCommitter(context, conf); + setOutputPath(constructorOutputPath); + Path finalOutputPath = getOutputPath(); + Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null"); + S3AFileSystem fs = getS3AFileSystem(finalOutputPath, + context.getConfiguration(), false); + s3KeyPrefix = fs.pathToKey(finalOutputPath); + LOG.debug("{}: final output path is {}", getRole(), finalOutputPath); + // forces evaluation and caching of the resolution mode. + ConflictResolution mode = getConflictResolutionMode(getJobContext(), + fs.getConf()); + LOG.debug("Conflict resolution mode: {}", mode); + } + + @Override + public String getName() { + return NAME; + } + + /** + * Create the wrapped committer. + * This includes customizing its options, and setting up the destination + * directory. + * @param context job/task context. + * @param conf config + * @return the inner committer + * @throws IOException on a failure + */ + protected FileOutputCommitter createWrappedCommitter(JobContext context, + Configuration conf) throws IOException { + + // explicitly choose commit algorithm + initFileOutputCommitterOptions(context); + commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, uuid); + return new FileOutputCommitter(commitsDirectory, context); + } + + /** + * Init the context config with everything needed for the file output + * committer. In particular, this code currently only works with + * commit algorithm 1. + * @param context context to configure. + */ + protected void initFileOutputCommitterOptions(JobContext context) { + context.getConfiguration() + .setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("StagingCommitter{"); + sb.append(super.toString()); + sb.append(", conflictResolution=").append(conflictResolution); + if (wrappedCommitter != null) { + sb.append(", wrappedCommitter=").append(wrappedCommitter); + } + sb.append('}'); + return sb.toString(); + } + + /** + * Get the UUID of an upload; may be the job ID. + * Spark will use a fake app ID based on the current minute and job ID 0. + * To avoid collisions, the key policy is: + * <ol> + * <li>Value of {@link InternalCommitterConstants#FS_S3A_COMMITTER_STAGING_UUID}.</li> + * <li>Value of {@code "spark.sql.sources.writeJobUUID"}.</li> + * <li>Value of {@code "spark.app.id"}.</li> + * <li>JobId passed in.</li> + * </ol> + * The staging UUID is set in in {@link #setupJob(JobContext)} and so will + * be valid in all sequences where the job has been set up for the + * configuration passed in. + * @param conf job/task configuration + * @param jobId Job ID + * @return an ID for use in paths. 
+ */ + public static String getUploadUUID(Configuration conf, String jobId) { + return conf.getTrimmed( + InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, + conf.getTrimmed(SPARK_WRITE_UUID, + conf.getTrimmed(SPARK_APP_ID, jobId))); + } + + /** + * Get the UUID of a Job. + * @param conf job/task configuration + * @param jobId Job ID + * @return an ID for use in paths. + */ + public static String getUploadUUID(Configuration conf, JobID jobId) { + return getUploadUUID(conf, jobId.toString()); + } + + /** + * Get the work path for a task. + * @param context job/task complex + * @param uuid UUID + * @return a path or null if the context is not of a task + * @throws IOException failure to build the path + */ + private static Path buildWorkPath(JobContext context, String uuid) + throws IOException { + if (context instanceof TaskAttemptContext) { + return taskAttemptWorkingPath((TaskAttemptContext) context, uuid); + } else { + return null; + } + } + + /** + * Is this committer using unique filenames? + * @return true if unique filenames are used. + */ + public Boolean useUniqueFilenames() { + return uniqueFilenames; + } + + /** + * Get the filesystem for the job attempt. + * @param context the context of the job. This is used to get the + * application attempt ID. + * @return the FS to store job attempt data. + * @throws IOException failure to create the FS. + */ + public FileSystem getJobAttemptFileSystem(JobContext context) + throws IOException { + Path p = getJobAttemptPath(context); + return p.getFileSystem(context.getConfiguration()); + } + + /** + * Compute the path where the output of a given job attempt will be placed. + * @param context the context of the job. This is used to get the + * application attempt ID. + * @param out the output path to place these in. + * @return the path to store job attempt data. + */ + public static Path getJobAttemptPath(JobContext context, Path out) { + return getJobAttemptPath(getAppAttemptId(context), out); + } + + /** + * Compute the path where the output of a given job attempt will be placed. + * @param appAttemptId the ID of the application attempt for this job. + * @return the path to store job attempt data. + */ + private static Path getJobAttemptPath(int appAttemptId, Path out) { + return new Path(getPendingJobAttemptsPath(out), + String.valueOf(appAttemptId)); + } + + @Override + protected Path getJobAttemptPath(int appAttemptId) { + return new Path(getPendingJobAttemptsPath(commitsDirectory), + String.valueOf(appAttemptId)); + } + + /** + * Compute the path where the output of pending task attempts are stored. + * @param context the context of the job with pending tasks. + * @return the path where the output of pending task attempts are stored. + */ + private static Path getPendingTaskAttemptsPath(JobContext context, Path out) { + return new Path(getJobAttemptPath(context, out), TEMPORARY); + } + + /** + * Compute the path where the output of a task attempt is stored until + * that task is committed. + * + * @param context the context of the task attempt. + * @param out The output path to put things in. + * @return the path where a task attempt should be stored. + */ + public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) { + return new Path(getPendingTaskAttemptsPath(context, out), + String.valueOf(context.getTaskAttemptID())); + } + + /** + * Get the location of pending job attempts. + * @param out the base output directory. + * @return the location of pending job attempts. 
+ */ + private static Path getPendingJobAttemptsPath(Path out) { + Preconditions.checkNotNull(out, "Null 'out' path"); + return new Path(out, TEMPORARY); + } + + /** + * Compute the path where the output of a committed task is stored until + * the entire job is committed. + * @param context the context of the task attempt + * @return the path where the output of a committed task is stored until + * the entire job is committed. + */ + public Path getCommittedTaskPath(TaskAttemptContext context) { + return getCommittedTaskPath(getAppAttemptId(context), context); + } + + /** + * Validate the task attempt context; makes sure + * that the task attempt ID data is valid. + * @param context task context + */ + private static void validateContext(TaskAttemptContext context) { + Preconditions.checkNotNull(context, "null context"); + Preconditions.checkNotNull(context.getTaskAttemptID(), + "null task attempt ID"); + Preconditions.checkNotNull(context.getTaskAttemptID().getTaskID(), + "null task ID"); + Preconditions.checkNotNull(context.getTaskAttemptID().getJobID(), + "null job ID"); + } + + /** + * Compute the path where the output of a committed task is stored until the + * entire job is committed for a specific application attempt. + * @param appAttemptId the ID of the application attempt to use + * @param context the context of any task. + * @return the path where the output of a committed task is stored. + */ + protected Path getCommittedTaskPath(int appAttemptId, + TaskAttemptContext context) { + validateContext(context); + return new Path(getJobAttemptPath(appAttemptId), + String.valueOf(context.getTaskAttemptID().getTaskID())); + } + + @Override + public Path getTempTaskAttemptPath(TaskAttemptContext context) { + throw new UnsupportedOperationException("Unimplemented"); + } + + /** + * Lists the output of a task under the task attempt path. Subclasses can + * override this method to change how output files are identified. + * <p> + * This implementation lists the files that are direct children of the output + * path and filters hidden files (file names starting with '.' or '_'). + * <p> + * The task attempt path is provided by + * {@link #getTaskAttemptPath(TaskAttemptContext)} + * + * @param context this task's {@link TaskAttemptContext} + * @return the output files produced by this task in the task attempt path + * @throws IOException on a failure + */ + protected List<LocatedFileStatus> getTaskOutput(TaskAttemptContext context) + throws IOException { + + // get files on the local FS in the attempt path + Path attemptPath = getTaskAttemptPath(context); + Preconditions.checkNotNull(attemptPath, + "No attemptPath path in {}", this); + + LOG.debug("Scanning {} for files to commit", attemptPath); + + return listAndFilter(getTaskAttemptFilesystem(context), + attemptPath, true, HIDDEN_FILE_FILTER); + } + + /** + * Returns the final S3 key for a relative path. Subclasses can override this + * method to upload files to a different S3 location. + * <p> + * This implementation concatenates the relative path with the key prefix + * from the output path. 
+ * If {@link CommitConstants#FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES} is + * set, then the task UUID is also included in the calculation + * + * @param relative the path of a file relative to the task attempt path + * @param context the JobContext or TaskAttemptContext for this job + * @return the S3 key where the file will be uploaded + */ + protected String getFinalKey(String relative, JobContext context) { + if (uniqueFilenames) { + return getS3KeyPrefix(context) + "/" + Paths.addUUID(relative, uuid); + } else { + return getS3KeyPrefix(context) + "/" + relative; + } + } + + /** + * Returns the final S3 location for a relative path as a Hadoop {@link Path}. + * This is a final method that calls {@link #getFinalKey(String, JobContext)} + * to determine the final location. + * + * @param relative the path of a file relative to the task attempt path + * @param context the JobContext or TaskAttemptContext for this job + * @return the S3 Path where the file will be uploaded + */ + protected final Path getFinalPath(String relative, JobContext context) + throws IOException { + return getDestS3AFS().keyToQualifiedPath(getFinalKey(relative, context)); + } + + /** + * Return the local work path as the destination for writing work. + * @param context the context of the task attempt. + * @return a path in the local filesystem. + */ + @Override + public Path getBaseTaskAttemptPath(TaskAttemptContext context) { + // a path on the local FS for files that will be uploaded + return getWorkPath(); + } + + /** + * For a job attempt path, the staging committer returns that of the + * wrapped committer. + * @param context the context of the job. + * @return a path in HDFS. + */ + @Override + public Path getJobAttemptPath(JobContext context) { + return wrappedCommitter.getJobAttemptPath(context); + } + + /** + * Set up the job, including calling the same method on the + * wrapped committer. + * @param context job context + * @throws IOException IO failure. + */ + @Override + public void setupJob(JobContext context) throws IOException { + LOG.debug("{}, Setting up job {}", getRole(), jobIdString(context)); + context.getConfiguration().set( + InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, uuid); + wrappedCommitter.setupJob(context); + } + + /** + * Get the list of pending uploads for this job attempt. + * @param context job context + * @return a list of pending uploads. + * @throws IOException Any IO failure + */ + @Override + protected List<SinglePendingCommit> listPendingUploadsToCommit( + JobContext context) + throws IOException { + return listPendingUploads(context, false); + } + + /** + * Get the list of pending uploads for this job attempt, swallowing + * exceptions. + * @param context job context + * @return a list of pending uploads. If an exception was swallowed, + * then this may not match the actual set of pending operations + * @throws IOException shouldn't be raised, but retained for the compiler + */ + protected List<SinglePendingCommit> listPendingUploadsToAbort( + JobContext context) throws IOException { + return listPendingUploads(context, true); + } + + /** + * Get the list of pending uploads for this job attempt. + * @param context job context + * @param suppressExceptions should exceptions be swallowed? + * @return a list of pending uploads. If exceptions are being swallowed, + * then this may not match the actual set of pending operations + * @throws IOException Any IO failure which wasn't swallowed. 
+ */ + protected List<SinglePendingCommit> listPendingUploads( + JobContext context, boolean suppressExceptions) throws IOException { + try { + Path wrappedJobAttemptPath = wrappedCommitter.getJobAttemptPath(context); + final FileSystem attemptFS = wrappedJobAttemptPath.getFileSystem( + context.getConfiguration()); + return loadPendingsetFiles(context, suppressExceptions, attemptFS, + listAndFilter(attemptFS, + wrappedJobAttemptPath, false, + HIDDEN_FILE_FILTER)); + } catch (IOException e) { + // unable to work with endpoint, if suppressing errors decide our actions + maybeIgnore(suppressExceptions, "Listing pending uploads", e); + } + // reached iff an IOE was caught and swallowed + return new ArrayList<>(0); + } + + @Override + public void cleanupStagingDirs() { + Path workPath = getWorkPath(); + if (workPath != null) { + LOG.debug("Cleaning up work path {}", workPath); + ignoreIOExceptions(LOG, "cleaning up", workPath.toString(), + () -> deleteQuietly(workPath.getFileSystem(getConf()), + workPath, true)); + } + } + + @Override + @SuppressWarnings("deprecation") + protected void cleanup(JobContext context, + boolean suppressExceptions) + throws IOException { + maybeIgnore(suppressExceptions, "Cleanup wrapped committer", + () -> wrappedCommitter.cleanupJob(context)); + maybeIgnore(suppressExceptions, "Delete destination paths", + () -> deleteDestinationPaths(context)); + super.cleanup(context, suppressExceptions); + } + + @Override + protected void abortPendingUploadsInCleanup(boolean suppressExceptions) + throws IOException { + if (getConf() + .getBoolean(FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS, true)) { + super.abortPendingUploadsInCleanup(suppressExceptions); + } else { + LOG.info("Not cleanup up pending uploads to {} as {} is false ", + getOutputPath(), + FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS); + } + } + + @Override + protected void abortJobInternal(JobContext context, + boolean suppressExceptions) throws IOException { + String r = getRole(); + boolean failed = false; + try (DurationInfo d = new DurationInfo(LOG, + "%s: aborting job in state %s ", r, jobIdString(context))) { + List<SinglePendingCommit> pending = listPendingUploadsToAbort(context); + abortPendingUploads(context, pending, suppressExceptions); + } catch (FileNotFoundException e) { + // nothing to list + LOG.debug("No job directory to read uploads from"); + } catch (IOException e) { + failed = true; + maybeIgnore(suppressExceptions, "aborting job", e); + } finally { + super.abortJobInternal(context, failed || suppressExceptions); + } + } + + /** + * Delete the working paths of a job. Does not attempt to clean up + * the work of the wrapped committer. + * <ol> + * <li>The job attempt path</li> + * <li>$dest/__temporary</li> + * <li>the local working directory for staged files</li> + * </ol> + * @param context job context + * @throws IOException IO failure + */ + protected void deleteDestinationPaths(JobContext context) throws IOException { + Path attemptPath = getJobAttemptPath(context); + ignoreIOExceptions(LOG, + "Deleting Job attempt Path", attemptPath.toString(), + () -> deleteWithWarning( + getJobAttemptFileSystem(context), + attemptPath, + true)); + + // delete the __temporary directory. 
This will cause problems + // if there is >1 task targeting the same dest dir + deleteWithWarning(getDestFS(), + new Path(getOutputPath(), TEMPORARY), + true); + // and the working path + deleteTaskWorkingPathQuietly(context); + } + + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + Path taskAttemptPath = getTaskAttemptPath(context); + try (DurationInfo d = new DurationInfo(LOG, + "%s: setup task attempt path %s ", getRole(), taskAttemptPath)) { + // create the local FS + taskAttemptPath.getFileSystem(getConf()).mkdirs(taskAttemptPath); + wrappedCommitter.setupTask(context); + } + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) + throws IOException { + try (DurationInfo d = new DurationInfo(LOG, + "%s: needsTaskCommit() Task %s", + getRole(), context.getTaskAttemptID())) { + // check for files on the local FS in the attempt path + Path attemptPath = getTaskAttemptPath(context); + FileSystem fs = getTaskAttemptFilesystem(context); + + // This could be made more efficient with a probe "hasChildren(Path)" + // which returns true if there is >1 entry under a given path. + FileStatus[] stats = fs.listStatus(attemptPath); + LOG.debug("{} files to commit under {}", stats.length, attemptPath); + return stats.length > 0; + } catch (FileNotFoundException e) { + // list didn't find a directory, so nothing to commit + // TODO: throw this up as an error? + LOG.info("No files to commit"); + throw e; + } + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + try (DurationInfo d = new DurationInfo(LOG, + "%s: commit task %s", getRole(), context.getTaskAttemptID())) { + int count = commitTaskInternal(context, getTaskOutput(context)); + LOG.info("{}: upload file count: {}", getRole(), count); + } catch (IOException e) { + LOG.error("{}: commit of task {} failed", + getRole(), context.getTaskAttemptID(), e); + getCommitOperations().taskCompleted(false); + throw e; + } + getCommitOperations().taskCompleted(true); + } + + /** + * Commit the task by uploading all created files and then + * writing a pending entry for them. + * @param context task context + * @param taskOutput list of files from the output + * @return number of uploads committed. + * @throws IOException IO Failures. + */ + protected int commitTaskInternal(final TaskAttemptContext context, + List<? extends FileStatus> taskOutput) + throws IOException { + LOG.debug("{}: commitTaskInternal", getRole()); + Configuration conf = context.getConfiguration(); + + final Path attemptPath = getTaskAttemptPath(context); + FileSystem attemptFS = getTaskAttemptFilesystem(context); + LOG.debug("{}: attempt path is {}", getRole(), attemptPath); + + // add the commits file to the wrapped committer's task attempt location. + // of this method. + Path commitsAttemptPath = wrappedCommitter.getTaskAttemptPath(context); + FileSystem commitsFS = commitsAttemptPath.getFileSystem(conf); + + // keep track of unfinished commits in case one fails. if something fails, + // we will try to abort the ones that had already succeeded. + int commitCount = taskOutput.size(); + final Queue<SinglePendingCommit> commits = new ConcurrentLinkedQueue<>(); + LOG.info("{}: uploading from staging directory to S3", getRole()); + LOG.info("{}: Saving pending data information to {}", + getRole(), commitsAttemptPath); + if (taskOutput.isEmpty()) { + // there is nothing to write. needsTaskCommit() should have caught + // this, so warn that there is some kind of problem in the protocol. 
+ LOG.warn("{}: No files to commit", getRole()); + } else { + boolean threw = true; + // before the uploads, report some progress + context.progress(); + + PendingSet pendingCommits = new PendingSet(commitCount); + try { + Tasks.foreach(taskOutput) + .stopOnFailure() + .executeWith(buildThreadPool(context)) + .run(stat -> { + Path path = stat.getPath(); + File localFile = new File(path.toUri().getPath()); + String relative = Paths.getRelativePath(attemptPath, path); + String partition = Paths.getPartition(relative); + String key = getFinalKey(relative, context); + Path destPath = getDestS3AFS().keyToQualifiedPath(key); + SinglePendingCommit commit = getCommitOperations() + .uploadFileToPendingCommit( + localFile, + destPath, + partition, + uploadPartSize); + LOG.debug("{}: adding pending commit {}", getRole(), commit); + commits.add(commit); + }); + + for (SinglePendingCommit commit : commits) { + pendingCommits.add(commit); + } + + // save the data + // although overwrite=false, there's still a risk of > 1 entry being + // committed if the FS doesn't have create-no-overwrite consistency. + + LOG.debug("Saving {} pending commit(s)) to file {}", + pendingCommits.size(), + commitsAttemptPath); + pendingCommits.save(commitsFS, commitsAttemptPath, false); + threw = false; + + } finally { + if (threw) { + LOG.error( + "{}: Exception during commit process, aborting {} commit(s)", + getRole(), commits.size()); + Tasks.foreach(commits) + .suppressExceptions() + .run(commit -> getCommitOperations().abortSingleCommit(commit)); + deleteTaskAttemptPathQuietly(context); + } + } + // always purge attempt information at this point. + Paths.clearTempFolderInfo(context.getTaskAttemptID()); + } + + LOG.debug("Committing wrapped task"); + wrappedCommitter.commitTask(context); + + LOG.debug("Cleaning up attempt dir {}", attemptPath); + attemptFS.delete(attemptPath, true); + return commits.size(); + } + + /** + * Abort the task. + * The API specifies that the task has not yet been committed, so there are + * no uploads that need to be cancelled. + * Accordingly just delete files on the local FS, and call abortTask in + * the wrapped committer. + * <b>Important: this may be called in the AM after a container failure.</b> + * When that occurs and the failed container was on a different host in the + * cluster, the local files will not be deleted. + * @param context task context + * @throws IOException any failure + */ + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + // the API specifies that the task has not yet been committed, so there are + // no uploads that need to be cancelled. just delete files on the local FS. + try (DurationInfo d = new DurationInfo(LOG, + "Abort task %s", context.getTaskAttemptID())) { + deleteTaskAttemptPathQuietly(context); + deleteTaskWorkingPathQuietly(context); + wrappedCommitter.abortTask(context); + } catch (IOException e) { + LOG.error("{}: exception when aborting task {}", + getRole(), context.getTaskAttemptID(), e); + throw e; + } + } + + /** + * Get the work path for a task. 
+ * @param context job/task complex + * @param uuid UUID + * @return a path + * @throws IOException failure to build the path + */ + private static Path taskAttemptWorkingPath(TaskAttemptContext context, + String uuid) throws IOException { + return getTaskAttemptPath(context, + Paths.getLocalTaskAttemptTempDir( + context.getConfiguration(), + uuid, + context.getTaskAttemptID())); + } + + /** + * Delete the working path of a task; no-op if there is none, that + * is: this is a job. + * @param context job/task context + */ + protected void deleteTaskWorkingPathQuietly(JobContext context) { + ignoreIOExceptions(LOG, "Delete working path", "", + () -> { + Path path = buildWorkPath(context, getUUID()); + if (path != null) { + deleteQuietly(path.getFileSystem(getConf()), path, true); + } + }); + } + + /** + * Get the key of the destination "directory" of the job/task. + * @param context job context + * @return key to write to + */ + private String getS3KeyPrefix(JobContext context) { + return s3KeyPrefix; + } + + /** + * A UUID for this upload, as calculated with. + * {@link #getUploadUUID(Configuration, String)} + * @return the UUID for files + */ + protected String getUUID() { + return uuid; + } + + /** + * Returns the {@link ConflictResolution} mode for this commit. + * + * @param context the JobContext for this commit + * @param fsConf filesystem config + * @return the ConflictResolution mode + */ + public final ConflictResolution getConflictResolutionMode( + JobContext context, + Configuration fsConf) { + if (conflictResolution == null) { + this.conflictResolution = ConflictResolution.valueOf( + getConfictModeOption(context, fsConf)); + } + return conflictResolution; + } + + /** + * Get the conflict mode option string. + * @param context context with the config + * @param fsConf filesystem config + * @return the trimmed configuration option, upper case. + */ + public static String getConfictModeOption(JobContext context, + Configuration fsConf) { + return getConfigurationOption(context, + fsConf, + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, + DEFAULT_CONFLICT_MODE).toUpperCase(Locale.ENGLISH); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java new file mode 100644 index 0000000..c5fb967 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Internal staging committer constants. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class StagingCommitterConstants { + + private StagingCommitterConstants() { + } + + /** + * The temporary path for staging data, if not explicitly set. + * By using an unqualified path, this will be qualified to be relative + * to the user's home directory, so protected from access by others. + */ + public static final String FILESYSTEM_TEMP_PATH = "tmp/staging"; + + /** Name of the root partition: {@value}. */ + public static final String TABLE_ROOT = "table_root"; + + /** + * Filename used under {@code ~/${UUID}} for the staging files. + */ + public static final String STAGING_UPLOADS = "staging-uploads"; + + // Spark configuration keys + + /** + * The UUID for jobs: {@value}. + */ + public static final String SPARK_WRITE_UUID = + "spark.sql.sources.writeJobUUID"; + + /** + * The App ID for jobs. + */ + + public static final String SPARK_APP_ID = "spark.app.id"; + + public static final String JAVA_IO_TMPDIR = "java.io.tmpdir"; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java new file mode 100644 index 0000000..292b16b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; + +/** + * Factory for the staging committer. + * This is for internal test use, rather than the public directory and + * partitioned committers. + */ +public class StagingCommitterFactory + extends AbstractS3ACommitterFactory { + + /** + * Name of this class: {@value}.
+ */ + public static final String CLASSNAME + = "org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory"; + + public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem, + Path outputPath, + TaskAttemptContext context) throws IOException { + return new StagingCommitter(outputPath, context); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/package-info.java new file mode 100644 index 0000000..174fbb0 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The staging committers. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.fs.s3a.commit.staging; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 458eb83..13384cf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.AmazonClientException; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; @@ -67,8 +68,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.Retries; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3ARetryPolicy; +import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.fs.s3a.Tristate; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -211,6 +216,28 @@ public class DynamoDBMetadataStore implements MetadataStore { private RetryPolicy dataAccessRetryPolicy; private S3AInstrumentation.S3GuardInstrumentation instrumentation; + /** Owner FS: only valid if configured with an owner FS. */ + private S3AFileSystem owner; + + /** Invoker for IO. Until configured properly, use try-once. */ + private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, + Invoker.NO_OP + ); + + /** Data access can have its own policies. */ + private Invoker dataAccess; + + /** + * Total limit on the number of throttle events after which + * we stop warning in the log. Keeps the noise down. + */ + private static final int THROTTLE_EVENT_LOG_LIMIT = 100; + + /** + * Count of the total number of throttle events; used to crank back logging. + */ + private AtomicInteger throttleEventCount = new AtomicInteger(0); + /** * A utility function to create DynamoDB instance. 
* @param conf the file system configuration @@ -232,28 +259,34 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override + @Retries.OnceRaw public void initialize(FileSystem fs) throws IOException { Preconditions.checkArgument(fs instanceof S3AFileSystem, "DynamoDBMetadataStore only supports S3A filesystem."); - final S3AFileSystem s3afs = (S3AFileSystem) fs; - instrumentation = s3afs.getInstrumentation().getS3GuardInstrumentation(); - final String bucket = s3afs.getBucket(); - String confRegion = s3afs.getConf().getTrimmed(S3GUARD_DDB_REGION_KEY); + owner = (S3AFileSystem) fs; + instrumentation = owner.getInstrumentation().getS3GuardInstrumentation(); + final String bucket = owner.getBucket(); + conf = owner.getConf(); + String confRegion = conf.getTrimmed(S3GUARD_DDB_REGION_KEY); if (!StringUtils.isEmpty(confRegion)) { region = confRegion; LOG.debug("Overriding S3 region with configured DynamoDB region: {}", region); } else { - region = s3afs.getBucketLocation(); + region = owner.getBucketLocation(); LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region); } - username = s3afs.getUsername(); - conf = s3afs.getConf(); + username = owner.getUsername(); dynamoDB = createDynamoDB(conf, region); // use the bucket as the DynamoDB table name if not specified in config tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket); - setMaxRetries(conf); + initDataAccessRetries(conf); + + // set up a full retry policy + invoker = new Invoker(new S3ARetryPolicy(conf), + this::retryEvent + ); initTable(); @@ -283,6 +316,7 @@ public class DynamoDBMetadataStore implements MetadataStore { * @throws IllegalArgumentException if the configuration is incomplete */ @Override + @Retries.OnceRaw public void initialize(Configuration config) throws IOException { conf = config; // use the bucket as the DynamoDB table name if not specified in config @@ -295,7 +329,7 @@ public class DynamoDBMetadataStore implements MetadataStore { dynamoDB = createDynamoDB(conf, region); username = UserGroupInformation.getCurrentUser().getShortUserName(); - setMaxRetries(conf); + initDataAccessRetries(conf); initTable(); } @@ -304,22 +338,25 @@ public class DynamoDBMetadataStore implements MetadataStore { * Set retry policy. This is driven by the value of * {@link Constants#S3GUARD_DDB_MAX_RETRIES} with an exponential backoff * between each attempt of {@link #MIN_RETRY_SLEEP_MSEC} milliseconds. - * @param config + * @param config configuration for data access */ - private void setMaxRetries(Configuration config) { + private void initDataAccessRetries(Configuration config) { int maxRetries = config.getInt(S3GUARD_DDB_MAX_RETRIES, S3GUARD_DDB_MAX_RETRIES_DEFAULT); dataAccessRetryPolicy = RetryPolicies .exponentialBackoffRetry(maxRetries, MIN_RETRY_SLEEP_MSEC, TimeUnit.MILLISECONDS); + dataAccess = new Invoker(dataAccessRetryPolicy, this::retryEvent); } @Override + @Retries.RetryTranslated public void delete(Path path) throws IOException { innerDelete(path, true); } @Override + @Retries.RetryTranslated public void forgetMetadata(Path path) throws IOException { innerDelete(path, false); } @@ -332,9 +369,10 @@ public class DynamoDBMetadataStore implements MetadataStore { * @param tombstone flag to create a tombstone marker * @throws IOException I/O error. 
*/ - private void innerDelete(Path path, boolean tombstone) + @Retries.RetryTranslated + private void innerDelete(final Path path, boolean tombstone) throws IOException { - path = checkPath(path); + checkPath(path); LOG.debug("Deleting from table {} in region {}: {}", tableName, region, path); @@ -343,23 +381,25 @@ public class DynamoDBMetadataStore implements MetadataStore { LOG.debug("Skip deleting root directory as it does not exist in table"); return; } - - try { - if (tombstone) { - Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem( - PathMetadata.tombstone(path)); - table.putItem(item); - } else { - table.deleteItem(pathToKey(path)); - } - } catch (AmazonClientException e) { - throw translateException("delete", path, e); + // the policy on whether repeating delete operations is based + // on that of S3A itself + boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT; + if (tombstone) { + Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem( + PathMetadata.tombstone(path)); + invoker.retry("Put tombstone", path.toString(), idempotent, + () -> table.putItem(item)); + } else { + PrimaryKey key = pathToKey(path); + invoker.retry("Delete key", path.toString(), idempotent, + () -> table.deleteItem(key)); } } @Override + @Retries.RetryTranslated public void deleteSubtree(Path path) throws IOException { - path = checkPath(path); + checkPath(path); LOG.debug("Deleting subtree from table {} in region {}: {}", tableName, region, path); @@ -375,6 +415,7 @@ public class DynamoDBMetadataStore implements MetadataStore { } } + @Retries.OnceRaw private Item getConsistentItem(PrimaryKey key) { final GetItemSpec spec = new GetItemSpec() .withPrimaryKey(key) @@ -383,52 +424,65 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override + @Retries.OnceTranslated public PathMetadata get(Path path) throws IOException { return get(path, false); } @Override + @Retries.OnceTranslated public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag) throws IOException { - path = checkPath(path); + checkPath(path); LOG.debug("Get from table {} in region {}: {}", tableName, region, path); + return Invoker.once("get", path.toString(), + () -> innerGet(path, wantEmptyDirectoryFlag)); + } - try { - final PathMetadata meta; - if (path.isRoot()) { - // Root does not persist in the table - meta = new PathMetadata(makeDirStatus(username, path)); - } else { - final Item item = getConsistentItem(pathToKey(path)); - meta = itemToPathMetadata(item, username); - LOG.debug("Get from table {} in region {} returning for {}: {}", - tableName, region, path, meta); - } + /** + * Inner get operation, as invoked in the retry logic. + * @param path the path to get + * @param wantEmptyDirectoryFlag Set to true to give a hint to the + * MetadataStore that it should try to compute the empty directory flag. 
+ * @return metadata for {@code path}, {@code null} if not found + * @throws IOException IO problem + * @throws AmazonClientException dynamo DB level problem + */ + @Retries.OnceRaw + private PathMetadata innerGet(Path path, boolean wantEmptyDirectoryFlag) + throws IOException { + final PathMetadata meta; + if (path.isRoot()) { + // Root does not persist in the table + meta = new PathMetadata(makeDirStatus(username, path)); + } else { + final Item item = getConsistentItem(pathToKey(path)); + meta = itemToPathMetadata(item, username); + LOG.debug("Get from table {} in region {} returning for {}: {}", + tableName, region, path, meta); + } - if (wantEmptyDirectoryFlag && meta != null) { - final FileStatus status = meta.getFileStatus(); - // for directory, we query its direct children to determine isEmpty bit - if (status.isDirectory()) { - final QuerySpec spec = new QuerySpec() - .withHashKey(pathToParentKeyAttribute(path)) - .withConsistentRead(true) - .withFilterExpression(IS_DELETED + " = :false") - .withValueMap(deleteTrackingValueMap); - final ItemCollection<QueryOutcome> items = table.query(spec); - boolean hasChildren = items.iterator().hasNext(); - // When this class has support for authoritative - // (fully-cached) directory listings, we may also be able to answer - // TRUE here. Until then, we don't know if we have full listing or - // not, thus the UNKNOWN here: - meta.setIsEmptyDirectory( - hasChildren ? Tristate.FALSE : Tristate.UNKNOWN); - } + if (wantEmptyDirectoryFlag && meta != null) { + final FileStatus status = meta.getFileStatus(); + // for directory, we query its direct children to determine isEmpty bit + if (status.isDirectory()) { + final QuerySpec spec = new QuerySpec() + .withHashKey(pathToParentKeyAttribute(path)) + .withConsistentRead(true) + .withFilterExpression(IS_DELETED + " = :false") + .withValueMap(deleteTrackingValueMap); + final ItemCollection<QueryOutcome> items = table.query(spec); + boolean hasChildren = items.iterator().hasNext(); + // When this class has support for authoritative + // (fully-cached) directory listings, we may also be able to answer + // TRUE here. Until then, we don't know if we have full listing or + // not, thus the UNKNOWN here: + meta.setIsEmptyDirectory( + hasChildren ? 
Tristate.FALSE : Tristate.UNKNOWN); } - - return meta; - } catch (AmazonClientException e) { - throw translateException("get", path, e); } + + return meta; } /** @@ -445,35 +499,38 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override - public DirListingMetadata listChildren(Path path) throws IOException { - path = checkPath(path); + @Retries.OnceTranslated + public DirListingMetadata listChildren(final Path path) throws IOException { + checkPath(path); LOG.debug("Listing table {} in region {}: {}", tableName, region, path); // find the children in the table - try { - final QuerySpec spec = new QuerySpec() - .withHashKey(pathToParentKeyAttribute(path)) - .withConsistentRead(true); // strictly consistent read - final ItemCollection<QueryOutcome> items = table.query(spec); - - final List<PathMetadata> metas = new ArrayList<>(); - for (Item item : items) { - PathMetadata meta = itemToPathMetadata(item, username); - metas.add(meta); - } - LOG.trace("Listing table {} in region {} for {} returning {}", - tableName, region, path, metas); + return Invoker.once("listChildren", path.toString(), + () -> { + final QuerySpec spec = new QuerySpec() + .withHashKey(pathToParentKeyAttribute(path)) + .withConsistentRead(true); // strictly consistent read + final ItemCollection<QueryOutcome> items = table.query(spec); - return (metas.isEmpty() && get(path) == null) - ? null - : new DirListingMetadata(path, metas, false); - } catch (AmazonClientException e) { - // failure, including the path not being present - throw translateException("listChildren", path, e); - } + final List<PathMetadata> metas = new ArrayList<>(); + for (Item item : items) { + PathMetadata meta = itemToPathMetadata(item, username); + metas.add(meta); + } + LOG.trace("Listing table {} in region {} for {} returning {}", + tableName, region, path, metas); + + return (metas.isEmpty() && get(path) == null) + ? null + : new DirListingMetadata(path, metas, false); + }); } - // build the list of all parent entries. + /** + * build the list of all parent entries. + * @param pathsToCreate paths to create + * @return the full ancestry paths + */ Collection<PathMetadata> completeAncestry( Collection<PathMetadata> pathsToCreate) { // Key on path to allow fast lookup @@ -499,6 +556,7 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override + @Retries.OnceTranslated public void move(Collection<Path> pathsToDelete, Collection<PathMetadata> pathsToCreate) throws IOException { if (pathsToDelete == null && pathsToCreate == null) { @@ -527,21 +585,20 @@ public class DynamoDBMetadataStore implements MetadataStore { } } - try { - processBatchWriteRequest(null, pathMetadataToItem(newItems)); - } catch (AmazonClientException e) { - throw translateException("move", (String) null, e); - } + Invoker.once("move", tableName, + () -> processBatchWriteRequest(null, pathMetadataToItem(newItems))); } /** * Helper method to issue a batch write request to DynamoDB. * - * Callers of this method should catch the {@link AmazonClientException} and - * translate it for better error report and easier debugging. + * The retry logic here is limited to repeating the write operations + * until all items have been written; there is no other attempt + * at recovery/retry. Throttling is handled internally. 
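Editorial aside (not part of the patch): the backoff between those write rounds is provided by retryBackoff(int retryCount), whose opening lines are elided from the hunk that follows. A plausible reconstruction, assuming it consults the exponential-backoff policy built in initDataAccessRetries(); only the log/sleep lines and the added catch clauses are actually visible in the diff:

  RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(
      null, retryCount, 0, true);
  if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
    throw new IOException("Max retries exceeded (" + retryCount + ")");
  }
  // The remainder is visible in the hunk below: log action.delayMillis,
  // Thread.sleep(action.delayMillis), with InterruptedException mapped to
  // InterruptedIOException and anything else wrapped in an IOException.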
* @param keysToDelete primary keys to be deleted; can be null * @param itemsToPut new items to be put; can be null */ + @Retries.OnceRaw("Outstanding batch items are updated with backoff") private void processBatchWriteRequest(PrimaryKey[] keysToDelete, Item[] itemsToPut) throws IOException { final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length); @@ -575,7 +632,7 @@ public class DynamoDBMetadataStore implements MetadataStore { // Check for unprocessed keys in case of exceeding provisioned throughput Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems(); int retryCount = 0; - while (unprocessed.size() > 0) { + while (!unprocessed.isEmpty()) { retryBackoff(retryCount++); res = dynamoDB.batchWriteItemUnprocessed(unprocessed); unprocessed = res.getUnprocessedItems(); @@ -603,12 +660,17 @@ public class DynamoDBMetadataStore implements MetadataStore { LOG.debug("Sleeping {} msec before next retry", action.delayMillis); Thread.sleep(action.delayMillis); } + } catch (InterruptedException e) { + throw (IOException)new InterruptedIOException(e.toString()).initCause(e); + } catch (IOException e) { + throw e; } catch (Exception e) { throw new IOException("Unexpected exception", e); } } @Override + @Retries.OnceRaw public void put(PathMetadata meta) throws IOException { // For a deeply nested path, this method will automatically create the full // ancestry and save respective item in DynamoDB table. @@ -624,6 +686,7 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override + @Retries.OnceRaw public void put(Collection<PathMetadata> metas) throws IOException { LOG.debug("Saving batch to table {} in region {}", tableName, region); @@ -633,6 +696,7 @@ public class DynamoDBMetadataStore implements MetadataStore { /** * Helper method to get full path of ancestors that are nonexistent in table. */ + @Retries.OnceRaw private Collection<PathMetadata> fullPathsToPut(PathMetadata meta) throws IOException { checkPathMetadata(meta); @@ -675,25 +739,34 @@ public class DynamoDBMetadataStore implements MetadataStore { null, owner, owner, f); } + /** + * {@inheritDoc}. + * There is retry around building the list of paths to update, but + * the call to {@link #processBatchWriteRequest(PrimaryKey[], Item[])} + * is only tried once. + * @param meta Directory listing metadata. + * @throws IOException + */ @Override + @Retries.OnceTranslated("retry(listFullPaths); once(batchWrite)") public void put(DirListingMetadata meta) throws IOException { LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta); // directory path - PathMetadata p = new PathMetadata(makeDirStatus(meta.getPath(), username), + Path path = meta.getPath(); + PathMetadata p = new PathMetadata(makeDirStatus(path, username), meta.isEmpty(), false); // First add any missing ancestors... 
- final Collection<PathMetadata> metasToPut = fullPathsToPut(p); + final Collection<PathMetadata> metasToPut = invoker.retry( + "paths to put", path.toString(), true, + () -> fullPathsToPut(p)); // next add all children of the directory metasToPut.addAll(meta.getListing()); - try { - processBatchWriteRequest(null, pathMetadataToItem(metasToPut)); - } catch (AmazonClientException e) { - throw translateException("put", (String) null, e); - } + Invoker.once("put", path.toString(), + () -> processBatchWriteRequest(null, pathMetadataToItem(metasToPut))); } @Override @@ -709,6 +782,7 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override + @Retries.OnceTranslated public void destroy() throws IOException { if (table == null) { LOG.info("In destroy(): no table to delete"); @@ -731,10 +805,11 @@ public class DynamoDBMetadataStore implements MetadataStore { throw new InterruptedIOException("Table " + tableName + " in region " + region + " has not been deleted"); } catch (AmazonClientException e) { - throw translateException("destroy", (String) null, e); + throw translateException("destroy", tableName, e); } } + @Retries.OnceRaw private ItemCollection<ScanOutcome> expiredFiles(long modTime) { String filterExpression = "mod_time < :mod_time"; String projectionExpression = "parent,child"; @@ -743,6 +818,7 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override + @Retries.OnceRaw("once(batchWrite)") public void prune(long modTime) throws IOException { int itemCount = 0; try { @@ -797,6 +873,7 @@ public class DynamoDBMetadataStore implements MetadataStore { * or table is being deleted, or any other I/O exception occurred. */ @VisibleForTesting + @Retries.OnceRaw void initTable() throws IOException { table = dynamoDB.getTable(tableName); try { @@ -848,7 +925,7 @@ public class DynamoDBMetadataStore implements MetadataStore { } } catch (AmazonClientException e) { - throw translateException("initTable", (String) null, e); + throw translateException("initTable", tableName, e); } } @@ -856,8 +933,11 @@ public class DynamoDBMetadataStore implements MetadataStore { * Get the version mark item in the existing DynamoDB table. * * As the version marker item may be created by another concurrent thread or - * process, we retry a limited times before we fail to get it. + * process, we sleep and retry a limited number of times before we fail to get it. + * This does not include handling any failure other than "item not found", + * so this method is tagged as "OnceRaw". */ + @Retries.OnceRaw private Item getVersionMarkerItem() throws IOException { final PrimaryKey versionMarkerKey = createVersionMarkerPrimaryKey(VERSION_MARKER); @@ -913,16 +993,20 @@ public class DynamoDBMetadataStore implements MetadataStore { * @param t table to block on. 
* @throws IOException IO problems * @throws InterruptedIOException if the wait was interrupted + * @throws IllegalArgumentException if an exception was raised in the waiter */ - private void waitForTableActive(Table t) throws IOException { + @Retries.OnceRaw + private void waitForTableActive(Table t) throws InterruptedIOException { try { t.waitForActive(); } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for table {} in region {} active", tableName, region, e); Thread.currentThread().interrupt(); - throw (IOException) new InterruptedIOException("DynamoDB table '" - + tableName + "' is not active yet in region " + region).initCause(e); + throw (InterruptedIOException) + new InterruptedIOException("DynamoDB table '" + + tableName + "' is not active yet in region " + region) + .initCause(e); } } @@ -933,6 +1017,7 @@ public class DynamoDBMetadataStore implements MetadataStore { * @throws IOException on any failure. * @throws InterruptedIOException if the wait was interrupted */ + @Retries.OnceRaw private void createTable(ProvisionedThroughput capacity) throws IOException { try { LOG.info("Creating non-existent DynamoDB table {} in region {}", @@ -960,6 +1045,7 @@ public class DynamoDBMetadataStore implements MetadataStore { * @param item item to put * @return the outcome. */ + @Retries.OnceRaw PutItemOutcome putItem(Item item) { LOG.debug("Putting item {}", item); return table.putItem(item); @@ -967,22 +1053,27 @@ public class DynamoDBMetadataStore implements MetadataStore { /** * Provision the table with given read and write capacity units. + * Call will fail if the table is busy, or the new values match the current + * ones. + * @param readCapacity read units + * @param writeCapacity write units + * @throws IOException on a failure */ + @Retries.RetryTranslated void provisionTable(Long readCapacity, Long writeCapacity) throws IOException { final ProvisionedThroughput toProvision = new ProvisionedThroughput() .withReadCapacityUnits(readCapacity) .withWriteCapacityUnits(writeCapacity); - try { - final ProvisionedThroughputDescription p = - table.updateTable(toProvision).getProvisionedThroughput(); - LOG.info("Provision table {} in region {}: readCapacityUnits={}, " - + "writeCapacityUnits={}", - tableName, region, p.getReadCapacityUnits(), - p.getWriteCapacityUnits()); - } catch (AmazonClientException e) { - throw translateException("provisionTable", (String) null, e); - } + invoker.retry("ProvisionTable", tableName, true, + () -> { + final ProvisionedThroughputDescription p = + table.updateTable(toProvision).getProvisionedThroughput(); + LOG.info("Provision table {} in region {}: readCapacityUnits={}, " + + "writeCapacityUnits={}", + tableName, region, p.getReadCapacityUnits(), + p.getWriteCapacityUnits()); + }); } Table getTable() { @@ -999,8 +1090,10 @@ public class DynamoDBMetadataStore implements MetadataStore { } /** - * Validates a path object; it must be absolute, and contain a host - * (bucket) component. + * Validates a path object; it must be absolute, have an s3a:/// scheme + * and contain a host (bucket) component. 
+ * @param path path to check + * @return the path passed in */ private Path checkPath(Path path) { Preconditions.checkNotNull(path); @@ -1025,6 +1118,7 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override + @Retries.OnceRaw public Map<String, String> getDiagnostics() throws IOException { Map<String, String> map = new TreeMap<>(); if (table != null) { @@ -1052,6 +1146,7 @@ public class DynamoDBMetadataStore implements MetadataStore { return map; } + @Retries.OnceRaw private TableDescription getTableDescription(boolean forceUpdate) { TableDescription desc = table.getDescription(); if (desc == null || forceUpdate) { @@ -1061,6 +1156,7 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override + @Retries.OnceRaw public void updateParameters(Map<String, String> parameters) throws IOException { Preconditions.checkNotNull(table, "Not initialized"); @@ -1103,4 +1199,46 @@ public class DynamoDBMetadataStore implements MetadataStore { } } + /** + * Callback from {@link Invoker} when an operation is retried. + * @param text text of the operation + * @param ex exception + * @param attempts number of attempts + * @param idempotent is the method idempotent + */ + void retryEvent( + String text, + IOException ex, + int attempts, + boolean idempotent) { + if (S3AUtils.isThrottleException(ex)) { + // throttled + if (instrumentation != null) { + instrumentation.throttled(); + } + int eventCount = throttleEventCount.addAndGet(1); + if (attempts == 1 && eventCount < THROTTLE_EVENT_LOG_LIMIT) { + LOG.warn("DynamoDB IO limits reached in {};" + + " consider increasing capacity: {}", text, ex.toString()); + LOG.debug("Throttled", ex); + } else { + // user has been warned already, log at debug only. + LOG.debug("DynamoDB IO limits reached in {};" + + " consider increasing capacity: {}", text, ex.toString()); + } + } else if (attempts == 1) { + // not throttled. Log on the first attempt only + LOG.info("Retrying {}: {}", text, ex.toString()); + LOG.debug("Retrying {}", text, ex); + } + + if (instrumentation != null) { + // note a retry + instrumentation.retrying(); + } + if (owner != null) { + owner.metastoreOperationRetried(ex, attempts, idempotent); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java index c7c810a..a56b055 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Retries; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AInstrumentation; import org.apache.hadoop.fs.s3a.Tristate; @@ -83,6 +84,7 @@ public final class S3Guard { * @return Reference to new MetadataStore. 
* @throws IOException if the metadata store cannot be instantiated */ + @Retries.OnceTranslated public static MetadataStore getMetadataStore(FileSystem fs) throws IOException { Preconditions.checkNotNull(fs); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java index 4f0e8f7..ace043b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AUtils; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.fs.shell.CommandFormat; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.GenericOptionsParser; @@ -960,6 +961,7 @@ public abstract class S3GuardTool extends Configured implements Tool { public static final String AUTH_FLAG = "auth"; public static final String NONAUTH_FLAG = "nonauth"; public static final String ENCRYPTION_FLAG = "encryption"; + public static final String MAGIC_FLAG = "magic"; public static final String PURPOSE = "provide/check S3Guard information" + " about a specific bucket"; @@ -967,11 +969,15 @@ public abstract class S3GuardTool extends Configured implements Tool { + "\t" + PURPOSE + "\n\n" + "Common options:\n" + " -" + GUARDED_FLAG + " - Require S3Guard\n" + + " -" + UNGUARDED_FLAG + " - Require S3Guard to be disabled\n" + + " -" + AUTH_FLAG + " - Require the S3Guard mode to be \"authoritative\"\n" + + " -" + NONAUTH_FLAG + " - Require the S3Guard mode to be \"non-authoritative\"\n" + + " -" + MAGIC_FLAG + " - Require the S3 filesystem to support the \"magic\" committer\n" + " -" + ENCRYPTION_FLAG + " -require {none, sse-s3, sse-kms} - Require encryption policy"; BucketInfo(Configuration conf) { - super(conf, GUARDED_FLAG, UNGUARDED_FLAG, AUTH_FLAG, NONAUTH_FLAG); + super(conf, GUARDED_FLAG, UNGUARDED_FLAG, AUTH_FLAG, NONAUTH_FLAG, MAGIC_FLAG); CommandFormat format = getCommandFormat(); format.addOptionWithValue(ENCRYPTION_FLAG); } @@ -1014,6 +1020,11 @@ public abstract class S3GuardTool extends Configured implements Tool { } else { println(out, "Filesystem %s is not using S3Guard", fsUri); } + boolean magic = fs.hasCapability( + CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER); + println(out, "The \"magic\" committer %s supported", + magic ? "is" : "is not"); + println(out, "%nS3A Client"); String endpoint = conf.getTrimmed(ENDPOINT, ""); @@ -1043,6 +1054,9 @@ public abstract class S3GuardTool extends Configured implements Tool { throw badState("S3Guard is not enabled for %s", fsUri); } } + if (commands.getOpt(MAGIC_FLAG) && !magic) { + throw badState("The magic committer is not enabled for %s", fsUri); + } String desiredEncryption = getCommandFormat() .getOptValue(ENCRYPTION_FLAG);
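Editorial note (not part of the patch): with the MAGIC_FLAG option added above, the bucket-info subcommand both reports whether a bucket's filesystem supports the "magic" committer and, when the flag is passed, fails if it does not. A typical check would be run as hadoop s3guard bucket-info -magic s3a://example-bucket/ (bucket name illustrative), which exits with the "magic committer is not enabled" error when the store lacks the capability.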