HADOOP-13028 add low level counter metrics for S3A; use in read performance tests. Contributed by stevel. Patch includes: HADOOP-12844 Recover when S3A fails on IOException in read(); HADOOP-13058 S3A FS fails during init against a read-only FS if multipart purge; HADOOP-13047 S3a Forward seek in stream length to be configurable.
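Of the bundled fixes, HADOOP-13047 adds the user-visible knob: forward seeks within fs.s3a.readahead.range (default 65536 bytes) are satisfied by reading ahead on the open S3 HTTP connection rather than closing and re-opening it, and any setReadahead() call on an open stream overrides the configured value. A minimal sketch of using it from client code; the bucket, path and sizes are illustrative placeholders, not part of the patch:

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class ReadaheadExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Bytes to read ahead during a seek() before closing and
      // re-opening the S3 HTTP connection (fs.s3a.readahead.range).
      conf.setLong("fs.s3a.readahead.range", 128 * 1024);
      FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
      try (FSDataInputStream in = fs.open(new Path("/data/sample.bin"))) {
        // Any call to setReadahead() overrides the configured value
        // for this stream.
        in.setReadahead(256 * 1024L);
        in.seek(1024 * 1024);
        in.read();
      }
    }
  }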
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/27c4e90e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/27c4e90e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/27c4e90e Branch: refs/heads/HDFS-1312 Commit: 27c4e90efce04e1b1302f668b5eb22412e00d033 Parents: 6b53802 Author: Steve Loughran <ste...@apache.org> Authored: Thu May 12 19:24:20 2016 +0100 Committer: Steve Loughran <ste...@apache.org> Committed: Thu May 12 19:24:20 2016 +0100 ---------------------------------------------------------------------- .../org/apache/hadoop/fs/FSDataInputStream.java | 9 + .../hadoop/metrics2/MetricStringBuilder.java | 141 ++++++ .../hadoop/metrics2/lib/MutableCounterLong.java | 2 +- .../src/main/resources/core-default.xml | 10 +- .../hadoop-aws/dev-support/findbugs-exclude.xml | 357 +-------------- .../apache/hadoop/fs/s3/FileSystemStore.java | 4 +- .../org/apache/hadoop/fs/s3/S3Credentials.java | 2 + .../fs/s3a/AnonymousAWSCredentialsProvider.java | 4 + .../fs/s3a/BasicAWSCredentialsProvider.java | 4 + .../org/apache/hadoop/fs/s3a/Constants.java | 18 +- .../hadoop/fs/s3a/S3AFastOutputStream.java | 11 +- .../org/apache/hadoop/fs/s3a/S3AFileStatus.java | 12 +- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 459 +++++++++++-------- .../apache/hadoop/fs/s3a/S3AInputStream.java | 342 ++++++++++---- .../hadoop/fs/s3a/S3AInstrumentation.java | 457 ++++++++++++++++++ .../apache/hadoop/fs/s3a/S3AOutputStream.java | 40 +- .../src/site/markdown/tools/hadoop-aws/index.md | 68 ++- .../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 191 +++++++- .../fs/s3a/scale/TestS3ADeleteManyFiles.java | 1 - .../scale/TestS3AInputStreamPerformance.java | 285 ++++++++++++ .../src/test/resources/log4j.properties | 3 + 21 files changed, 1753 insertions(+), 667 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index da98769..640db59 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -234,4 +234,13 @@ public class FSDataInputStream extends DataInputStream "support unbuffering."); } } + + /** + * String value. 
Includes the string value of the inner stream + * @return the stream + */ + @Override + public String toString() { + return super.toString() + ": " + in; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java new file mode 100644 index 0000000..18a499a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Build a string dump of the metrics. + * + * The {@link #toString()} operator dumps out all values collected. + * + * Every entry is formatted as + * {@code prefix + name + separator + value + suffix} + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MetricStringBuilder extends MetricsRecordBuilder { + + private final StringBuilder builder = new StringBuilder(256); + + private final String prefix; + private final String suffix; + private final String separator; + private final MetricsCollector parent; + + /** + * Build an instance. + * @param parent parent collector. Unused in this instance; only used for + * the {@link #parent()} method + * @param prefix string before each entry + * @param separator separator between name and value + * @param suffix suffix after each entry + */ + public MetricStringBuilder(MetricsCollector parent, + String prefix, + String separator, + String suffix) { + this.parent = parent; + this.prefix = prefix; + this.suffix = suffix; + this.separator = separator; + } + + public MetricStringBuilder add(MetricsInfo info, Object value) { + return tuple(info.name(), value.toString()); + } + + /** + * Add any key,val pair to the string, between the prefix and suffix, + * separated by the separator. 
+ * @param key key + * @param value value + * @return this instance + */ + public MetricStringBuilder tuple(String key, String value) { + builder.append(prefix) + .append(key) + .append(separator) + .append(value) + .append(suffix); + return this; + } + + @Override + public MetricsRecordBuilder tag(MetricsInfo info, String value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder add(MetricsTag tag) { + return tuple(tag.name(), tag.value()); + } + + @Override + public MetricsRecordBuilder add(AbstractMetric metric) { + add(metric.info(), metric.toString()); + return this; + } + + @Override + public MetricsRecordBuilder setContext(String value) { + return tuple("context", value); + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, int value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, long value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, int value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, long value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, float value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, double value) { + return add(info, value); + } + + @Override + public MetricsCollector parent() { + return parent; + } + + @Override + public String toString() { + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java index 03a6043..d3dec2e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java @@ -34,7 +34,7 @@ public class MutableCounterLong extends MutableCounter { private AtomicLong value = new AtomicLong(); - MutableCounterLong(MetricsInfo info, long initValue) { + public MutableCounterLong(MetricsInfo info, long initValue) { super(info); this.value.set(initValue); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index bd061c9..2b13133 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -938,7 +938,15 @@ uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description> </property> - <property> +<property> + <name>fs.s3a.readahead.range</name> + <value>65536</value> + <description>Bytes to read ahead during a seek() before closing and + re-opening the S3 HTTP connection. 
This option will be overridden if + any call to setReadahead() is made to an open stream.</description> +</property> + +<property> <name>fs.s3a.fast.buffer.size</name> <value>1048576</value> <description>Size of initial memory buffer in bytes allocated for an http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml index 204e6ab..2b4160a 100644 --- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml @@ -15,361 +15,8 @@ limitations under the License. --> <FindBugsFilter> - <Match> - <Package name="org.apache.hadoop.security.proto" /> - </Match> - <Match> - <Package name="org.apache.hadoop.tools.proto" /> - </Match> - <Match> - <Bug pattern="EI_EXPOSE_REP" /> - </Match> - <Match> - <Bug pattern="EI_EXPOSE_REP2" /> - </Match> - <Match> - <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> - </Match> - <Match> - <Class name="~.*_jsp" /> - <Bug pattern="DLS_DEAD_LOCAL_STORE" /> - </Match> - <Match> - <Field name="_jspx_dependants" /> - <Bug pattern="UWF_UNWRITTEN_FIELD" /> - </Match> - <!-- - Inconsistent synchronization for Client.Connection.out is - is intentional to make a connection to be closed instantly. - --> - <Match> - <Class name="org.apache.hadoop.ipc.Client$Connection" /> - <Field name="out" /> - <Bug pattern="IS2_INCONSISTENT_SYNC" /> - </Match> - <!-- - Further SaslException should be ignored during cleanup and - original exception should be re-thrown. - --> - <Match> - <Class name="org.apache.hadoop.security.SaslRpcClient" /> - <Bug pattern="DE_MIGHT_IGNORE" /> - </Match> - <!-- - Ignore Cross Scripting Vulnerabilities - --> - <Match> - <Package name="~org.apache.hadoop.mapred.*" /> - <Bug code="XSS" /> - </Match> - <Match> - <Class name="org.apache.hadoop.mapred.taskdetails_jsp" /> - <Bug code="HRS" /> - </Match> - <!-- - Ignore warnings where child class has the same name as - super class. Classes based on Old API shadow names from - new API. Should go off after HADOOP-1.0 - --> - <Match> - <Class name="~org.apache.hadoop.mapred.*" /> - <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" /> - </Match> - <!-- - Ignore warnings for usage of System.exit. 
This is - required and have been well thought out - --> - <Match> - <Class name="org.apache.hadoop.mapred.Child$2" /> - <Method name="run" /> - <Bug pattern="DM_EXIT" /> - </Match> - <Match> - <Class name="org.apache.hadoop.mapred.JobTracker" /> - <Method name="addHostToNodeMapping" /> - <Bug pattern="DM_EXIT" /> - </Match> - <Match> - <Class name="org.apache.hadoop.mapred.Task" /> - <Or> - <Method name="done" /> - <Method name="commit" /> - <Method name="statusUpdate" /> - </Or> - <Bug pattern="DM_EXIT" /> - </Match> - <Match> - <Class name="org.apache.hadoop.mapred.Task$TaskReporter" /> - <Method name="run" /> - <Bug pattern="DM_EXIT" /> - </Match> - <Match> - <Class name="org.apache.hadoop.util.ProgramDriver" /> - <Method name="driver" /> - <Bug pattern="DM_EXIT" /> - </Match> - <Match> - <Class name="org.apache.hadoop.util.RunJar" /> - <Method name="run" /> - <Bug pattern="DM_EXIT" /> - </Match> - <!-- - We need to cast objects between old and new api objects - --> - <Match> - <Class name="org.apache.hadoop.mapred.OutputCommitter" /> - <Bug pattern="BC_UNCONFIRMED_CAST" /> - </Match> - <!-- - We intentionally do the get name from the inner class - --> - <Match> - <Class name="org.apache.hadoop.mapred.TaskTracker$MapEventsFetcherThread" /> - <Method name="run" /> - <Bug pattern="IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD" /> - </Match> - <Match> - <Class name="org.apache.hadoop.mapred.FileOutputCommitter" /> - <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" /> - </Match> - <!-- - Ignoring this warning as resolving this would need a non-trivial change in code - --> - <Match> - <Class name="org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor" /> - <Method name="configure" /> - <Field name="maxNumItems" /> - <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" /> - </Match> - <!-- - Comes from org.apache.jasper.runtime.ResourceInjector. Cannot do much. - --> - <Match> - <Class name="org.apache.hadoop.mapred.jobqueue_005fdetails_jsp" /> - <Field name="_jspx_resourceInjector" /> - <Bug pattern="SE_BAD_FIELD" /> - </Match> - <!-- - Storing textInputFormat and then passing it as a parameter. Safe to ignore. - --> - <Match> - <Class name="org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob" /> - <Method name="createValueAggregatorJob" /> - <Bug pattern="DLS_DEAD_STORE_OF_CLASS_LITERAL" /> - </Match> - <!-- - Can remove this after the upgrade to findbugs1.3.8 - --> - <Match> - <Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat" /> - <Method name="getSplits" /> - <Bug pattern="DLS_DEAD_LOCAL_STORE" /> - </Match> - <!-- - This is a spurious warning. 
Just ignore - --> - <Match> - <Class name="org.apache.hadoop.mapred.MapTask$MapOutputBuffer" /> - <Field name="kvindex" /> - <Bug pattern="IS2_INCONSISTENT_SYNC" /> - </Match> - - <!-- - core changes - --> - <Match> - <Class name="~org.apache.hadoop.*" /> - <Bug code="MS" /> - </Match> - - <Match> - <Class name="org.apache.hadoop.fs.FileSystem" /> - <Method name="checkPath" /> - <Bug pattern="ES_COMPARING_STRINGS_WITH_EQ" /> - </Match> - - <Match> - <Class name="org.apache.hadoop.io.Closeable" /> - <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" /> - </Match> - - <Match> - <Class name="org.apache.hadoop.security.AccessControlException" /> - <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" /> - </Match> - - <Match> - <Class name="org.apache.hadoop.util.ProcfsBasedProcessTree" /> - <Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME" /> - </Match> - - <!-- - Streaming, Examples - --> - <Match> - <Class name="org.apache.hadoop.streaming.StreamUtil$TaskId" /> - <Bug pattern="URF_UNREAD_FIELD" /> - </Match> - - <Match> - <Class name="org.apache.hadoop.examples.DBCountPageView" /> - <Method name="verify" /> - <Bug pattern="OBL_UNSATISFIED_OBLIGATION" /> - </Match> - - <Match> - <Class name="org.apache.hadoop.examples.ContextFactory" /> - <Method name="setAttributes" /> - <Bug pattern="OBL_UNSATISFIED_OBLIGATION" /> - </Match> - - <!-- - TFile - --> - <Match> - <Class name="org.apache.hadoop.io.file.tfile.Chunk$ChunkDecoder" /> - <Method name="close" /> - <Bug pattern="SR_NOT_CHECKED" /> - </Match> - <!-- - The purpose of skip() is to drain remaining bytes of the chunk-encoded - stream (one chunk at a time). The termination condition is checked by - checkEOF(). - --> - <Match> - <Class name="org.apache.hadoop.io.file.tfile.Utils" /> - <Method name="writeVLong" /> - <Bug pattern="SF_SWITCH_FALLTHROUGH" /> - </Match> - <!-- - The switch condition fall through is intentional and for performance - purposes. 
- --> - - <Match> - <Class name="org.apache.hadoop.log.EventCounter"/> - <!-- backward compatibility --> - <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/> - </Match> - <Match> - <Class name="org.apache.hadoop.metrics.jvm.EventCounter"/> - <!-- backward compatibility --> - <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/> - </Match> - <Match> - <!-- protobuf generated code --> - <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtobufRpcEngineProtos.*"/> - </Match> - <Match> - <!-- protobuf generated code --> - <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/> - </Match> - <Match> - <!-- protobuf generated code --> - <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/> - </Match> - <Match> - <!-- protobuf generated code --> - <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcHeaderProtos.*"/> - </Match> - <Match> - <!-- protobuf generated code --> - <Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/> - </Match> - <Match> - <!-- protobuf generated code --> - <Class name="~org\.apache\.hadoop\.ha\.proto\.ZKFCProtocolProtos.*"/> - </Match> - <Match> - <!-- protobuf generated code --> - <Class name="~org\.apache\.hadoop\.security\.proto\.SecurityProtos.*"/> - </Match> - <Match> - <!-- protobuf generated code --> - <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.TestProtos.*"/> - </Match> - <Match> - <!-- protobuf generated code --> - <Class name="~org\.apache\.hadoop\.ipc\.proto\.RefreshCallQueueProtocolProtos.*"/> - </Match> - <Match> - <!-- protobuf generated code --> - <Class name="~org\.apache\.hadoop\.ipc\.proto\.GenericRefreshProtocolProtos.*"/> - </Match> - - <!-- - Manually checked, misses child thread manually syncing on parent's intrinsic lock. - --> - <Match> - <Class name="org.apache.hadoop.metrics2.lib.MutableQuantiles" /> - <Field name="previousSnapshot" /> - <Bug pattern="IS2_INCONSISTENT_SYNC" /> - </Match> - <!-- - The method uses a generic type T that extends two other types - T1 and T2. Findbugs complains of a cast from T1 to T2. - --> - <Match> - <Class name="org.apache.hadoop.fs.DelegationTokenRenewer" /> - <Method name="removeRenewAction" /> - <Bug pattern="BC_UNCONFIRMED_CAST" /> - </Match> - - <!-- Inconsistent synchronization flagged by findbugs is not valid. --> - <Match> - <Class name="org.apache.hadoop.ipc.Client$Connection" /> - <Field name="in" /> - <Bug pattern="IS2_INCONSISTENT_SYNC" /> - </Match> - <!-- - The switch condition for INITIATE is expected to fallthru to RESPONSE - to process initial sasl response token included in the INITIATE - --> - <Match> - <Class name="org.apache.hadoop.ipc.Server$Connection" /> - <Method name="processSaslMessage" /> - <Bug pattern="SF_SWITCH_FALLTHROUGH" /> - </Match> - - <!-- Synchronization performed on util.concurrent instance. --> - <Match> - <Class name="org.apache.hadoop.service.AbstractService" /> - <Method name="stop" /> - <Bug code="JLM" /> - </Match> - - <Match> - <Class name="org.apache.hadoop.service.AbstractService" /> - <Method name="waitForServiceToStop" /> - <Bug code="JLM" /> - </Match> - - <!-- - OpenStack Swift FS module -closes streams in a different method - from where they are opened. 
- --> - <Match> - <Class name="org.apache.hadoop.fs.swift.snative.SwiftNativeOutputStream"/> - <Method name="uploadFileAttempt"/> - <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/> - </Match> - <Match> - <Class name="org.apache.hadoop.fs.swift.snative.SwiftNativeOutputStream"/> - <Method name="uploadFilePartAttempt"/> - <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/> - </Match> - - <!-- code from maven source, null value is checked at callee side. --> - <Match> - <Class name="org.apache.hadoop.util.ComparableVersion$ListItem" /> - <Method name="compareTo" /> - <Bug code="NP" /> - </Match> - + <!-- S3n warnings about malicious code aren't that relevant given its limited future. --> <Match> - <Class name="org.apache.hadoop.util.HttpExceptionUtils"/> - <Method name="validateResponse"/> - <Bug pattern="REC_CATCH_EXCEPTION"/> + <Class name="org.apache.hadoop.fs.s3.INode" /> </Match> - </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java index 07e456b..3c7ed60 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java @@ -55,13 +55,13 @@ public interface FileSystemStore { /** * Delete everything. Used for testing. - * @throws IOException + * @throws IOException on any problem */ void purge() throws IOException; /** * Diagnostic method to dump all INodes to the console. - * @throws IOException + * @throws IOException on any problem */ void dump() throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java index fdacc3f..5ab352a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java @@ -38,6 +38,8 @@ public class S3Credentials { private String secretAccessKey; /** + * @param uri bucket URI optionally containing username and password. + * @param conf configuration * @throws IllegalArgumentException if credentials for S3 cannot be * determined. 
* @throws IOException if credential providers are misconfigured and we have http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java index 2a24273..e62ec77 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java @@ -21,7 +21,11 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AnonymousAWSCredentials; import com.amazonaws.auth.AWSCredentials; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +@InterfaceAudience.Private +@InterfaceStability.Stable public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider { public AWSCredentials getCredentials() { return new AnonymousAWSCredentials(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java index 9a0adda..2f721e4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java @@ -23,7 +23,11 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.AWSCredentials; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +@InterfaceAudience.Private +@InterfaceStability.Stable public class BasicAWSCredentialsProvider implements AWSCredentialsProvider { private final String accessKey; private final String secretKey; http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index bf3f85f..a800082 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -18,7 +18,19 @@ package org.apache.hadoop.fs.s3a; -public class Constants { +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * All the constants used with the {@link S3AFileSystem}. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class Constants { + + private Constants() { + } + // s3 access key public static final String ACCESS_KEY = "fs.s3a.access.key"; @@ -124,4 +136,8 @@ public class Constants { public static final int S3A_DEFAULT_PORT = -1; public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix"; + + /** read ahead buffer size to prevent connection re-establishments. */ + public static final String READAHEAD_RANGE = "fs.s3a.readahead.range"; + public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java index 5558693..bf0c7b9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java @@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.util.Progressable; @@ -64,6 +65,7 @@ import java.util.concurrent.ExecutorService; * <p> * Unstable: statistics and error handling might evolve */ +@InterfaceAudience.Private @InterfaceStability.Unstable public class S3AFastOutputStream extends OutputStream { @@ -102,7 +104,8 @@ public class S3AFastOutputStream extends OutputStream { * @param partSize size of a single part in a multi-part upload (except * last part) * @param multiPartThreshold files at least this size use multi-part upload - * @throws IOException + * @param threadPoolExecutor thread factory + * @throws IOException on any problem */ public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs, String bucket, String key, Progressable progress, @@ -159,7 +162,7 @@ public class S3AFastOutputStream extends OutputStream { * Writes a byte to the memory buffer. If this causes the buffer to reach * its limit, the actual upload is submitted to the threadpool. 
* @param b the int of which the lowest byte is written - * @throws IOException + * @throws IOException on any problem */ @Override public synchronized void write(int b) throws IOException { @@ -177,10 +180,10 @@ public class S3AFastOutputStream extends OutputStream { * @param b byte array containing * @param off offset in array where to start * @param len number of bytes to be written - * @throws IOException + * @throws IOException on any problem */ @Override - public synchronized void write(byte b[], int off, int len) + public synchronized void write(byte[] b, int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java index 47caea8..9ecca33 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java @@ -17,9 +17,19 @@ */ package org.apache.hadoop.fs.s3a; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +/** + * File status for an S3A "file". + * Modification time is trouble, see {@link #getModificationTime()}. + * + * The subclass is private as it should not be created directly. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving public class S3AFileStatus extends FileStatus { private boolean isEmptyDirectory; @@ -45,7 +55,7 @@ public class S3AFileStatus extends FileStatus { return System.getProperty("user.name"); } - /** Compare if this object is equal to another object + /** Compare if this object is equal to another object. * @param o the object to be compared. * @return true if two file status has the same path name; false if not. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 287474c..3f9723d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.net.URI; import java.util.ArrayList; import java.util.Date; @@ -56,8 +57,11 @@ import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressEvent; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -76,9 +80,24 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * The core S3A Filesystem implementation. + * + * This subclass is marked as private as code should not be creating it + * directly; use {@link FileSystem#get(Configuration)} and variants to + * create one. + * + * If cast to {@code S3AFileSystem}, extra methods and features may be accessed. + * Consider those private and unstable. + * + * Because it prints some of the state of the instrumentation, + * the output of {@link #toString()} must also be considered unstable. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving public class S3AFileSystem extends FileSystem { /** - * Default blocksize as used in blocksize and FS status queries + * Default blocksize as used in blocksize and FS status queries. 
*/ public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024; private URI uri; @@ -94,6 +113,8 @@ public class S3AFileSystem extends FileSystem { public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); private CannedAccessControlList cannedACL; private String serverSideEncryptionAlgorithm; + private S3AInstrumentation instrumentation; + private long readAhead; // The maximum number of entries that can be deleted in any call to s3 private static final int MAX_ENTRIES_TO_DELETE = 1000; @@ -105,10 +126,12 @@ public class S3AFileSystem extends FileSystem { */ public void initialize(URI name, Configuration conf) throws IOException { super.initialize(name, conf); + setConf(conf); + instrumentation = new S3AInstrumentation(name); uri = URI.create(name.getScheme() + "://" + name.getAuthority()); - workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, - this.getWorkingDirectory()); + workingDir = new Path("/user", System.getProperty("user.name")) + .makeQualified(this.uri, this.getWorkingDirectory()); AWSAccessKeys creds = getAWSAccessKeys(name, conf); @@ -122,19 +145,20 @@ public class S3AFileSystem extends FileSystem { bucket = name.getHost(); ClientConfiguration awsConf = new ClientConfiguration(); - awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, - DEFAULT_MAXIMUM_CONNECTIONS)); + awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, + DEFAULT_MAXIMUM_CONNECTIONS, 1)); boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS); awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); - awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, - DEFAULT_MAX_ERROR_RETRIES)); - awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT, - DEFAULT_ESTABLISH_TIMEOUT)); - awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, - DEFAULT_SOCKET_TIMEOUT)); + awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, + DEFAULT_MAX_ERROR_RETRIES, 0)); + awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, + DEFAULT_ESTABLISH_TIMEOUT, 0)); + awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, + DEFAULT_SOCKET_TIMEOUT, 0)); String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); - if(!signerOverride.isEmpty()) { + if (!signerOverride.isEmpty()) { + LOG.debug("Signer override = {}", signerOverride); awsConf.setSignerOverride(signerOverride); } @@ -144,21 +168,23 @@ public class S3AFileSystem extends FileSystem { initAmazonS3Client(conf, credentials, awsConf); - maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS); + maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, - DEFAULT_MIN_MULTIPART_THRESHOLD); - enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); - if (partSize < 5 * 1024 * 1024) { LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); partSize = 5 * 1024 * 1024; } + multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, + DEFAULT_MIN_MULTIPART_THRESHOLD); if (multiPartThreshold < 5 * 1024 * 1024) { LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); multiPartThreshold = 5 * 1024 * 1024; } + //check but do not store the block size + longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); + enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); + readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0); int maxThreads = 
conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); if (maxThreads < 2) { @@ -180,19 +206,17 @@ public class S3AFileSystem extends FileSystem { initCannedAcls(conf); if (!s3.doesBucketExist(bucket)) { - throw new IOException("Bucket " + bucket + " does not exist"); + throw new FileNotFoundException("Bucket " + bucket + " does not exist"); } initMultipartUploads(conf); serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); - setConf(conf); } void initProxySupport(Configuration conf, ClientConfiguration awsConf, - boolean secureConnections) throws IllegalArgumentException, - IllegalArgumentException { + boolean secureConnections) throws IllegalArgumentException { String proxyHost = conf.getTrimmed(PROXY_HOST, ""); int proxyPort = conf.getInt(PROXY_PORT, -1); if (!proxyHost.isEmpty()) { @@ -223,7 +247,8 @@ public class S3AFileSystem extends FileSystem { if (LOG.isDebugEnabled()) { LOG.debug("Using proxy server {}:{} as user {} with password {} on " + "domain {} as workstation {}", awsConf.getProxyHost(), - awsConf.getProxyPort(), String.valueOf(awsConf.getProxyUsername()), + awsConf.getProxyPort(), + String.valueOf(awsConf.getProxyUsername()), awsConf.getProxyPassword(), awsConf.getProxyDomain(), awsConf.getProxyWorkstation()); } @@ -258,7 +283,7 @@ public class S3AFileSystem extends FileSystem { AWSCredentialsProviderChain credentials, ClientConfiguration awsConf) throws IllegalArgumentException { s3 = new AmazonS3Client(credentials, awsConf); - String endPoint = conf.getTrimmed(ENDPOINT,""); + String endPoint = conf.getTrimmed(ENDPOINT, ""); if (!endPoint.isEmpty()) { try { s3.setEndpoint(endPoint); @@ -301,14 +326,25 @@ public class S3AFileSystem extends FileSystem { private void initMultipartUploads(Configuration conf) { boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, - DEFAULT_PURGE_EXISTING_MULTIPART); - long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE, - DEFAULT_PURGE_EXISTING_MULTIPART_AGE); + DEFAULT_PURGE_EXISTING_MULTIPART); + long purgeExistingMultipartAge = longOption(conf, + PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0); if (purgeExistingMultipart) { - Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000); + Date purgeBefore = + new Date(new Date().getTime() - purgeExistingMultipartAge * 1000); - transfers.abortMultipartUploads(bucket, purgeBefore); + try { + transfers.abortMultipartUploads(bucket, purgeBefore); + } catch (AmazonServiceException e) { + if (e.getStatusCode() == 403) { + instrumentation.errorIgnored(); + LOG.debug("Failed to abort multipart uploads against {}," + + " FS may be read only", bucket, e); + } else { + throw e; + } + } } } @@ -421,16 +457,15 @@ public class S3AFileSystem extends FileSystem { public FSDataInputStream open(Path f, int bufferSize) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Opening '{}' for reading.", f); - } + LOG.debug("Opening '{}' for reading.", f); final FileStatus fileStatus = getFileStatus(f); if (fileStatus.isDirectory()) { - throw new FileNotFoundException("Can't open " + f + " because it is a directory"); + throw new FileNotFoundException("Can't open " + f + + " because it is a directory"); } return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f), - fileStatus.getLen(), s3, statistics)); + fileStatus.getLen(), s3, statistics, instrumentation, readAhead)); } /** @@ -456,16 +491,26 @@ public class S3AFileSystem extends FileSystem { if (!overwrite && exists(f)) { throw new 
FileAlreadyExistsException(f + " already exists"); } + instrumentation.fileCreated(); if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) { return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket, key, progress, statistics, cannedACL, serverSideEncryptionAlgorithm, partSize, multiPartThreshold, threadPoolExecutor), statistics); } - // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file - return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this, - bucket, key, progress, cannedACL, statistics, - serverSideEncryptionAlgorithm), null); + // We pass null to FSDataOutputStream so it won't count writes that + // are being buffered to a file + return new FSDataOutputStream( + new S3AOutputStream(getConf(), + transfers, + this, + bucket, + key, + progress, + cannedACL, + statistics, + serverSideEncryptionAlgorithm), + null); } /** @@ -476,7 +521,7 @@ public class S3AFileSystem extends FileSystem { * @throws IOException indicating that append is not supported. */ public FSDataOutputStream append(Path f, int bufferSize, - Progressable progress) throws IOException { + Progressable progress) throws IOException { throw new IOException("Not supported"); } @@ -501,17 +546,13 @@ public class S3AFileSystem extends FileSystem { * @return true if rename is successful */ public boolean rename(Path src, Path dst) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Rename path {} to {}", src, dst); - } + LOG.debug("Rename path {} to {}", src, dst); String srcKey = pathToKey(src); String dstKey = pathToKey(dst); if (srcKey.isEmpty() || dstKey.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: src or dst are empty"); - } + LOG.debug("rename: source {} or dest {}, is empty", srcKey, dstKey); return false; } @@ -524,9 +565,8 @@ public class S3AFileSystem extends FileSystem { } if (srcKey.equals(dstKey)) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: src and dst refer to the same file or directory"); - } + LOG.debug("rename: src and dst refer to the same file or directory: {}", + dst); return srcStatus.isFile(); } @@ -535,9 +575,8 @@ public class S3AFileSystem extends FileSystem { dstStatus = getFileStatus(dst); if (srcStatus.isDirectory() && dstStatus.isFile()) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: src is a directory and dst is a file"); - } + LOG.debug("rename: src {} is a directory and dst {} is a file", + src, dst); return false; } @@ -545,6 +584,7 @@ public class S3AFileSystem extends FileSystem { return false; } } catch (FileNotFoundException e) { + LOG.debug("rename: destination path {} not found", dst); // Parent must exist Path parent = dst.getParent(); if (!pathToKey(parent).isEmpty()) { @@ -554,6 +594,8 @@ public class S3AFileSystem extends FileSystem { return false; } } catch (FileNotFoundException e2) { + LOG.debug("rename: destination path {} has no parent {}", + dst, parent); return false; } } @@ -561,9 +603,7 @@ public class S3AFileSystem extends FileSystem { // Ok! 
Time to start if (srcStatus.isFile()) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: renaming file " + src + " to " + dst); - } + LOG.debug("rename: renaming file {} to {}", src, dst); if (dstStatus != null && dstStatus.isDirectory()) { String newDstKey = dstKey; if (!newDstKey.endsWith("/")) { @@ -572,15 +612,13 @@ public class S3AFileSystem extends FileSystem { String filename = srcKey.substring(pathToKey(src.getParent()).length()+1); newDstKey = newDstKey + filename; - copyFile(srcKey, newDstKey); + copyFile(srcKey, newDstKey, srcStatus.getLen()); } else { - copyFile(srcKey, dstKey); + copyFile(srcKey, dstKey, srcStatus.getLen()); } delete(src, false); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: renaming directory " + src + " to " + dst); - } + LOG.debug("rename: renaming directory {} to {}", src, dst); // This is a directory to directory copy if (!dstKey.endsWith("/")) { @@ -593,14 +631,12 @@ public class S3AFileSystem extends FileSystem { //Verify dest is not a child of the source directory if (dstKey.startsWith(srcKey)) { - if (LOG.isDebugEnabled()) { - LOG.debug("cannot rename a directory to a subdirectory of self"); - } + LOG.debug("cannot rename a directory {}" + + " to a subdirectory of self: {}", srcKey, dstKey); return false; } - List<DeleteObjectsRequest.KeyVersion> keysToDelete = - new ArrayList<>(); + List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(); if (dstStatus != null && dstStatus.isEmptyDirectory()) { // delete unnecessary fake directory. keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey)); @@ -618,7 +654,7 @@ public class S3AFileSystem extends FileSystem { for (S3ObjectSummary summary : objects.getObjectSummaries()) { keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); String newDstKey = dstKey + summary.getKey().substring(srcKey.length()); - copyFile(summary.getKey(), newDstKey); + copyFile(summary.getKey(), newDstKey, summary.getSize()); if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { removeKeys(keysToDelete, true); @@ -657,6 +693,7 @@ public class S3AFileSystem extends FileSystem { DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete); s3.deleteObjects(deleteRequest); + instrumentation.fileDeleted(keysToDelete.size()); statistics.incrementWriteOps(1); } else { int writeops = 0; @@ -666,7 +703,7 @@ public class S3AFileSystem extends FileSystem { new DeleteObjectRequest(bucket, keyVersion.getKey())); writeops++; } - + instrumentation.fileDeleted(keysToDelete.size()); statistics.incrementWriteOps(writeops); } if (clearKeys) { @@ -684,25 +721,20 @@ public class S3AFileSystem extends FileSystem { * @throws IOException due to inability to delete a directory or file. 
*/ public boolean delete(Path f, boolean recursive) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Delete path " + f + " - recursive " + recursive); - } + LOG.debug("Delete path {} - recursive {}", f , recursive); S3AFileStatus status; try { status = getFileStatus(f); } catch (FileNotFoundException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Couldn't delete " + f + " - does not exist"); - } + LOG.debug("Couldn't delete {} - does not exist", f); + instrumentation.errorIgnored(); return false; } String key = pathToKey(f); if (status.isDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("delete: Path is a directory"); - } + LOG.debug("delete: Path is a directory: {}", f); if (!recursive && !status.isEmptyDirectory()) { throw new IOException("Path is a folder: " + f + @@ -719,15 +751,12 @@ public class S3AFileSystem extends FileSystem { } if (status.isEmptyDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting fake empty directory"); - } + LOG.debug("Deleting fake empty directory {}", key); s3.deleteObject(bucket, key); + instrumentation.directoryDeleted(); statistics.incrementWriteOps(1); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Getting objects for directory prefix " + key + " to delete"); - } + LOG.debug("Getting objects for directory prefix {} to delete", key); ListObjectsRequest request = new ListObjectsRequest(); request.setBucketName(bucket); @@ -736,16 +765,13 @@ public class S3AFileSystem extends FileSystem { //request.setDelimiter("/"); request.setMaxKeys(maxKeys); - List<DeleteObjectsRequest.KeyVersion> keys = - new ArrayList<>(); + List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>(); ObjectListing objects = s3.listObjects(request); statistics.incrementReadOps(1); while (true) { for (S3ObjectSummary summary : objects.getObjectSummaries()) { keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); - if (LOG.isDebugEnabled()) { - LOG.debug("Got object to delete " + summary.getKey()); - } + LOG.debug("Got object to delete {}", summary.getKey()); if (keys.size() == MAX_ENTRIES_TO_DELETE) { removeKeys(keys, true); @@ -764,10 +790,9 @@ public class S3AFileSystem extends FileSystem { } } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("delete: Path is a file"); - } + LOG.debug("delete: Path is a file"); s3.deleteObject(bucket, key); + instrumentation.fileDeleted(1); statistics.incrementWriteOps(1); } @@ -779,9 +804,7 @@ public class S3AFileSystem extends FileSystem { private void createFakeDirectoryIfNecessary(Path f) throws IOException { String key = pathToKey(f); if (!key.isEmpty() && !exists(f)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new fake directory at " + f); - } + LOG.debug("Creating new fake directory at {}", f); createFakeDirectory(bucket, key); } } @@ -798,9 +821,7 @@ public class S3AFileSystem extends FileSystem { public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { String key = pathToKey(f); - if (LOG.isDebugEnabled()) { - LOG.debug("List status for path: " + f); - } + LOG.debug("List status for path: {}", f); final List<FileStatus> result = new ArrayList<FileStatus>(); final FileStatus fileStatus = getFileStatus(f); @@ -816,9 +837,7 @@ public class S3AFileSystem extends FileSystem { request.setDelimiter("/"); request.setMaxKeys(maxKeys); - if (LOG.isDebugEnabled()) { - LOG.debug("listStatus: doing listObjects for directory " + key); - } + LOG.debug("listStatus: doing listObjects for directory {}", key); ObjectListing objects = s3.listObjects(request); 
statistics.incrementReadOps(1); @@ -831,24 +850,18 @@ public class S3AFileSystem extends FileSystem { // Skip over keys that are ourselves and old S3N _$folder$ files if (keyPath.equals(fQualified) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring: " + keyPath); - } + LOG.debug("Ignoring: {}", keyPath); continue; } if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { result.add(new S3AFileStatus(true, true, keyPath)); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: fd: " + keyPath); - } + LOG.debug("Adding: fd: {}", keyPath); } else { result.add(new S3AFileStatus(summary.getSize(), dateToLong(summary.getLastModified()), keyPath, getDefaultBlockSize(fQualified))); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: fi: " + keyPath); - } + LOG.debug("Adding: fi: {}", keyPath); } } @@ -858,16 +871,11 @@ public class S3AFileSystem extends FileSystem { continue; } result.add(new S3AFileStatus(true, false, keyPath)); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: rd: " + keyPath); - } + LOG.debug("Adding: rd: {}", keyPath); } if (objects.isTruncated()) { - if (LOG.isDebugEnabled()) { - LOG.debug("listStatus: list truncated - getting next batch"); - } - + LOG.debug("listStatus: list truncated - getting next batch"); objects = s3.listNextBatchOfObjects(objects); statistics.incrementReadOps(1); } else { @@ -875,9 +883,7 @@ public class S3AFileSystem extends FileSystem { } } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: rd (not a dir): " + f); - } + LOG.debug("Adding: rd (not a dir): {}", f); result.add(fileStatus); } @@ -890,14 +896,14 @@ public class S3AFileSystem extends FileSystem { * Set the current working directory for the given file system. All relative * paths will be resolved relative to it. * - * @param new_dir the current working directory. + * @param newDir the current working directory. */ - public void setWorkingDirectory(Path new_dir) { - workingDir = new_dir; + public void setWorkingDirectory(Path newDir) { + workingDir = newDir; } /** - * Get the current working directory for the given file system + * Get the current working directory for the given file system. * @return the directory pathname */ public Path getWorkingDirectory() { @@ -914,10 +920,7 @@ public class S3AFileSystem extends FileSystem { // TODO: If we have created an empty file at /foo/bar and we then call // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/? 
public boolean mkdirs(Path f, FsPermission permission) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Making directory: " + f); - } - + LOG.debug("Making directory: {}", f); try { FileStatus fileStatus = getFileStatus(f); @@ -938,6 +941,7 @@ public class S3AFileSystem extends FileSystem { fPart)); } } catch (FileNotFoundException fnfe) { + instrumentation.errorIgnored(); } fPart = fPart.getParent(); } while (fPart != null); @@ -957,10 +961,7 @@ public class S3AFileSystem extends FileSystem { */ public S3AFileStatus getFileStatus(Path f) throws IOException { String key = pathToKey(f); - if (LOG.isDebugEnabled()) { - LOG.debug("Getting path status for " + f + " (" + key + ")"); - } - + LOG.debug("Getting path status for {} ({})", f , key); if (!key.isEmpty()) { try { @@ -968,15 +969,11 @@ public class S3AFileSystem extends FileSystem { statistics.incrementReadOps(1); if (objectRepresentsDirectory(key, meta.getContentLength())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Found exact file: fake directory"); - } + LOG.debug("Found exact file: fake directory"); return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir)); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Found exact file: normal file"); - } + LOG.debug("Found exact file: normal file"); return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()), f.makeQualified(uri, workingDir), @@ -984,25 +981,23 @@ public class S3AFileSystem extends FileSystem { } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { - printAmazonServiceException(e); + printAmazonServiceException(f.toString(), e); throw e; } } catch (AmazonClientException e) { - printAmazonClientException(e); + printAmazonClientException(f.toString(), e); throw e; } // Necessary? if (!key.endsWith("/")) { + String newKey = key + "/"; try { - String newKey = key + "/"; ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey); statistics.incrementReadOps(1); if (objectRepresentsDirectory(newKey, meta.getContentLength())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Found file (with /): fake directory"); - } + LOG.debug("Found file (with /): fake directory"); return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir)); } else { LOG.warn("Found file (with /): real file? 
should not happen: {}", key); @@ -1014,11 +1009,11 @@ public class S3AFileSystem extends FileSystem { } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { - printAmazonServiceException(e); + printAmazonServiceException(newKey, e); throw e; } } catch (AmazonClientException e) { - printAmazonClientException(e); + printAmazonClientException(newKey, e); throw e; } } @@ -1038,17 +1033,17 @@ public class S3AFileSystem extends FileSystem { statistics.incrementReadOps(1); if (!objects.getCommonPrefixes().isEmpty() - || objects.getObjectSummaries().size() > 0) { + || !objects.getObjectSummaries().isEmpty()) { if (LOG.isDebugEnabled()) { - LOG.debug("Found path as directory (with /): " + - objects.getCommonPrefixes().size() + "/" + + LOG.debug("Found path as directory (with /): {}/{}", + objects.getCommonPrefixes().size() , objects.getObjectSummaries().size()); for (S3ObjectSummary summary : objects.getObjectSummaries()) { - LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize()); + LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize()); } for (String prefix : objects.getCommonPrefixes()) { - LOG.debug("Prefix: " + prefix); + LOG.debug("Prefix: {}", prefix); } } @@ -1060,17 +1055,15 @@ public class S3AFileSystem extends FileSystem { } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { - printAmazonServiceException(e); + printAmazonServiceException(key, e); throw e; } } catch (AmazonClientException e) { - printAmazonClientException(e); + printAmazonClientException(key, e); throw e; } - if (LOG.isDebugEnabled()) { - LOG.debug("Not Found: " + f); - } + LOG.debug("Not Found: {}", f); throw new FileNotFoundException("No such file or directory: " + f); } @@ -1089,15 +1082,13 @@ public class S3AFileSystem extends FileSystem { */ @Override public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, - Path dst) throws IOException { + Path dst) throws IOException { String key = pathToKey(dst); if (!overwrite && exists(dst)) { - throw new IOException(dst + " already exists"); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Copying local file from " + src + " to " + dst); + throw new FileAlreadyExistsException(dst + " already exists"); } + LOG.debug("Copying local file from {} to {}", src, dst); // Since we have a local file, we don't need to stream into a temporary file LocalFileSystem local = getLocal(getConf()); @@ -1123,13 +1114,14 @@ public class S3AFileSystem extends FileSystem { } }; + statistics.incrementWriteOps(1); Upload up = transfers.upload(putObjectRequest); up.addProgressListener(progressListener); try { up.waitForUploadResult(); - statistics.incrementWriteOps(1); } catch (InterruptedException e) { - throw new IOException("Got interrupted, cancelling"); + throw new InterruptedIOException("Interrupted copying " + src + + " to " + dst + ", cancelling"); } // This will delete unnecessary fake parent directories @@ -1153,7 +1145,7 @@ public class S3AFileSystem extends FileSystem { } /** - * Override getCononicalServiceName because we don't support token in S3A + * Override getCanonicalServiceName because we don't support token in S3A. 
@@ -1153,7 +1145,7 @@ public class S3AFileSystem extends FileSystem {
   }

   /**
-   * Override getCononicalServiceName because we don't support token in S3A
+   * Override getCanonicalServiceName because we don't support token in S3A.
    */
   @Override
   public String getCanonicalServiceName() {
@@ -1161,17 +1153,17 @@ public class S3AFileSystem extends FileSystem {
     return null;
   }

-  private void copyFile(String srcKey, String dstKey) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("copyFile " + srcKey + " -> " + dstKey);
-    }
+  private void copyFile(String srcKey, String dstKey, long size)
+      throws IOException {
+    LOG.debug("copyFile {} -> {}", srcKey, dstKey);

     ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
     ObjectMetadata dstom = cloneObjectMetadata(srcom);
     if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
       dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
     }
-    CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+    CopyObjectRequest copyObjectRequest =
+        new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
     copyObjectRequest.setCannedAccessControlList(cannedACL);
     copyObjectRequest.setNewObjectMetadata(dstom);
@@ -1192,13 +1184,17 @@ public class S3AFileSystem extends FileSystem {
     try {
       copy.waitForCopyResult();
       statistics.incrementWriteOps(1);
+      instrumentation.filesCopied(1, size);
     } catch (InterruptedException e) {
-      throw new IOException("Got interrupted, cancelling");
+      throw new InterruptedIOException("Interrupted copying " + srcKey
+          + " to " + dstKey + ", cancelling");
     }
   }

   private boolean objectRepresentsDirectory(final String name, final long size) {
-    return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L;
+    return !name.isEmpty()
+        && name.charAt(name.length() - 1) == '/'
+        && size == 0L;
   }

   // Handles null Dates that can be returned by AWS
@@ -1216,8 +1212,9 @@ public class S3AFileSystem extends FileSystem {

   private void deleteUnnecessaryFakeDirectories(Path f) throws IOException {
     while (true) {
+      String key = "";
       try {
-        String key = pathToKey(f);
+        key = pathToKey(f);
         if (key.isEmpty()) {
           break;
         }
@@ -1225,13 +1222,13 @@ public class S3AFileSystem extends FileSystem {

         S3AFileStatus status = getFileStatus(f);

         if (status.isDirectory() && status.isEmptyDirectory()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Deleting fake directory " + key + "/");
-          }
+          LOG.debug("Deleting fake directory {}/", key);
           s3.deleteObject(bucket, key + "/");
           statistics.incrementWriteOps(1);
         }
       } catch (FileNotFoundException | AmazonServiceException e) {
+        LOG.debug("While deleting key {}", key, e);
+        instrumentation.errorIgnored();
       }

       if (f.isRoot()) {
@@ -1267,10 +1264,12 @@ public class S3AFileSystem extends FileSystem {
     if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
       om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
     }
-    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om);
+    PutObjectRequest putObjectRequest =
+        new PutObjectRequest(bucketName, objectName, im, om);
     putObjectRequest.setCannedAcl(cannedACL);
     s3.putObject(putObjectRequest);
     statistics.incrementWriteOps(1);
+    instrumentation.directoryCreated();
   }
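The reformatted objectRepresentsDirectory() above encodes the convention S3A uses to mimic directories on a flat object store: a zero-byte object whose key ends in '/'. A self-contained restatement (the class wrapper and sample keys are illustrative):

    public class FakeDirectorySketch {
      /** An empty object with a trailing '/' stands in for a directory. */
      static boolean objectRepresentsDirectory(String name, long size) {
        return !name.isEmpty()
            && name.charAt(name.length() - 1) == '/'
            && size == 0L;
      }

      public static void main(String[] args) {
        System.out.println(objectRepresentsDirectory("data/logs/", 0L)); // true
        System.out.println(objectRepresentsDirectory("data/logs", 0L));  // false: no trailing slash
        System.out.println(objectRepresentsDirectory("data/logs/", 3L)); // false: non-empty object
      }
    }

This convention is also why deleteUnnecessaryFakeDirectories() above removes the key + "/" marker object once real entries exist under the prefix.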
@@ -1342,31 +1341,115 @@ public class S3AFileSystem extends FileSystem {

   /**
    * Return the number of bytes that large input files should be optimally
-   * be split into to minimize i/o time.
+   * be split into to minimize I/O time.
    * @deprecated use {@link #getDefaultBlockSize(Path)} instead
    */
   @Deprecated
   public long getDefaultBlockSize() {
-    // default to 32MB: large enough to minimize the impact of seeks
     return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
   }

-  private void printAmazonServiceException(AmazonServiceException ase) {
-    LOG.info("Caught an AmazonServiceException, which means your request made it " +
-        "to Amazon S3, but was rejected with an error response for some reason.");
-    LOG.info("Error Message: " + ase.getMessage());
-    LOG.info("HTTP Status Code: " + ase.getStatusCode());
-    LOG.info("AWS Error Code: " + ase.getErrorCode());
-    LOG.info("Error Type: " + ase.getErrorType());
-    LOG.info("Request ID: " + ase.getRequestId());
-    LOG.info("Class Name: " + ase.getClass().getName());
+  private void printAmazonServiceException(String target,
+      AmazonServiceException ase) {
+    LOG.info("{}: caught an AmazonServiceException {}", target, ase);
+    LOG.info("This means your request made it to Amazon S3,"
+        + " but was rejected with an error response for some reason.");
+    LOG.info("Error Message: {}", ase.getMessage());
+    LOG.info("HTTP Status Code: {}", ase.getStatusCode());
+    LOG.info("AWS Error Code: {}", ase.getErrorCode());
+    LOG.info("Error Type: {}", ase.getErrorType());
+    LOG.info("Request ID: {}", ase.getRequestId());
+    LOG.info("Class Name: {}", ase.getClass().getName());
+    LOG.info("Exception", ase);
+  }
+
+  private void printAmazonClientException(String target,
+      AmazonClientException ace) {
+    LOG.info("{}: caught an AmazonClientException {}", target, ace);
+    LOG.info("This means the client encountered "
+        + "a problem while trying to communicate with S3, "
+        + "such as not being able to access the network.", ace);
   }

-  private void printAmazonClientException(AmazonClientException ace) {
-    LOG.info("Caught an AmazonClientException, which means the client encountered " +
-        "a serious internal problem while trying to communicate with S3, " +
-        "such as not being able to access the network.");
-    LOG.info("Error Message: {}" + ace, ace);
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "S3AFileSystem{");
+    sb.append("uri=").append(uri);
+    sb.append(", workingDir=").append(workingDir);
+    sb.append(", partSize=").append(partSize);
+    sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
+    sb.append(", maxKeys=").append(maxKeys);
+    sb.append(", cannedACL=").append(cannedACL.toString());
+    sb.append(", readAhead=").append(readAhead);
+    sb.append(", blockSize=").append(getDefaultBlockSize());
+    sb.append(", multiPartThreshold=").append(multiPartThreshold);
+    if (serverSideEncryptionAlgorithm != null) {
+      sb.append(", serverSideEncryptionAlgorithm='")
+          .append(serverSideEncryptionAlgorithm)
+          .append('\'');
+    }
+    sb.append(", statistics {")
+        .append(statistics.toString())
+        .append("}");
+    sb.append(", metrics {")
+        .append(instrumentation.dump("{", "=", "} ", true))
+        .append("}");
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Get the partition size for multipart operations.
+   * @return the value as set during initialization
+   */
+  public long getPartitionSize() {
+    return partSize;
+  }
+
+  /**
+   * Get the threshold for multipart files.
+   * @return the value as set during initialization
+   */
+  public long getMultiPartThreshold() {
+    return multiPartThreshold;
+  }
+
+  /**
+   * Get an integer option >= the minimum allowed value.
+   * @param conf configuration
+   * @param key key to look up
+   * @param defVal default value
+   * @param min minimum value
+   * @return the value
+   * @throws IllegalArgumentException if the value is below the minimum
+   */
+  static int intOption(Configuration conf, String key, int defVal, int min) {
+    int v = conf.getInt(key, defVal);
+    Preconditions.checkArgument(v >= min,
+        String.format("Value of %s: %d is below the minimum value %d",
+            key, v, min));
+    return v;
+  }
+
+  /**
+   * Get a long option >= the minimum allowed value.
+   * @param conf configuration
+   * @param key key to look up
+   * @param defVal default value
+   * @param min minimum value
+   * @return the value
+   * @throws IllegalArgumentException if the value is below the minimum
+   */
+  static long longOption(Configuration conf,
+      String key,
+      long defVal,
+      long min) {
+    long v = conf.getLong(key, defVal);
+    Preconditions.checkArgument(v >= min,
+        String.format("Value of %s: %d is below the minimum value %d",
+            key, v, min));
+    return v;
+  }

   /**
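The new intOption()/longOption() helpers make bad configuration fail fast during filesystem initialization instead of surfacing later as obscure runtime behaviour. A usage sketch (the fs.example.* key is invented for illustration; intOption() is restated from the hunk above so the snippet compiles on its own):

    import org.apache.hadoop.conf.Configuration;
    import com.google.common.base.Preconditions;

    public class BoundedOptionSketch {
      /** Same contract as the package-private S3AFileSystem helper. */
      static int intOption(Configuration conf, String key, int defVal, int min) {
        int v = conf.getInt(key, defVal);
        Preconditions.checkArgument(v >= min,
            String.format("Value of %s: %d is below the minimum value %d",
                key, v, min));
        return v;
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.setInt("fs.example.readahead.range", 65536);
        // A valid value passes through unchanged.
        System.out.println(intOption(conf, "fs.example.readahead.range", 4096, 0));
        // A value below the minimum is rejected at init time with a message
        // naming the key, the bad value and the minimum.
        conf.setInt("fs.example.readahead.range", -1);
        try {
          intOption(conf, "fs.example.readahead.range", 4096, 0);
        } catch (IllegalArgumentException expected) {
          System.out.println("Rejected: " + expected.getMessage());
        }
      }
    }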