HADOOP-12533. Introduce FileNotFoundException in WASB for read and seek API. 
Contributed by Dushyanth.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3ce0a650
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3ce0a650
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3ce0a650

Branch: refs/heads/HDFS-7240
Commit: 3ce0a6502e78240f551c29bb27a2324ce359cd70
Parents: 259bea3
Author: cnauroth <cnaur...@apache.org>
Authored: Mon Nov 2 09:38:37 2015 -0800
Committer: cnauroth <cnaur...@apache.org>
Committed: Mon Nov 2 10:17:41 2015 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../hadoop/fs/azure/NativeAzureFileSystem.java  | 125 ++++++++++---
 ...estFileSystemOperationExceptionHandling.java | 131 +++++++++++++
 ...perationsExceptionHandlingMultiThreaded.java | 185 +++++++++++++++++++
 4 files changed, 422 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ce0a650/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 5c8daad..c8d60b0 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1304,6 +1304,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12519. hadoop-azure tests should avoid creating a metrics
     configuration file in the module root directory. (cnauroth)
 
+    HADOOP-12533. Introduce FileNotFoundException in WASB for read and seek API.
+    (Dushyanth via cnauroth)
+
   OPTIMIZATIONS
 
     HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ce0a650/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 7c5a504..73bc6b3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azure;
 
 import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -49,6 +50,7 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,7 +64,6 @@ import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.codehaus.jackson.JsonNode;
