HADOOP-11730. Regression: s3n read failure recovery broken. (Takenori Sato via stevel)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/19262d99 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/19262d99 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/19262d99 Branch: refs/heads/trunk Commit: 19262d99ebbbd143a7ac9740d3a8e7c842b37591 Parents: 416b843 Author: Steve Loughran <ste...@apache.org> Authored: Thu Apr 23 21:39:30 2015 +0100 Committer: Steve Loughran <ste...@apache.org> Committed: Thu Apr 23 21:41:51 2015 +0100 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 ++ .../hadoop/fs/s3native/NativeS3FileSystem.java | 32 +++++++++++--------- .../NativeS3FileSystemContractBaseTest.java | 24 +++++++++++---- 3 files changed, 39 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/19262d99/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index f232e04..777828e 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -568,6 +568,9 @@ Release 2.7.1 - UNRELEASED HADOOP-11868. Invalid user logins trigger large backtraces in server log (Chang Li via jlowe) + HADOOP-11730. Regression: s3n read failure recovery broken. + (Takenori Sato via stevel) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/19262d99/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java index a2f9805..0ad8e5f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java @@ -54,6 +54,7 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.s3.S3Exception; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; @@ -124,7 +125,7 @@ public class NativeS3FileSystem extends FileSystem { key); LOG.debug("{}", e, e); try { - seek(pos); + reopen(pos); result = in.read(); } catch (EOFException eof) { LOG.debug("EOF on input stream read: {}", eof, eof); @@ -153,7 +154,7 @@ public class NativeS3FileSystem extends FileSystem { } catch (IOException e) { LOG.info( "Received IOException while reading '{}'," + " attempting to reopen.", key); - seek(pos); + reopen(pos); result = in.read(b, off, len); } if (result > 0) { @@ -173,16 +174,21 @@ public class NativeS3FileSystem extends FileSystem { /** * Close the inner stream if not null. Even if an exception * is raised during the close, the field is set to null - * @throws IOException if raised by the close() operation. */ - private void closeInnerStream() throws IOException { - if (in != null) { - try { - in.close(); - } finally { - in = null; - } - } + private void closeInnerStream() { + IOUtils.closeStream(in); + in = null; + } + + /** + * Reopen a new input stream with the specified position + * @param pos the position to reopen a new stream + * @throws IOException + */ + private synchronized void reopen(long pos) throws IOException { + LOG.debug("Reopening key '{}' for reading at position '{}", key, pos); + InputStream newStream = store.retrieve(key, pos); + updateInnerStream(newStream, pos); } /** @@ -207,9 +213,7 @@ public class NativeS3FileSystem extends FileSystem { } if (pos != newpos) { // the seek is attempting to move the current position - LOG.debug("Opening key '{}' for reading at position '{}", key, newpos); - InputStream newStream = store.retrieve(key, newpos); - updateInnerStream(newStream, newpos); + reopen(newpos); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/19262d99/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java index 79ef9da..ef223ac 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java @@ -165,14 +165,15 @@ public abstract class NativeS3FileSystemContractBaseTest public void testRetryOnIoException() throws Exception { class TestInputStream extends InputStream { - boolean shouldThrow = false; + boolean shouldThrow = true; int throwCount = 0; int pos = 0; byte[] bytes; + boolean threwException = false; public TestInputStream() { bytes = new byte[256]; - for (int i = 0; i < 256; i++) { + for (int i = pos; i < 256; i++) { bytes[i] = (byte)i; } } @@ -182,8 +183,10 @@ public abstract class NativeS3FileSystemContractBaseTest shouldThrow = !shouldThrow; if (shouldThrow) { throwCount++; + threwException = true; throw new IOException(); } + assertFalse("IOException was thrown. InputStream should be reopened", threwException); return pos++; } @@ -192,9 +195,10 @@ public abstract class NativeS3FileSystemContractBaseTest shouldThrow = !shouldThrow; if (shouldThrow) { throwCount++; + threwException = true; throw new IOException(); } - + assertFalse("IOException was thrown. InputStream should be reopened", threwException); int sizeToRead = Math.min(len, 256 - pos); for (int i = 0; i < sizeToRead; i++) { b[i] = bytes[pos + i]; @@ -202,13 +206,20 @@ public abstract class NativeS3FileSystemContractBaseTest pos += sizeToRead; return sizeToRead; } + + public void reopenAt(long byteRangeStart) { + threwException = false; + pos = Long.valueOf(byteRangeStart).intValue(); + } + } - final InputStream is = new TestInputStream(); + final TestInputStream is = new TestInputStream(); class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore { @Override public InputStream retrieve(String key, long byteRangeStart) throws IOException { + is.reopenAt(byteRangeStart); return is; } } @@ -233,8 +244,9 @@ public abstract class NativeS3FileSystemContractBaseTest } // Test to make sure the throw path was exercised. - // 144 = 128 + (128 / 8) - assertEquals(144, ((TestInputStream)is).throwCount); + // every read should have thrown 1 IOException except for the first read + // 144 = 128 - 1 + (128 / 8) + assertEquals(143, ((TestInputStream)is).throwCount); } }