http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java new file mode 100644 index 0000000..13e1c61 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.*; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test the committer factory logic, looking at the override + * and fallback behavior. + */ +@SuppressWarnings("unchecked") +public class TestPathOutputCommitterFactory extends Assert { + + private static final String HTTP_COMMITTER_FACTORY = String.format( + COMMITTER_FACTORY_SCHEME_PATTERN, "http"); + + private static final Path HTTP_PATH = new Path("http://hadoop.apache.org/"); + private static final Path HDFS_PATH = new Path("hdfs://localhost:8081/"); + + private TaskAttemptID taskAttemptID = + new TaskAttemptID("local", 0, TaskType.MAP, 1, 2); + + /** + * Set a factory for a schema, verify it works. + * @throws Throwable failure + */ + @Test + public void testCommitterFactoryForSchema() throws Throwable { + createCommitterFactory(SimpleCommitterFactory.class, + HTTP_PATH, + newBondedConfiguration()); + } + + /** + * A schema factory only affects that filesystem. + * @throws Throwable failure + */ + @Test + public void testCommitterFactoryFallbackDefault() throws Throwable { + createCommitterFactory(FileOutputCommitterFactory.class, + HDFS_PATH, + newBondedConfiguration()); + } + + /** + * A schema factory only affects that filesystem; test through + * {@link PathOutputCommitterFactory#createCommitter(Path, TaskAttemptContext)}. 
+ * @throws Throwable failure + */ + @Test + public void testCommitterFallbackDefault() throws Throwable { + createCommitter(FileOutputCommitter.class, + HDFS_PATH, + taskAttempt(newBondedConfiguration())); + } + + /** + * Verify that you can override any schema with an explicit name. + */ + @Test + public void testCommitterFactoryOverride() throws Throwable { + Configuration conf = newBondedConfiguration(); + // set up for the schema factory + // and then set a global one which overrides the others. + conf.set(COMMITTER_FACTORY_CLASS, OtherFactory.class.getName()); + createCommitterFactory(OtherFactory.class, HDFS_PATH, conf); + createCommitterFactory(OtherFactory.class, HTTP_PATH, conf); + } + + /** + * Verify that if the factory class option is "", schema factory + * resolution still works. + */ + @Test + public void testCommitterFactoryEmptyOption() throws Throwable { + Configuration conf = newBondedConfiguration(); + // set up for the schema factory + // and then set a global one which overrides the others. + conf.set(COMMITTER_FACTORY_CLASS, ""); + createCommitterFactory(SimpleCommitterFactory.class, HTTP_PATH, conf); + + // and HDFS, with no schema, falls back to the default + createCommitterFactory(FileOutputCommitterFactory.class, HDFS_PATH, conf); + } + + /** + * Verify that if the committer factory class is unknown, you cannot + * create committers. + */ + @Test + public void testCommitterFactoryUnknown() throws Throwable { + Configuration conf = new Configuration(); + // set the factory to an unknown class + conf.set(COMMITTER_FACTORY_CLASS, "unknown"); + intercept(RuntimeException.class, + () -> getCommitterFactory(HDFS_PATH, conf)); + } + + /** + * Verify that if the committer output path is null, you get back + * a FileOutputCommitter with null output & work paths. 
+ */ + @Test + public void testCommitterNullOutputPath() throws Throwable { + // bind http to schema + Configuration conf = newBondedConfiguration(); + // then ask committers for a null path + FileOutputCommitter committer = createCommitter( + FileOutputCommitterFactory.class, + FileOutputCommitter.class, + null, conf); + assertNull(committer.getOutputPath()); + assertNull(committer.getWorkPath()); + } + + /** + * Verify that if you explicitly name a committer, that takes priority + * over any filesystem committer. + */ + @Test + public void testNamedCommitterFactory() throws Throwable { + Configuration conf = new Configuration(); + // set up for the schema factory + conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY); + conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName()); + SimpleCommitter sc = createCommitter( + NamedCommitterFactory.class, + SimpleCommitter.class, HDFS_PATH, conf); + assertEquals("Wrong output path from " + sc, + HDFS_PATH, + sc.getOutputPath()); + } + + /** + * Verify that if you explicitly name a committer and there's no + * path, the committer is picked up. + */ + @Test + public void testNamedCommitterFactoryNullPath() throws Throwable { + Configuration conf = new Configuration(); + // set up for the schema factory + conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY); + conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName()); + SimpleCommitter sc = createCommitter( + NamedCommitterFactory.class, + SimpleCommitter.class, + null, conf); + assertNull(sc.getOutputPath()); + } + + /** + * Verify that if you explicitly name a committer and there's no + * path, the committer is picked up. 
+ */ + @Test + public void testNamedCommitterNullPath() throws Throwable { + Configuration conf = new Configuration(); + // set up for the schema factory + conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY); + conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName()); + + SimpleCommitter sc = createCommitter( + SimpleCommitter.class, + null, taskAttempt(conf)); + assertNull(sc.getOutputPath()); + } + + /** + * Create a factory then a committer, validating the type of both. + * @param <T> type of factory + * @param <U> type of committer + * @param factoryClass expected factory class + * @param committerClass expected committer class + * @param path output path (may be null) + * @param conf configuration + * @return the committer + * @throws IOException failure to create + */ + private <T extends PathOutputCommitterFactory, U extends PathOutputCommitter> + U createCommitter(Class<T> factoryClass, + Class<U> committerClass, + Path path, + Configuration conf) throws IOException { + T f = createCommitterFactory(factoryClass, path, conf); + PathOutputCommitter committer = f.createOutputCommitter(path, + taskAttempt(conf)); + assertEquals(" Wrong committer for path " + path + " from factory " + f, + committerClass, committer.getClass()); + return (U) committer; + } + + /** + * Create a committer from a task context, via + * {@link PathOutputCommitterFactory#createCommitter(Path, TaskAttemptContext)}. 
+ * @param <U> type of committer + * @param committerClass expected committer class + * @param path output path (may be null) + * @param context task attempt context + * @return the committer + * @throws IOException failure to create + */ + private <U extends PathOutputCommitter> U createCommitter( + Class<U> committerClass, + Path path, + TaskAttemptContext context) throws IOException { + PathOutputCommitter committer = PathOutputCommitterFactory + .createCommitter(path, context); + assertEquals(" Wrong committer for path " + path, + committerClass, committer.getClass()); + return (U) committer; + } + + /** + * Create a factory then a committer, validating its type. + * @param factoryClass expected factory class + * @param path output path (may be null) + * @param conf configuration + * @param <T> type of factory + * @return the factory + */ + private <T extends PathOutputCommitterFactory> T createCommitterFactory( + Class<T> factoryClass, + Path path, + Configuration conf) { + PathOutputCommitterFactory factory = getCommitterFactory(path, conf); + assertEquals(" Wrong factory for path " + path, + factoryClass, factory.getClass()); + return (T)factory; + } + + /** + * Create a new task attempt context. + * @param conf config + * @return a new context + */ + private TaskAttemptContext taskAttempt(Configuration conf) { + return new TaskAttemptContextImpl(conf, taskAttemptID); + } + + /** + * Verify that if you explicitly name a committer, that takes priority + * over any filesystem committer. 
+ */ + @Test + public void testFileOutputCommitterFactory() throws Throwable { + Configuration conf = new Configuration(); + // set up for the schema factory + conf.set(COMMITTER_FACTORY_CLASS, FILE_COMMITTER_FACTORY); + conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName()); + getCommitterFactory(HDFS_PATH, conf); + createCommitter( + FileOutputCommitterFactory.class, + FileOutputCommitter.class, null, conf); + } + + /** + * Follow the entire committer chain down and create a new committer from + * the output format. + * @throws Throwable on a failure. + */ + @Test + public void testFileOutputFormatBinding() throws Throwable { + Configuration conf = newBondedConfiguration(); + conf.set(FileOutputFormat.OUTDIR, HTTP_PATH.toUri().toString()); + TextOutputFormat<String, String> off = new TextOutputFormat<>(); + SimpleCommitter committer = (SimpleCommitter) + off.getOutputCommitter(taskAttempt(conf)); + assertEquals("Wrong output path from "+ committer, + HTTP_PATH, + committer.getOutputPath()); + } + + /** + * Follow the entire committer chain down and create a new committer from + * the output format. + * @throws Throwable on a failure. + */ + @Test + public void testFileOutputFormatBindingNoPath() throws Throwable { + Configuration conf = new Configuration(); + conf.unset(FileOutputFormat.OUTDIR); + // set up for the schema factory + conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY); + conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName()); + httpToSimpleFactory(conf); + TextOutputFormat<String, String> off = new TextOutputFormat<>(); + SimpleCommitter committer = (SimpleCommitter) + off.getOutputCommitter(taskAttempt(conf)); + assertNull("Output path from "+ committer, + committer.getOutputPath()); + } + + /** + * Bind the http schema CommitterFactory to {@link SimpleCommitterFactory}. 
+ * @param conf config to patch + */ + private Configuration httpToSimpleFactory(Configuration conf) { + conf.set(HTTP_COMMITTER_FACTORY, SimpleCommitterFactory.class.getName()); + return conf; + } + + + /** + * Create a configuration with the http schema bonded to the simple factory. + * @return a new, patched configuration + */ + private Configuration newBondedConfiguration() { + return httpToSimpleFactory(new Configuration()); + } + + /** + * Extract the (mandatory) cause of an exception. + * @param ex exception + * @param clazz expected class + * @return the cause, which will be of the expected type + * @throws AssertionError if there is a problem + */ + private <E extends Throwable> E verifyCauseClass(Throwable ex, + Class<E> clazz) throws AssertionError { + Throwable cause = ex.getCause(); + if (cause == null) { + throw new AssertionError("No cause", ex); + } + if (!cause.getClass().equals(clazz)) { + throw new AssertionError("Wrong cause class", cause); + } + return (E)cause; + } + + @Test + public void testBadCommitterFactory() throws Throwable { + expectFactoryConstructionFailure(HTTP_COMMITTER_FACTORY); + } + + @Test + public void testBoundCommitterWithSchema() throws Throwable { + // this verifies that a bound committer relays to the underlying committer + Configuration conf = newBondedConfiguration(); + TestPathOutputCommitter.TaskContext tac + = new TestPathOutputCommitter.TaskContext(conf); + BindingPathOutputCommitter committer + = new BindingPathOutputCommitter(HTTP_PATH, tac); + intercept(IOException.class, "setupJob", + () -> committer.setupJob(tac)); + } + + @Test + public void testBoundCommitterWithDefault() throws Throwable { + // this verifies that a bound committer relays to the underlying committer + Configuration conf = newBondedConfiguration(); + TestPathOutputCommitter.TaskContext tac + = new TestPathOutputCommitter.TaskContext(conf); + BindingPathOutputCommitter committer + = new BindingPathOutputCommitter(HDFS_PATH, tac); + 
assertEquals(FileOutputCommitter.class, + committer.getCommitter().getClass()); + } + + /** + * Set the specific key to a string which is not a factory class; expect + * a failure. + * @param key key to set + * @throws Throwable on a failure + */ + @SuppressWarnings("ThrowableNotThrown") + protected void expectFactoryConstructionFailure(String key) throws Throwable { + Configuration conf = new Configuration(); + conf.set(key, "Not a factory"); + RuntimeException ex = intercept(RuntimeException.class, + () -> getCommitterFactory(HTTP_PATH, conf)); + verifyCauseClass( + verifyCauseClass(ex, RuntimeException.class), + ClassNotFoundException.class); + } + + /** + * A simple committer. + */ + public static final class SimpleCommitter extends PathOutputCommitter { + + private final Path outputPath; + + public SimpleCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + this.outputPath = outputPath; + } + + @Override + public Path getWorkPath() throws IOException { + return null; + } + + /** + * Job setup throws an exception. + * @param jobContext Context of the job + * @throws IOException always + */ + @Override + public void setupJob(JobContext jobContext) throws IOException { + throw new IOException("setupJob"); + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) + throws IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + + } + + @Override + public Path getOutputPath() { + return outputPath; + } + } + + /** + * The simple committer factory. 
+ */ + private static class SimpleCommitterFactory + extends PathOutputCommitterFactory { + + @Override + public PathOutputCommitter createOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + return new SimpleCommitter(outputPath, context); + } + + } + + /** + * Some other factory. + */ + private static class OtherFactory extends PathOutputCommitterFactory { + + /** + * {@inheritDoc} + * @param outputPath output path. This may be null. + * @param context context + * @return a new {@link SimpleCommitter} for the output path + * @throws IOException if the committer cannot be created + */ + @Override + public PathOutputCommitter createOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + return new SimpleCommitter(outputPath, context); + } + + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 97ceddf..748537c 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -129,7 +129,9 @@ <!-- surefire.forkNumber won't do the parameter --> <!-- substitution. Putting a prefix in front of it like --> <!-- "fork-" makes it work. --> - <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id> + <!-- Important: Those leading 0s are needed to guarantee that --> + <!-- trailing three chars are always numeric and unique --> + <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id> <!-- Propagate scale parameters --> <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled> <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize> @@ -165,7 +167,7 @@ <!-- surefire.forkNumber won't do the parameter --> <!-- substitution. Putting a prefix in front of it like --> <!-- "fork-" makes it work. --> - <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id> + <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id> <!-- Propagate scale parameters --> <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled> <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize> @@ -192,7 +194,6 @@ <include>**/ITest*.java</include> </includes> <excludes> - <exclude>**/ITestJets3tNativeS3FileSystemContract.java</exclude> <exclude>**/ITestS3AContractRootDir.java</exclude> <exclude>**/ITestS3AFileContextStatistics.java</exclude> <exclude>**/ITestS3AEncryptionSSEC*.java</exclude> @@ -225,7 +226,6 @@ <!-- Do a sequential run for tests that cannot handle --> <!-- parallel execution. 
--> <includes> - <include>**/ITestJets3tNativeS3FileSystemContract.java</include> <include>**/ITestS3AContractRootDir.java</include> <include>**/ITestS3AFileContextStatistics.java</include> <include>**/ITestS3AHuge*.java</include> @@ -465,8 +465,8 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - <scope>test</scope> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> @@ -476,12 +476,23 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-hs</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-examples</artifactId> <scope>test</scope> <type>jar</type> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-distcp</artifactId> <scope>test</scope> </dependency> @@ -491,5 +502,28 @@ <scope>test</scope> <type>test-jar</type> </dependency> + <!-- artifacts needed to bring up a Mini MR Yarn cluster--> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-app</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-app</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> </dependencies> </project> 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSBadRequestException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSBadRequestException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSBadRequestException.java new file mode 100644 index 0000000..482c5a1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSBadRequestException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonServiceException; + +/** + * A 400 "Bad Request" exception was received. + * This is the general "bad parameters, headers, whatever" failure. + */ +public class AWSBadRequestException extends AWSServiceIOException { + /** + * HTTP status code which signals this failure mode was triggered: {@value}. + */ + public static final int STATUS_CODE = 400; + + /** + * Instantiate. 
+ * @param operation operation which triggered this + * @param cause the underlying cause + */ + public AWSBadRequestException(String operation, + AmazonServiceException cause) { + super(operation, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java index a8c01cb..22afb01 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.AmazonClientException; +import com.amazonaws.SdkBaseException; import com.google.common.base.Preconditions; import java.io.IOException; @@ -31,7 +32,7 @@ public class AWSClientIOException extends IOException { private final String operation; public AWSClientIOException(String operation, - AmazonClientException cause) { + SdkBaseException cause) { super(cause); Preconditions.checkArgument(operation != null, "Null 'operation' argument"); Preconditions.checkArgument(cause != null, "Null 'cause' argument"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java new file mode 100644 index 0000000..e6a23b2 --- /dev/null +++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonServiceException; + +/** + * Status code 443, no response from server. This is considered idempotent. + */ +public class AWSNoResponseException extends AWSServiceIOException { + public AWSNoResponseException(String operation, + AmazonServiceException cause) { + super(operation, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSRedirectException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSRedirectException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSRedirectException.java new file mode 100644 index 0000000..bb337ee --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSRedirectException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonServiceException; + +/** + * Request is redirected. + * If this gets as far as the user, it's unrecoverable. + */ +public class AWSRedirectException extends AWSServiceIOException { + + /** + * Instantiate. + * @param operation operation which triggered this + * @param cause the underlying cause + */ + public AWSRedirectException(String operation, + AmazonServiceException cause) { + super(operation, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceThrottledException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceThrottledException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceThrottledException.java new file mode 100644 index 0000000..131cea7 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceThrottledException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonServiceException; + +/** + * Exception raised when a service was throttled. + */ +public class AWSServiceThrottledException extends AWSServiceIOException { + + /** + * HTTP status code which signals this failure mode was triggered: {@value}. + */ + public static final int STATUS_CODE = 503; + + /** + * Instantiate. 
+ * @param operation operation which triggered this + * @param cause the underlying cause + */ + public AWSServiceThrottledException(String operation, + AmazonServiceException cause) { + super(operation, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java new file mode 100644 index 0000000..83be294 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonServiceException; + +/** + * A 500 response came back from a service. 
+ * This is considered <i>probably</i> retriable. That is, we assume + * <ol> + * <li>whatever error happened in the service itself to have happened + * before the infrastructure committed the operation.</li> + * <li>Nothing else got through either.</li> + * </ol> + */ +public class AWSStatus500Exception extends AWSServiceIOException { + public AWSStatus500Exception(String operation, + AmazonServiceException cause) { + super(operation, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java index 5b25730..f13942d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java @@ -45,7 +45,7 @@ import org.apache.hadoop.classification.InterfaceAudience; final class BlockingThreadPoolExecutorService extends SemaphoredDelegatingExecutor { - private static Logger LOG = LoggerFactory + private static final Logger LOG = LoggerFactory .getLogger(BlockingThreadPoolExecutorService.class); private static final AtomicInteger POOLNUMBER = new AtomicInteger(1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index d278bdf..e6b2bdb 100644 --- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -35,6 +35,11 @@ public final class Constants { private Constants() { } + /** + * default hadoop temp dir on local system: {@value}. + */ + public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir"; + /** The minimum multipart size which S3 supports. */ public static final int MULTIPART_MIN_SIZE = 5 * 1024 * 1024; @@ -328,14 +333,6 @@ public final class Constants { @InterfaceAudience.Private public static final int MAX_MULTIPART_COUNT = 10000; - /** - * Classname of the S3A-specific output committer factory. This - * is what must be declared when attempting to use - */ - @InterfaceStability.Unstable - public static final String S3A_OUTPUT_COMMITTER_FACTORY = - "org.apache.hadoop.fs.s3a.commit.S3AOutputCommitterFactory"; - /* Constants. */ public static final String S3_METADATA_STORE_IMPL = "fs.s3a.metadatastore.impl"; @@ -411,13 +408,6 @@ public final class Constants { public static final int S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT = 25; /** - * V1 committer. - */ - @InterfaceStability.Unstable - public static final String S3A_OUTPUT_COMMITTER_MRV1 = - "org.apache.hadoop.fs.s3a.commit.S3OutputCommitterMRv1"; - - /** * The default "Null" metadata store: {@value}. */ @InterfaceStability.Unstable @@ -463,4 +453,56 @@ public final class Constants { @InterfaceStability.Unstable public static final int DEFAULT_LIST_VERSION = 2; + @InterfaceStability.Unstable + public static final String FAIL_INJECT_THROTTLE_PROBABILITY = + "fs.s3a.failinject.throttle.probability"; + + @InterfaceStability.Unstable + public static final String FAIL_INJECT_CLIENT_FACTORY = + "org.apache.hadoop.fs.s3a.InconsistentS3ClientFactory"; + + /** + * Number of times to retry any repeatable S3 client request on failure, + * excluding throttling requests: {@value}. 
+ */ + public static final String RETRY_LIMIT = "fs.s3a.retry.limit"; + + /** + * Default retry limit: {@value}. + */ + public static final int RETRY_LIMIT_DEFAULT = DEFAULT_MAX_ERROR_RETRIES; + + /** + * Interval between retry attempts.: {@value}. + */ + public static final String RETRY_INTERVAL = "fs.s3a.retry.interval"; + + /** + * Default retry interval: {@value}. + */ + public static final String RETRY_INTERVAL_DEFAULT = "500ms"; + + /** + * Number of times to retry any throttled request: {@value}. + */ + public static final String RETRY_THROTTLE_LIMIT = + "fs.s3a.retry.throttle.limit"; + + /** + * Default throttled retry limit: {@value}. + */ + public static final int RETRY_THROTTLE_LIMIT_DEFAULT = + DEFAULT_MAX_ERROR_RETRIES; + + /** + * Interval between retry attempts on throttled requests: {@value}. + */ + public static final String RETRY_THROTTLE_INTERVAL = + "fs.s3a.retry.throttle.interval"; + + /** + * Default throttled retry interval: {@value}. + */ + public static final String RETRY_THROTTLE_INTERVAL_DEFAULT = "500ms"; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java index 6476f5d..d158061 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java @@ -18,38 +18,50 @@ package org.apache.hadoop.fs.s3a; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + import 
com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.ClientConfiguration; +import com.amazonaws.SdkClientException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsResult; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.MultipartUploadListing; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.hadoop.fs.s3a.Constants.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - /** * A wrapper around 
{@link com.amazonaws.services.s3.AmazonS3} that injects * inconsistency and/or errors. Used for testing S3Guard. @@ -88,6 +100,21 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { private long delayKeyMsec; /** + * Probability of throttling a request. + */ + private float throttleProbability; + + /** + * Counter of failures since last reset. + */ + private final AtomicLong failureCounter = new AtomicLong(0); + + /** + * limit for failures before operations succeed; if 0 then "no limit". + */ + private int failureLimit = 0; + + /** * Composite of data we need to track about recently deleted objects: * when it was deleted (same was with recently put objects) and the object * summary (since we should keep returning it for sometime after its @@ -134,12 +161,25 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { if (delayKeySubstring.equals(MATCH_ALL_KEYS)) { delayKeySubstring = ""; } - delayKeyProbability = conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, - DEFAULT_DELAY_KEY_PROBABILITY); + delayKeyProbability = validProbability( + conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, + DEFAULT_DELAY_KEY_PROBABILITY)); delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC, DEFAULT_DELAY_KEY_MSEC); - LOG.info("Enabled with {} msec delay, substring {}, probability {}", - delayKeyMsec, delayKeySubstring, delayKeyProbability); + setThrottleProbability(conf.getFloat(FAIL_INJECT_THROTTLE_PROBABILITY, + 0.0f)); + LOG.info("{}", this); + } + + @Override + public String toString() { + return String.format( + "Inconsistent S3 Client with" + + " %s msec delay, substring %s, delay probability %s;" + + " throttle probability %s" + + "; failure limit %d, failure count %d", + delayKeyMsec, delayKeySubstring, delayKeyProbability, + throttleProbability, failureLimit, failureCounter.get()); } /** @@ -174,10 +214,11 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { public DeleteObjectsResult deleteObjects(DeleteObjectsRequest 
deleteObjectsRequest) throws AmazonClientException, AmazonServiceException { + maybeFail(); for (DeleteObjectsRequest.KeyVersion keyVersion : deleteObjectsRequest.getKeys()) { - registerDeleteObject(keyVersion.getKey(), deleteObjectsRequest - .getBucketName()); + registerDeleteObject(keyVersion.getKey(), + deleteObjectsRequest.getBucketName()); } return super.deleteObjects(deleteObjectsRequest); } @@ -187,6 +228,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { throws AmazonClientException, AmazonServiceException { String key = deleteObjectRequest.getKey(); LOG.debug("key {}", key); + maybeFail(); registerDeleteObject(key, deleteObjectRequest.getBucketName()); super.deleteObject(deleteObjectRequest); } @@ -196,6 +238,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException, AmazonServiceException { LOG.debug("key {}", putObjectRequest.getKey()); + maybeFail(); registerPutObject(putObjectRequest); return super.putObject(putObjectRequest); } @@ -204,6 +247,20 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { @Override public ObjectListing listObjects(ListObjectsRequest listObjectsRequest) throws AmazonClientException, AmazonServiceException { + maybeFail(); + return innerlistObjects(listObjectsRequest); + } + + /** + * Run the list object call without any failure probability. + * This stops a very aggressive failure rate from completely overloading + * the retry logic. 
+ * @param listObjectsRequest request + * @return listing + * @throws AmazonClientException failure + */ + private ObjectListing innerlistObjects(ListObjectsRequest listObjectsRequest) + throws AmazonClientException, AmazonServiceException { LOG.debug("prefix {}", listObjectsRequest.getPrefix()); ObjectListing listing = super.listObjects(listObjectsRequest); listing = filterListObjects(listing); @@ -215,6 +272,16 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { @Override public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request) throws AmazonClientException, AmazonServiceException { + maybeFail(); + return innerListObjectsV2(request); + } + + /** + * Non failing V2 list object request. + * @param request request + * @return result. + */ + private ListObjectsV2Result innerListObjectsV2(ListObjectsV2Request request) { LOG.debug("prefix {}", request.getPrefix()); ListObjectsV2Result listing = super.listObjectsV2(request); listing = filterListObjectsV2(listing); @@ -222,17 +289,13 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { return listing; } - private void addSummaryIfNotPresent(List<S3ObjectSummary> list, S3ObjectSummary item) { // Behavior of S3ObjectSummary String key = item.getKey(); - for (S3ObjectSummary member : list) { - if (member.getKey().equals(key)) { - return; - } + if (list.stream().noneMatch((member) -> member.getKey().equals(key))) { + list.add(item); } - list.add(item); } /** @@ -396,13 +459,9 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { } private List<String> filterPrefixes(List<String> prefixes) { - List<String> outputPrefixes = new ArrayList<>(); - for (String key : prefixes) { - if (!isKeyDelayed(delayedPutKeys.get(key), key)) { - outputPrefixes.add(key); - } - } - return outputPrefixes; + return prefixes.stream() + .filter(key -> !isKeyDelayed(delayedPutKeys.get(key), key)) + .collect(Collectors.toList()); } private boolean isKeyDelayed(Long enqueueTime, String key) { @@ 
-425,14 +484,14 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { private void registerDeleteObject(String key, String bucket) { if (shouldDelay(key)) { // Record summary so we can add it back for some time post-deletion - S3ObjectSummary summary = null; - ObjectListing list = listObjects(bucket, key); - for (S3ObjectSummary result : list.getObjectSummaries()) { - if (result.getKey().equals(key)) { - summary = result; - break; - } - } + ListObjectsRequest request = new ListObjectsRequest() + .withBucketName(bucket) + .withPrefix(key); + S3ObjectSummary summary = innerlistObjects(request).getObjectSummaries() + .stream() + .filter(result -> result.getKey().equals(key)) + .findFirst() + .orElse(null); delayedDeletes.put(key, new Delete(System.currentTimeMillis(), summary)); } } @@ -471,7 +530,109 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { delayedPutKeys.put(key, System.currentTimeMillis()); } + @Override + public CompleteMultipartUploadResult completeMultipartUpload( + CompleteMultipartUploadRequest completeMultipartUploadRequest) + throws SdkClientException, AmazonServiceException { + maybeFail(); + return super.completeMultipartUpload(completeMultipartUploadRequest); + } + + @Override + public UploadPartResult uploadPart(UploadPartRequest uploadPartRequest) + throws SdkClientException, AmazonServiceException { + maybeFail(); + return super.uploadPart(uploadPartRequest); + } + + @Override + public InitiateMultipartUploadResult initiateMultipartUpload( + InitiateMultipartUploadRequest initiateMultipartUploadRequest) + throws SdkClientException, AmazonServiceException { + maybeFail(); + return super.initiateMultipartUpload(initiateMultipartUploadRequest); + } + + @Override + public MultipartUploadListing listMultipartUploads( + ListMultipartUploadsRequest listMultipartUploadsRequest) + throws SdkClientException, AmazonServiceException { + maybeFail(); + return super.listMultipartUploads(listMultipartUploadsRequest); + } + + 
public float getDelayKeyProbability() { + return delayKeyProbability; + } + + public long getDelayKeyMsec() { + return delayKeyMsec; + } + + /** + * Get the probability of the request being throttled. + * @return a value 0 - 1.0f. + */ + public float getThrottleProbability() { + return throttleProbability; + } + + /** + * Set the probability of throttling a request. + * @param throttleProbability the probability of a request being throttled. + */ + public void setThrottleProbability(float throttleProbability) { + this.throttleProbability = validProbability(throttleProbability); + } + + /** + * Validate a probability option. + * @param p probability + * @return the probability, if valid + * @throws IllegalArgumentException if the probability is out of range. + */ + private float validProbability(float p) { + Preconditions.checkArgument(p >= 0.0f && p <= 1.0f, + "Probability out of range 0 to 1 %s", p); + return p; + } + + /** + * Conditionally fail the operation. + * @throws AmazonClientException if the client chooses to fail + * the request. + */ + private void maybeFail() throws AmazonClientException { + // code structure here is to line up for more failures later + AmazonServiceException ex = null; + if (trueWithProbability(throttleProbability)) { + // throttle the request + ex = new AmazonServiceException("throttled" + + " count = " + (failureCounter.get() + 1), null); + ex.setStatusCode(503); + } + + if (ex != null) { + long count = failureCounter.incrementAndGet(); + if (failureLimit == 0 + || (failureLimit > 0 && count < failureLimit)) { + throw ex; + } + } + } + + /** + * Set the limit on failures before all operations pass through. + * This resets the failure count. + * @param limit limit; "0" means "no limit" + */ + public void setFailureLimit(int limit) { + this.failureLimit = limit; + failureCounter.set(0); + } + /** Since ObjectListing is immutable, we just override it with wrapper. 
*/ + @SuppressWarnings("serial") private static class CustomObjectListing extends ObjectListing { private final List<S3ObjectSummary> customListing; @@ -506,6 +667,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { } } + @SuppressWarnings("serial") private static class CustomListObjectsV2Result extends ListObjectsV2Result { private final List<S3ObjectSummary> customListing; http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java new file mode 100644 index 0000000..9900f4c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Optional; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.SdkBaseException; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.retry.RetryPolicy; + +/** + * Class to provide lambda expression invocation of AWS operations. + * + * The core retry logic is in + * {@link #retryUntranslated(String, boolean, Retried, Operation)}; + * the other {@code retry() and retryUntranslated()} calls are wrappers. + * + * The static {@link #once(String, String, Operation)} and + * {@link #once(String, String, VoidOperation)} calls take an operation and + * return it with AWS exceptions translated to IOEs of some form. + * + * The retry logic on a failure is defined by the retry policy passed in + * the constructor; the standard retry policy is {@link S3ARetryPolicy}, + * though others may be used. + * + * The constructor also takes two {@link Retried} callbacks. + * The {@code caughtCallback} is called whenever an exception (IOE or AWS) + * is caught, before the retry processing looks at it. + * The {@code retryCallback} is invoked after a retry is scheduled + * but before the sleep. + * These callbacks can be used for reporting and incrementing statistics. + * + * The static {@link #quietly(String, String, VoidOperation)} and + * {@link #quietlyEval(String, String, Operation)} calls exist to take any + * operation and quietly catch & log at debug. The return value of + * {@link #quietlyEval(String, String, Operation)} is a java 8 optional, + * which can then be used in java8-expressions. + */ +public class Invoker { + private static final Logger LOG = LoggerFactory.getLogger(Invoker.class); + + /** + * Retry policy to use. + */ + private final RetryPolicy retryPolicy; + + /** + * Default retry handler. 
+ */ + private final Retried retryCallback; + + /** + * Instantiate. + * @param retryPolicy retry policy for all operations. + * @param retryCallback default callback invoked on retries + */ + public Invoker( + RetryPolicy retryPolicy, + Retried retryCallback) { + this.retryPolicy = retryPolicy; + this.retryCallback = retryCallback; + } + + public RetryPolicy getRetryPolicy() { + return retryPolicy; + } + + public Retried getRetryCallback() { + return retryCallback; + } + + /** + * Execute a function, translating any exception into an IOException. + * @param action action to execute (used in error messages) + * @param path path of work (used in error messages) + * @param operation operation to execute + * @param <T> type of return value + * @return the result of the function call + * @throws IOException any IOE raised, or translated exception + */ + @Retries.OnceTranslated + public static <T> T once(String action, String path, Operation<T> operation) + throws IOException { + try { + return operation.execute(); + } catch (AmazonClientException e) { + throw S3AUtils.translateException(action, path, e); + } + } + + /** + * Execute an operation with no result. + * @param action action to execute (used in error messages) + * @param path path of work (used in error messages) + * @param operation operation to execute + * @throws IOException any IOE raised, or translated exception + */ + @Retries.OnceTranslated + public static void once(String action, String path, VoidOperation operation) + throws IOException { + once(action, path, + () -> { + operation.execute(); + return null; + }); + } + + /** + * Execute an operation and ignore all raised IOExceptions; log at INFO. + * @param log log to log at info.
+ * @param action action to include in log + * @param path optional path to include in log + * @param operation operation to execute + * @param <T> type of operation + */ + public static <T> void ignoreIOExceptions( + Logger log, + String action, + String path, + Operation<T> operation) { + try { + once(action, path, operation); + } catch (IOException e) { + log.info("{}: {}", toDescription(action, path), e.toString(), e); + } + } + + /** + * Execute an operation and ignore all raised IOExceptions; log at INFO. + * @param log log to log at info. + * @param action action to include in log + * @param path optional path to include in log + * @param operation operation to execute + */ + public static void ignoreIOExceptions( + Logger log, + String action, + String path, + VoidOperation operation) { + ignoreIOExceptions(log, action, path, + () -> { + operation.execute(); + return null; + }); + } + + /** + * Execute a void operation with retry processing. + * @param action action to execute (used in error messages) + * @param path path of work (used in error messages) + * @param idempotent does the operation have semantics + * which mean that it can be retried even if was already executed? + * @param retrying callback on retries + * @param operation operation to execute + * @throws IOException any IOE raised, or translated exception + */ + @Retries.RetryTranslated + public void retry(String action, + String path, + boolean idempotent, + Retried retrying, + VoidOperation operation) + throws IOException { + retry(action, path, idempotent, retrying, + () -> { + operation.execute(); + return null; + }); + } + + /** + * Execute a void operation with the default retry callback invoked. + * @param action action to execute (used in error messages) + * @param path path of work (used in error messages) + * @param idempotent does the operation have semantics + * which mean that it can be retried even if was already executed? 
+ * @param operation operation to execute + * @throws IOException any IOE raised, or translated exception + */ + @Retries.RetryTranslated + public void retry(String action, + String path, + boolean idempotent, + VoidOperation operation) + throws IOException { + retry(action, path, idempotent, retryCallback, operation); + } + + /** + * Execute a function with the default retry callback invoked. + * @param action action to execute (used in error messages) + * @param path path of work (used in error messages) + * @param idempotent does the operation have semantics + * which mean that it can be retried even if was already executed? + * @param operation operation to execute + * @param <T> type of return value + * @return the result of the call + * @throws IOException any IOE raised, or translated exception + */ + @Retries.RetryTranslated + public <T> T retry(String action, + String path, + boolean idempotent, + Operation<T> operation) + throws IOException { + + return retry(action, path, idempotent, retryCallback, operation); + } + + /** + * Execute a function with retry processing. + * Uses {@link #once(String, String, Operation)} as the inner + * invocation mechanism before retry logic is performed. + * @param <T> type of return value + * @param action action to execute (used in error messages) + * @param path path of work (used in error messages) + * @param idempotent does the operation have semantics + * which mean that it can be retried even if was already executed? 
+ * @param retrying callback on retries + * @param operation operation to execute + * @return the result of the call + * @throws IOException any IOE raised, or translated exception + */ + @Retries.RetryTranslated + public <T> T retry( + String action, + String path, + boolean idempotent, + Retried retrying, + Operation<T> operation) + throws IOException { + return retryUntranslated( + toDescription(action, path), + idempotent, + retrying, + () -> once(action, path, operation)); + } + + /** + * Execute a function with retry processing and no translation, + * and the default retry callback. + * @param text description for the catching callback + * @param idempotent does the operation have semantics + * which mean that it can be retried even if was already executed? + * @param operation operation to execute + * @param <T> type of return value + * @return the result of the call + * @throws IOException any IOE raised + * @throws RuntimeException any Runtime exception raised + */ + @Retries.RetryRaw + public <T> T retryUntranslated( + String text, + boolean idempotent, + Operation<T> operation) throws IOException { + return retryUntranslated(text, idempotent, + retryCallback, operation); + } + + /** + * Execute a function with retry processing: AWS SDK Exceptions + * are <i>not</i> translated. + * This is the method which the others eventually invoke. + * @param <T> type of return value + * @param text text to include in messages + * @param idempotent does the operation have semantics + * which mean that it can be retried even if was already executed? + * @param retrying callback on retries + * @param operation operation to execute + * @return the result of the call + * @throws IOException any IOE raised + * @throws SdkBaseException any AWS exception raised + * @throws RuntimeException : these are never caught and retried.
+ */ + @Retries.RetryRaw + public <T> T retryUntranslated( + String text, + boolean idempotent, + Retried retrying, + Operation<T> operation) throws IOException { + + Preconditions.checkArgument(retrying != null, "null retrying argument"); + int retryCount = 0; + Exception caught; + RetryPolicy.RetryAction retryAction; + boolean shouldRetry; + do { + try { + // execute the operation, returning if successful + return operation.execute(); + } catch (IOException | SdkBaseException e) { + caught = e; + } + // you only get here if the operation didn't complete + // normally, hence caught != null + + // translate the exception into an IOE for the retry logic + IOException translated; + if (caught instanceof IOException) { + translated = (IOException) caught; + } else { + translated = S3AUtils.translateException(text, "", + (SdkBaseException)caught); + } + + + int attempts = retryCount + 1; + try { + // decide action based on operation, invocation count, etc + retryAction = retryPolicy.shouldRetry(translated, retryCount, 0, + idempotent); + // is it a retry operation? + shouldRetry = retryAction.action.equals( + RetryPolicy.RetryAction.RETRY.action); + if (shouldRetry) { + // notify the callback + retrying.onFailure(text, translated, retryCount, idempotent); + // then sleep for the policy delay + Thread.sleep(retryAction.delayMillis); + } + // increment the retry count + retryCount++; + } catch (InterruptedException e) { + // sleep was interrupted + // change the exception + caught = new InterruptedIOException("Interrupted"); + caught.initCause(e); + // no retry + shouldRetry = false; + // and re-interrupt the thread + Thread.currentThread().interrupt(); + } catch (Exception e) { + // The retry policy raised an exception + // log that something happened + LOG.warn("{}: exception in retry processing", text, e); + // and fail the execution with the last execution exception.
+ shouldRetry = false; + } + } while (shouldRetry); + + if (caught instanceof IOException) { + throw (IOException) caught; + } else { + throw (SdkBaseException) caught; + } + } + + + /** + * Execute an operation; any exception raised is simply caught and + * logged at debug. + * @param action action to execute + * @param path path (for exception construction) + * @param operation operation + */ + public static void quietly(String action, + String path, + VoidOperation operation) { + try { + once(action, path, operation); + } catch (Exception e) { + LOG.debug("Action {} failed", action, e); + } + } + + /** + * Execute an operation; any exception raised is caught and + * logged at debug. + * The result is only non-empty if the operation succeeded + * @param action action to execute + * @param path path (for exception construction) + * @param operation operation + */ + public static <T> Optional<T> quietlyEval(String action, + String path, + Operation<T> operation) { + try { + return Optional.of(once(action, path, operation)); + } catch (Exception e) { + LOG.debug("Action {} failed", action, e); + return Optional.empty(); + } + } + + /** + * Take an action and path and produce a string for logging. + * @param action action + * @param path path (may be null or empty) + * @return string for logs + */ + private static String toDescription(String action, String path) { + return action + + (StringUtils.isNotEmpty(path) ? (" on " + path) : ""); + } + + /** + * Arbitrary operation throwing an IOException. + * @param <T> return type + */ + @FunctionalInterface + public interface Operation<T> { + T execute() throws IOException; + } + + /** + * Void operation which may raise an IOException. + */ + @FunctionalInterface + public interface VoidOperation { + void execute() throws IOException; + } + + /** + * Callback for retry and notification operations. + * Even if the interface is throwing up "raw" exceptions, this handler + * gets the translated one. 
+ */ + @FunctionalInterface + public interface Retried { + /** + * Retry event in progress (before any sleep). + * @param text text passed in to the retry() call. + * @param exception the caught (and possibly translated) exception. + * @param retries number of retries so far + * @param idempotent is the request idempotent. + */ + void onFailure( + String text, + IOException exception, + int retries, + boolean idempotent); + } + + /** + * No op for a retrying callback. + */ + public static final Retried NO_OP = new Retried() { + @Override + public void onFailure(String text, + IOException exception, + int retries, + boolean idempotent) { + } + }; + + /** + * Log summary at debug; also log the full stack at debug when retries == 1. + */ + public static final Retried LOG_EVENT = new Retried() { + @Override + public void onFailure(String text, + IOException exception, + int retries, + boolean idempotent) { + LOG.debug("{}: " + exception, text); + if (retries == 1) { + // stack on first attempt, to keep noise down + LOG.debug("{}: " + exception, text, exception); + } + } + }; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index d9f059b..eb87705 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -385,8 +385,9 @@ public class Listing { status = statusBatchIterator.next(); // We remove from provided list the file status listed by S3 so that // this does not return duplicate items.
- LOG.debug("Removing the status from provided file status {}", status); - providedStatus.remove(status); + if (providedStatus.remove(status)) { + LOG.debug("Removed the status from provided file status {}", status); + } } else { if (providedStatusIterator.hasNext()) { status = providedStatusIterator.next(); @@ -540,10 +541,11 @@ public class Listing { * initial set of results/fail if there was a problem talking to the bucket. * @param listPath path of the listing * @param request initial request to make - * */ + * @throws IOException if listObjects raises one. + */ ObjectListingIterator( Path listPath, - S3ListRequest request) { + S3ListRequest request) throws IOException { this.listPath = listPath; this.maxKeys = owner.getMaxKeys(); this.objects = owner.listObjects(request); @@ -571,6 +573,7 @@ public class Listing { * @throws NoSuchElementException if there is no more data to list. */ @Override + @Retries.RetryTranslated public S3ListResult next() throws IOException { if (firstListing) { // on the first listing, don't request more data. @@ -814,19 +817,4 @@ public class Listing { } } - /** - * A Path filter which accepts all filenames. 
- */ - static final PathFilter ACCEPT_ALL = new PathFilter() { - @Override - public boolean accept(Path file) { - return true; - } - - @Override - public String toString() { - return "ACCEPT_ALL"; - } - }; - - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java new file mode 100644 index 0000000..80ecf0c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Declaration of retry policy for documentation only. + * This is purely for visibility in source; although declared public, it is marked {@code @InterfaceAudience.Private}. 
+ * Compare with {@link org.apache.hadoop.io.retry.AtMostOnce} + * and {@link org.apache.hadoop.io.retry.Idempotent}; these are real + * markers used by Hadoop RPC. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class Retries { + /** + * No retry, exceptions are translated. + */ + @Documented + @Retention(RetentionPolicy.SOURCE) + public @interface OnceTranslated { + String value() default ""; + } + + /** + * No retry, exceptions are not translated. + */ + @Documented + @Retention(RetentionPolicy.SOURCE) + public @interface OnceRaw { + String value() default ""; + } + + /** + * No retry, mixed translation. + */ + @Documented + @Retention(RetentionPolicy.SOURCE) + public @interface OnceMixed { + String value() default ""; + } + + /** + * Retried, exceptions are translated. + */ + @Documented + @Retention(RetentionPolicy.SOURCE) + public @interface RetryTranslated { + String value() default ""; + } + + /** + * Retried, no translation. + */ + @Documented + @Retention(RetentionPolicy.SOURCE) + public @interface RetryRaw { + String value() default ""; + } + + /** + * Retried, mixed translation. + */ + @Documented + @Retention(RetentionPolicy.SOURCE) + public @interface RetryMixed { + String value() default ""; + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org