@@ -74,9 +75,11 @@ import org.codehaus.jackson.map.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageErrorCode;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.core.*;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * A {@link FileSystem} for reading and writing files stored on <a
@@ -88,7 +91,6 @@ import com.microsoft.azure.storage.core.*;
 @InterfaceStability.Stable
 public class NativeAzureFileSystem extends FileSystem {
   private static final int USER_WX_PERMISION = 0300;
-
   /**
    * A description of a folder rename operation, including the source and
    * destination keys, and descriptions of the files in the source folder.
@@ -712,7 +714,7 @@ public class NativeAzureFileSystem extends FileSystem {
      * @returns int An integer corresponding to the byte read.
      */
     @Override
-    public synchronized int read() throws IOException {
+    public synchronized int read() throws FileNotFoundException, IOException {
       try {
         int result = 0;
         result = in.read();
@@ -726,13 +728,21 @@ public class NativeAzureFileSystem extends FileSystem {
       //
         return result;
       } catch(IOException e) {
-        if (e.getCause() instanceof StorageException) {
-          StorageException storageExcp  = (StorageException) e.getCause();
+
+        Throwable innerException = checkForAzureStorageException(e);
+
+        if (innerException instanceof StorageException) {
+
           LOG.error("Encountered Storage Exception for read on Blob : {}"
               + " Exception details: {} Error Code : {}",
-              key, e.getMessage(), storageExcp.getErrorCode());
+              key, e, ((StorageException) innerException).getErrorCode());
+
+          if (isFileNotFoundException((StorageException) innerException)) {
+            throw new FileNotFoundException(String.format("%s is not found", key));
+          }
         }
-        throw e;
+
+       throw e;
       }
     }
 
@@ -757,7 +767,7 @@ public class NativeAzureFileSystem extends FileSystem {
      * there is no more data because the end of stream is reached.
      */
     @Override
-    public synchronized int read(byte[] b, int off, int len) throws IOException {
+    public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException {
       try {
         int result = 0;
         result = in.read(b, off, len);
@@ -772,29 +782,56 @@ public class NativeAzureFileSystem extends FileSystem {
         // Return to the caller with the result.
         return result;
       } catch(IOException e) {
-        if (e.getCause() instanceof StorageException) {
-          StorageException storageExcp  = (StorageException) e.getCause();
+
+        Throwable innerException = checkForAzureStorageException(e);
+
+        if (innerException instanceof StorageException) {
+
           LOG.error("Encountered Storage Exception for read on Blob : {}"
               + " Exception details: {} Error Code : {}",
-              key, e.getMessage(), storageExcp.getErrorCode());
+              key, e, ((StorageException) innerException).getErrorCode());
+
+          if (isFileNotFoundException((StorageException) innerException)) {
+            throw new FileNotFoundException(String.format("%s is not found", key));
+          }
         }
-        throw e;
+
+       throw e;
       }
     }
 
     @Override
-    public void close() throws IOException {
-      in.close();
-      closed = true;
+    public synchronized void close() throws IOException {
+      if (!closed) {
+        closed = true;
+        IOUtils.closeStream(in);
+        in = null;
+      }
     }
 
     @Override
-    public synchronized void seek(long pos) throws IOException {
-     in.close();
-     in = store.retrieve(key);
-     this.pos = in.skip(pos);
-     LOG.debug("Seek to position {}. Bytes skipped {}", pos,
-         this.pos);
+    public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException {
+      try {
+        checkNotClosed();
+        if (pos < 0) {
+          throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+        }
+        IOUtils.closeStream(in);
+        in = store.retrieve(key);
+        this.pos = in.skip(pos);
+        LOG.debug("Seek to position {}. Bytes skipped {}", pos,
+          this.pos);
+      } catch(IOException e) {
+
+        Throwable innerException = checkForAzureStorageException(e);
+
+        if (innerException instanceof StorageException
+             && isFileNotFoundException((StorageException) innerException)) {
+          throw new FileNotFoundException(String.format("%s is not found", key));
+        }
+
+        throw e;
+      }
     }
 
     @Override
@@ -806,6 +843,50 @@ public class NativeAzureFileSystem extends FileSystem {
     public boolean seekToNewSource(long targetPos) throws IOException {
       return false;
     }
+
+    /*
+     * Helper method to recursively check if the cause of the exception is
+     * a Azure storage exception.
+     */
+    private Throwable checkForAzureStorageException(IOException e) {
+
+      Throwable innerException = e.getCause();
+
+      while (innerException != null
+              && !(innerException instanceof StorageException)) {
+        innerException = innerException.getCause();
+      }
+
+      return innerException;
+    }
+
+    /*
+     * Helper method to check if the AzureStorageException is
+     * because backing blob was not found.
+     */
+    private boolean isFileNotFoundException(StorageException e) {
+
+      String errorCode = ((StorageException) e).getErrorCode();
+      if (errorCode != null
+          && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
+              || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
+              || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
+              || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
+
+        return true;
+      }
+
+      return false;
+    }
+
+    /*
+     * Helper method to check if a stream is closed.
+     */
+    private void checkNotClosed() throws IOException {
+      if (closed) {
+        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+      }
+    }
   }
 
   private class NativeAzureFsOutputStream extends OutputStream {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ce0a650/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java
new file mode 100644
index 0000000..35a1f50
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.FileNotFoundException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Test;
+
+
+public class TestFileSystemOperationExceptionHandling extends
+  NativeAzureFileSystemBaseTest {
+
+  FSDataInputStream inputStream = null;
+  /*
+   * Helper method to create a PageBlob test storage account.
+   */
+  private AzureBlobStorageTestAccount getPageBlobTestStorageAccount()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories key so every file created is a page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    // Configure the atomic rename directories key so every folder will have
+    // atomic rename applied.
+    conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+
+
+  /*
+   * Helper method that creates a InputStream to validate exceptions
+   * for various scenarios
+   */
+  private void setupInputStreamToTest(AzureBlobStorageTestAccount testAccount)
+      throws Exception {
+
+    fs = testAccount.getFileSystem();
+
+    // Step 1: Create a file and write dummy data.
+    Path testFilePath1 = new Path("test1.dat");
+    Path testFilePath2 = new Path("test2.dat");
+    FSDataOutputStream outputStream = fs.create(testFilePath1);
+    String testString = "This is a test string";
+    outputStream.write(testString.getBytes());
+    outputStream.close();
+
+    // Step 2: Open a read stream on the file.
+    inputStream = fs.open(testFilePath1);
+
+    // Step 3: Rename the file
+    fs.rename(testFilePath1, testFilePath2);
+  }
+
+  /*
+   * Tests a basic single threaded read scenario for Page blobs.
+   */
+  @Test(expected=FileNotFoundException.class)
+  public void testSingleThreadedPageBlobReadScenario() throws Throwable {
+    AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+    setupInputStreamToTest(testAccount);
+    byte[] readBuffer = new byte[512];
+    inputStream.read(readBuffer);
+  }
+
+  /*
+   * Tests a basic single threaded seek scenario for Page blobs.
+   */
+  @Test(expected=FileNotFoundException.class)
+  public void testSingleThreadedPageBlobSeekScenario() throws Throwable {
+    AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+    setupInputStreamToTest(testAccount);
+    inputStream.seek(5);
+  }
+
+  /*
+   * Test a basic single thread seek scenario for Block blobs.
+   */
+  @Test(expected=FileNotFoundException.class)
+  public void testSingleThreadBlockBlobSeekScenario() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    setupInputStreamToTest(testAccount);
+    inputStream.seek(5);
+  }
+
+  /*
+   * Tests a basic single threaded read scenario for Block blobs.
+   */
+  @Test(expected=FileNotFoundException.class)
+  public void testSingledThreadBlockBlobReadScenario() throws Throwable{
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    setupInputStreamToTest(testAccount);
+    byte[] readBuffer = new byte[512];
+    inputStream.read(readBuffer);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (inputStream != null) {
+      inputStream.close();
+    }
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ce0a650/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java
new file mode 100644
index 0000000..0f91500
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.FileNotFoundException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestFileSystemOperationsExceptionHandlingMultiThreaded extends
+  NativeAzureFileSystemBaseTest {
+
+  FSDataInputStream inputStream = null;
+  /*
+   * Helper method to creates an input stream to test various scenarios.
+   */
+  private void getInputStreamToTest(FileSystem fs, Path testPath) throws Throwable {
+
+    FSDataOutputStream outputStream = fs.create(testPath);
+    String testString = "This is a test string";
+    outputStream.write(testString.getBytes());
+    outputStream.close();
+
+    inputStream = fs.open(testPath);
+  }
+
+  /*
+   * Test to validate correct exception is thrown for Multithreaded read
+   * scenario for block blobs
+   */
+  @Test(expected=FileNotFoundException.class)
+  public void testMultiThreadedBlockBlobReadScenario() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    fs = testAccount.getFileSystem();
+    Path testFilePath1 = new Path("test1.dat");
+
+    getInputStreamToTest(fs, testFilePath1);
+    Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
+    renameThread.start();
+
+    renameThread.join();
+
+    byte[] readBuffer = new byte[512];
+    inputStream.read(readBuffer);
+  }
+
+  /*
+   * Test to validate correct exception is thrown for Multithreaded seek
+   * scenario for block blobs
+   */
+
+  @Test(expected=FileNotFoundException.class)
+  public void testMultiThreadBlockBlobSeekScenario() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    fs = testAccount.getFileSystem();
+    Path testFilePath1 = new Path("test1.dat");
+
+    getInputStreamToTest(fs, testFilePath1);
+    Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
+    renameThread.start();
+
+    renameThread.join();
+
+    inputStream.seek(5);
+  }
+
+  /*
+   * Test to validate correct exception is thrown for Multithreaded read
+   * scenario for page blobs
+   */
+
+  @Test(expected=FileNotFoundException.class)
+  public void testMultiThreadedPageBlobReadScenario() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+    fs = testAccount.getFileSystem();
+    Path testFilePath1 = new Path("test1.dat");
+
+    getInputStreamToTest(fs, testFilePath1);
+    Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
+    renameThread.start();
+
+    renameThread.join();
+    byte[] readBuffer = new byte[512];
+    inputStream.read(readBuffer);
+  }
+
+  /*
+   * Test to validate correct exception is thrown for Multithreaded seek
+   * scenario for page blobs
+   */
+
+  @Test(expected=FileNotFoundException.class)
+  public void testMultiThreadedPageBlobSeekScenario() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+    fs = testAccount.getFileSystem();
+    Path testFilePath1 = new Path("test1.dat");
+
+    getInputStreamToTest(fs, testFilePath1);
+    Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
+    renameThread.start();
+
+    renameThread.join();
+    inputStream.seek(5);
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  /*
+   * Helper method to create a PageBlob test storage account.
+   */
+  private AzureBlobStorageTestAccount getPageBlobTestStorageAccount()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories key so every file created is a page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    // Configure the atomic rename directories key so every folder will have
+    // atomic rename applied.
+    conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (inputStream != null) {
+      inputStream.close();
+    }
+  }
+}
+
+/*
+ * Helper thread that just renames the test file.
+ */
+class RenameThread implements Runnable {
+
+  private FileSystem fs;
+  private Path testPath;
+  private Path renamePath = new Path("test2.dat");
+
+  public RenameThread(FileSystem fs, Path testPath) {
+    this.fs = fs;
+    this.testPath = testPath;
+  }
+
+  @Override
+  public void run(){
+    try {
+      fs.rename(testPath, renamePath);
+    }catch (Exception e) {
+      // Swallowing the exception as the
+      // correctness of the test is controlled
+      // by the other thread
+    }
+  }
+}

Reply via email to