HADOOP-14596. AWS SDK 1.11+ aborts() on close() if > 0 bytes in stream; logs error. Contributed by Steve Loughran
Change-Id: I49173bf6163796903d64594a8ca8a4bd26ad2bfc

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/72993b33
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/72993b33
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/72993b33

Branch: refs/heads/YARN-3926
Commit: 72993b33b704991f2a0bf743f31b164e58a2dabc
Parents: ec97519
Author: Mingliang Liu <lium...@apache.org>
Authored: Thu Jun 29 17:00:25 2017 -0700
Committer: Mingliang Liu <lium...@apache.org>
Committed: Thu Jun 29 17:07:52 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/fs/s3a/S3AInputStream.java | 26 +++++++++++++++++---
 1 file changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72993b33/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 7d322a5..b88b7c1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;

 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.EOFException;
 import java.io.IOException;
@@ -78,7 +79,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private final String key;
   private final long contentLength;
   private final String uri;
-  public static final Logger LOG = S3AFileSystem.LOG;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3AInputStream.class);
   private final S3AInstrumentation.InputStreamStatistics streamStatistics;
   private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private String serverSideEncryptionKey;
@@ -451,13 +453,27 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       // if the amount of data remaining in the current request is greater
       // than the readahead value: abort.
       long remaining = remainingInCurrentRequest();
+      LOG.debug("Closing stream {}: {}", reason,
+          forceAbort ? "abort" : "soft");
       boolean shouldAbort = forceAbort || remaining > readahead;
       if (!shouldAbort) {
         try {
           // clean close. This will read to the end of the stream,
           // so, while cleaner, can be pathological on a multi-GB object
+
+          // explicitly drain the stream
+          long drained = 0;
+          while (wrappedStream.read() >= 0) {
+            drained++;
+          }
+          LOG.debug("Drained stream of {} bytes", drained);
+
+          // now close it
           wrappedStream.close();
-          streamStatistics.streamClose(false, remaining);
+          // this MUST come after the close, so that if the IO operations fail
+          // and an abort is triggered, the initial attempt's statistics
+          // aren't collected.
+          streamStatistics.streamClose(false, drained);
         } catch (IOException e) {
           // exception escalates to an abort
           LOG.debug("When closing {} stream for {}", uri, reason, e);
@@ -467,13 +483,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       if (shouldAbort) {
         // Abort, rather than just close, the underlying stream. Otherwise, the
         // remaining object payload is read from S3 while closing the stream.
+        LOG.debug("Aborting stream");
         wrappedStream.abort();
         streamStatistics.streamClose(true, remaining);
       }
-      LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," +
+      LOG.debug("Stream {} {}: {}; remaining={} streamPos={}," +
+          " nextReadPos={}," +
           " request range {}-{} length={}",
           uri, (shouldAbort ? "aborted" : "closed"), reason,
-          pos, nextReadPos,
+          remaining, pos, nextReadPos,
           contentRangeStart, contentRangeFinish, length);
       wrappedStream = null;

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org