HADOOP-11730. Regression: s3n read failure recovery broken. (Takenori Sato via stevel)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/19262d99
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/19262d99
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/19262d99

Branch: refs/heads/trunk
Commit: 19262d99ebbbd143a7ac9740d3a8e7c842b37591
Parents: 416b843
Author: Steve Loughran <ste...@apache.org>
Authored: Thu Apr 23 21:39:30 2015 +0100
Committer: Steve Loughran <ste...@apache.org>
Committed: Thu Apr 23 21:41:51 2015 +0100

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 ++
 .../hadoop/fs/s3native/NativeS3FileSystem.java  | 32 +++++++++++---------
 .../NativeS3FileSystemContractBaseTest.java     | 24 +++++++++++----
 3 files changed, 39 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
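
For context on the fix itself: in NativeS3FileSystem's input stream, seek() only reopens
the underlying object when the requested position actually differs from the current one
(the "pos != newpos" guard visible in the diff below), so calling seek(pos) from the
read-retry paths had become a no-op and the retried read reused the already-failed stream.
The patch has both read() overloads call a new reopen(pos) method directly. The following
is a minimal, self-contained sketch of that reopen-on-IOException pattern, illustrative
only and not the NativeS3FileSystem code; the Opener interface is a hypothetical stand-in
for store.retrieve(key, pos).

  import java.io.IOException;
  import java.io.InputStream;

  // Sketch of the reopen-on-IOException retry pattern this patch restores.
  class ReopeningInputStream extends InputStream {

    // Hypothetical stand-in for store.retrieve(key, pos).
    interface Opener {
      InputStream open(long pos) throws IOException;
    }

    private final Opener opener;
    private InputStream in;
    private long pos;

    ReopeningInputStream(Opener opener) throws IOException {
      this.opener = opener;
      this.in = opener.open(0);
    }

    // Discard the current (possibly broken) stream and fetch a fresh one at
    // the given offset -- the equivalent of the patch's reopen(pos).
    private void reopen(long newPos) throws IOException {
      if (in != null) {
        try {
          in.close();
        } catch (IOException ignored) {
          // the old stream is already broken; drop it regardless
        }
        in = null;
      }
      in = opener.open(newPos);
      pos = newPos;
    }

    @Override
    public synchronized int read() throws IOException {
      int result;
      try {
        result = in.read();
      } catch (IOException e) {
        // Re-fetch at the current offset rather than calling seek(pos),
        // which would be a no-op because the position has not changed.
        reopen(pos);
        result = in.read();
      }
      if (result != -1) {
        pos++;
      }
      return result;
    }
  }

In the patch the same idea appears three times: read() and read(byte[], int, int) call
reopen(pos) in their catch blocks, and seek() delegates to reopen(newpos) when the
position really moves.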


http://git-wip-us.apache.org/repos/asf/hadoop/blob/19262d99/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index f232e04..777828e 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -568,6 +568,9 @@ Release 2.7.1 - UNRELEASED
     HADOOP-11868. Invalid user logins trigger large backtraces in server log
     (Chang Li via jlowe)
 
+    HADOOP-11730. Regression: s3n read failure recovery broken.
+    (Takenori Sato via stevel)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19262d99/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
index a2f9805..0ad8e5f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.s3.S3Exception;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -124,7 +125,7 @@ public class NativeS3FileSystem extends FileSystem {
             key);
         LOG.debug("{}", e, e);
         try {
-          seek(pos);
+          reopen(pos);
           result = in.read();
         } catch (EOFException eof) {
           LOG.debug("EOF on input stream read: {}", eof, eof);
@@ -153,7 +154,7 @@ public class NativeS3FileSystem extends FileSystem {
       } catch (IOException e) {
         LOG.info( "Received IOException while reading '{}'," +
                   " attempting to reopen.", key);
-        seek(pos);
+        reopen(pos);
         result = in.read(b, off, len);
       }
       if (result > 0) {
@@ -173,16 +174,21 @@ public class NativeS3FileSystem extends FileSystem {
     /**
      * Close the inner stream if not null. Even if an exception
      * is raised during the close, the field is set to null
-     * @throws IOException if raised by the close() operation.
      */
-    private void closeInnerStream() throws IOException {
-      if (in != null) {
-        try {
-          in.close();
-        } finally {
-          in = null;
-        }
-      }
+    private void closeInnerStream() {
+      IOUtils.closeStream(in);
+      in = null;
+    }
+
+    /**
+     * Reopen a new input stream with the specified position
+     * @param pos the position to reopen a new stream
+     * @throws IOException
+     */
+    private synchronized void reopen(long pos) throws IOException {
+        LOG.debug("Reopening key '{}' for reading at position '{}", key, pos);
+        InputStream newStream = store.retrieve(key, pos);
+        updateInnerStream(newStream, pos);
     }
 
     /**
@@ -207,9 +213,7 @@ public class NativeS3FileSystem extends FileSystem {
       }
       if (pos != newpos) {
         // the seek is attempting to move the current position
-        LOG.debug("Opening key '{}' for reading at position '{}", key, newpos);
-        InputStream newStream = store.retrieve(key, newpos);
-        updateInnerStream(newStream, newpos);
+        reopen(newpos);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19262d99/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java
index 79ef9da..ef223ac 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java
@@ -165,14 +165,15 @@ public abstract class NativeS3FileSystemContractBaseTest
   
   public void testRetryOnIoException() throws Exception {
     class TestInputStream extends InputStream {
-      boolean shouldThrow = false;
+      boolean shouldThrow = true;
       int throwCount = 0;
       int pos = 0;
       byte[] bytes;
+      boolean threwException = false;
       
       public TestInputStream() {
         bytes = new byte[256];
-        for (int i = 0; i < 256; i++) {
+        for (int i = pos; i < 256; i++) {
           bytes[i] = (byte)i;
         }
       }
@@ -182,8 +183,10 @@ public abstract class NativeS3FileSystemContractBaseTest
         shouldThrow = !shouldThrow;
         if (shouldThrow) {
           throwCount++;
+          threwException = true;
           throw new IOException();
         }
+        assertFalse("IOException was thrown. InputStream should be reopened", threwException);
         return pos++;
       }
       
@@ -192,9 +195,10 @@ public abstract class NativeS3FileSystemContractBaseTest
         shouldThrow = !shouldThrow;
         if (shouldThrow) {
           throwCount++;
+          threwException = true;
           throw new IOException();
         }
-        
+        assertFalse("IOException was thrown. InputStream should be reopened", threwException);
         int sizeToRead = Math.min(len, 256 - pos);
         for (int i = 0; i < sizeToRead; i++) {
           b[i] = bytes[pos + i];
@@ -202,13 +206,20 @@ public abstract class NativeS3FileSystemContractBaseTest
         pos += sizeToRead;
         return sizeToRead;
       }
+
+      public void reopenAt(long byteRangeStart) {
+        threwException = false;
+        pos = Long.valueOf(byteRangeStart).intValue();
+      }
+
     }
     
-    final InputStream is = new TestInputStream();
+    final TestInputStream is = new TestInputStream();
     
     class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore {
       @Override
       public InputStream retrieve(String key, long byteRangeStart) throws IOException {
+        is.reopenAt(byteRangeStart);
         return is;
       }
     }
@@ -233,8 +244,9 @@ public abstract class NativeS3FileSystemContractBaseTest
     }
     
     // Test to make sure the throw path was exercised.
-    // 144 = 128 + (128 / 8)
-    assertEquals(144, ((TestInputStream)is).throwCount);
+    // every read should have thrown 1 IOException except for the first read
+    // 143 = 128 - 1 + (128 / 8)
+    assertEquals(143, ((TestInputStream)is).throwCount);
   }
 
 }
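
A note on the updated assertion, working from the toggling shouldThrow flag and assuming,
as the original comment implies, 128 single-byte reads followed by 128 bytes read in
8-byte chunks: shouldThrow now starts true and is flipped at the top of every call, so the
very first read succeeds, and every later top-level read throws exactly once before its
retried call against the reopened stream succeeds. That gives
(128 - 1) + (128 / 8) = 127 + 16 = 143 throws, which is what assertEquals(143, ...) checks.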
