http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
new file mode 100644
index 0000000..9dcaddc
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -0,0 +1,2222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.mortbay.util.ajax.JSON;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.windowsazure.storage.CloudStorageAccount;
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.RetryExponentialRetry;
+import com.microsoft.windowsazure.storage.RetryNoRetry;
+import com.microsoft.windowsazure.storage.StorageCredentials;
+import com.microsoft.windowsazure.storage.StorageCredentialsAccountAndKey;
+import 
com.microsoft.windowsazure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.windowsazure.storage.StorageErrorCode;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.BlobListingDetails;
+import com.microsoft.windowsazure.storage.blob.BlobProperties;
+import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+import com.microsoft.windowsazure.storage.blob.CopyStatus;
+import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption;
+import com.microsoft.windowsazure.storage.blob.ListBlobItem;
+import com.microsoft.windowsazure.storage.core.Utility;
+
[email protected]
+class AzureNativeFileSystemStore implements NativeFileSystemStore {
+  
+  /**
+   * Configuration knob on whether we do block-level MD5 validation on
+   * upload/download.
+   */
+  static final String KEY_CHECK_BLOCK_MD5 = "fs.azure.check.block.md5";
+  /**
+   * Configuration knob on whether we store blob-level MD5 on upload.
+   */
+  static final String KEY_STORE_BLOB_MD5 = "fs.azure.store.blob.md5";
+  static final String DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME = 
"storageemulator";
+  static final String STORAGE_EMULATOR_ACCOUNT_NAME_PROPERTY_NAME = 
"fs.azure.storage.emulator.account.name";
+
+  public static final Log LOG = LogFactory
+      .getLog(AzureNativeFileSystemStore.class);
+
+  private StorageInterface storageInteractionLayer;
+  private CloudBlobDirectoryWrapper rootDirectory;
+  private CloudBlobContainerWrapper container;
+
+  // Constants local to this class.
+  //
+  private static final String KEY_ACCOUNT_KEYPROVIDER_PREFIX = 
"fs.azure.account.keyprovider.";
+  private static final String KEY_ACCOUNT_SAS_PREFIX = "fs.azure.sas.";
+
+  // note: this value is not present in core-default.xml as our real default is
+  // computed as min(2*cpu,8)
+  private static final String KEY_CONCURRENT_CONNECTION_VALUE_OUT = 
"fs.azure.concurrentRequestCount.out";
+
+  private static final String KEY_STREAM_MIN_READ_SIZE = 
"fs.azure.read.request.size";
+  private static final String KEY_STORAGE_CONNECTION_TIMEOUT = 
"fs.azure.storage.timeout";
+  private static final String KEY_WRITE_BLOCK_SIZE = 
"fs.azure.write.request.size";
+
+  // Property controlling whether to allow reads on blob which are concurrently
+  // appended out-of-band.
+  private static final String KEY_READ_TOLERATE_CONCURRENT_APPEND = 
"fs.azure.io.read.tolerate.concurrent.append";
+
+  // Configurable throttling parameter properties. These properties are located
+  // in the core-site.xml configuration file.
+  private static final String KEY_MIN_BACKOFF_INTERVAL = 
"fs.azure.io.retry.min.backoff.interval";
+  private static final String KEY_MAX_BACKOFF_INTERVAL = 
"fs.azure.io.retry.max.backoff.interval";
+  private static final String KEY_BACKOFF_INTERVAL = 
"fs.azure.io.retry.backoff.interval";
+  private static final String KEY_MAX_IO_RETRIES = 
"fs.azure.io.retry.max.retries";
+
+  private static final String KEY_SELF_THROTTLE_ENABLE = 
"fs.azure.selfthrottling.enable";
+  private static final String KEY_SELF_THROTTLE_READ_FACTOR = 
"fs.azure.selfthrottling.read.factor";
+  private static final String KEY_SELF_THROTTLE_WRITE_FACTOR = 
"fs.azure.selfthrottling.write.factor";
+
+  private static final String PERMISSION_METADATA_KEY = "hdi_permission";
+  private static final String OLD_PERMISSION_METADATA_KEY = "asv_permission";
+  private static final String IS_FOLDER_METADATA_KEY = "hdi_isfolder";
+  private static final String OLD_IS_FOLDER_METADATA_KEY = "asv_isfolder";
+  static final String VERSION_METADATA_KEY = "hdi_version";
+  static final String OLD_VERSION_METADATA_KEY = "asv_version";
+  static final String FIRST_WASB_VERSION = "2013-01-01";
+  static final String CURRENT_WASB_VERSION = "2013-09-01";
+  static final String LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY = 
"hdi_tmpupload";
+  static final String OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY = 
"asv_tmpupload";
+
+  private static final String HTTP_SCHEME = "http";
+  private static final String HTTPS_SCHEME = "https";
+  private static final String WASB_AUTHORITY_DELIMITER = "@";
+  private static final String AZURE_ROOT_CONTAINER = "$root";
+
+  private static final int DEFAULT_CONCURRENT_WRITES = 8;
+
+  // Concurrent reads reads of data written out of band are disable by default.
+  private static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
+
+  // Default block sizes
+  public static final int DEFAULT_DOWNLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
+  public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
+
+  // Retry parameter defaults.
+  private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 1 * 1000; // 1s
+  private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
+  private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s
+  private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 15;
+
+  // Self-throttling defaults. Allowed range = (0,1.0]
+  // Value of 1.0 means no self-throttling.
+  // Value of x means process data at factor x of unrestricted rate
+  private static final boolean DEFAULT_SELF_THROTTLE_ENABLE = true;
+  private static final float DEFAULT_SELF_THROTTLE_READ_FACTOR = 1.0f;
+  private static final float DEFAULT_SELF_THROTTLE_WRITE_FACTOR = 1.0f;
+
+  private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
+
+  /**
+   * MEMBER VARIABLES
+   */
+
+  private URI sessionUri;
+  private Configuration sessionConfiguration;
+  private int concurrentWrites = DEFAULT_CONCURRENT_WRITES;
+  private boolean isAnonymousCredentials = false;
+  // Set to true if we are connecting using shared access signatures.
+  private boolean connectingUsingSAS = false;
+  private static final JSON PERMISSION_JSON_SERIALIZER = 
createPermissionJsonSerializer();
+
+  private boolean suppressRetryPolicy = false;
+  private boolean canCreateOrModifyContainer = false;
+  private ContainerState currentKnownContainerState = ContainerState.Unknown;
+  private final Object containerStateLock = new Object();
+
+  private boolean tolerateOobAppends = DEFAULT_READ_TOLERATE_CONCURRENT_APPEND;
+
+  private int downloadBlockSizeBytes = DEFAULT_DOWNLOAD_BLOCK_SIZE;
+  private int uploadBlockSizeBytes = DEFAULT_UPLOAD_BLOCK_SIZE;
+
+  // Bandwidth throttling exponential back-off parameters
+  //
+  private int minBackoff; // the minimum back-off interval (ms) between 
retries.
+  private int maxBackoff; // the maximum back-off interval (ms) between 
retries.
+  private int deltaBackoff; // the back-off interval (ms) between retries.
+  private int maxRetries; // the maximum number of retry attempts.
+
+  // Self-throttling parameters
+  private boolean selfThrottlingEnabled;
+  private float selfThrottlingReadFactor;
+  private float selfThrottlingWriteFactor;
+
+  private TestHookOperationContext testHookOperationContext = null;
+
+  // Set if we're running against a storage emulator..
+  private boolean isStorageEmulator = false;
+
+  /**
+   * A test hook interface that can modify the operation context we use for
+   * Azure Storage operations, e.g. to inject errors.
+   */
+  @VisibleForTesting 
+  interface TestHookOperationContext {
+    OperationContext modifyOperationContext(OperationContext original);
+  }
+
+  /**
+   * Suppress the default retry policy for the Storage, useful in unit tests to
+   * test negative cases without waiting forever.
+   */
+  @VisibleForTesting
+  void suppressRetryPolicy() {
+    suppressRetryPolicy = true;
+  }
+
+  /**
+   * Add a test hook to modify the operation context we use for Azure Storage
+   * operations.
+   * 
+   * @param testHook
+   *          The test hook, or null to unset previous hooks.
+   */
+  @VisibleForTesting 
+  void addTestHookToOperationContext(TestHookOperationContext testHook) {
+    this.testHookOperationContext = testHook;
+  }
+
+  /**
+   * If we're asked by unit tests to not retry, set the retry policy factory in
+   * the client accordingly.
+   */
+  private void suppressRetryPolicyInClientIfNeeded() {
+    if (suppressRetryPolicy) {
+      storageInteractionLayer.setRetryPolicyFactory(new RetryNoRetry());
+    }
+  }
+
+  /**
+   * Creates a JSON serializer that can serialize a PermissionStatus object 
into
+   * the JSON string we want in the blob metadata.
+   * 
+   * @return The JSON serializer.
+   */
+  private static JSON createPermissionJsonSerializer() {
+    JSON serializer = new JSON();
+    serializer.addConvertor(PermissionStatus.class,
+        new PermissionStatusJsonSerializer());
+    return serializer;
+  }
+
+  /**
+   * A converter for PermissionStatus to/from JSON as we want it in the blob
+   * metadata.
+   */
+  private static class PermissionStatusJsonSerializer implements 
JSON.Convertor {
+    private static final String OWNER_TAG = "owner";
+    private static final String GROUP_TAG = "group";
+    private static final String PERMISSIONS_TAG = "permissions";
+
+    @Override
+    public void toJSON(Object obj, JSON.Output out) {
+      PermissionStatus permissionStatus = (PermissionStatus) obj;
+      // Don't store group as null, just store it as empty string
+      // (which is FileStatus behavior).
+      String group = permissionStatus.getGroupName() == null ? ""
+          : permissionStatus.getGroupName();
+      out.add(OWNER_TAG, permissionStatus.getUserName());
+      out.add(GROUP_TAG, group);
+      out.add(PERMISSIONS_TAG, permissionStatus.getPermission().toString());
+    }
+
+    @Override
+    public Object fromJSON(@SuppressWarnings("rawtypes") Map object) {
+      return PermissionStatusJsonSerializer.fromJSONMap(object);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static PermissionStatus fromJSONString(String jsonString) {
+      // The JSON class can only find out about an object's class (and call me)
+      // if we store the class name in the JSON string. Since I don't want to
+      // do that (it's an implementation detail), I just deserialize as a
+      // the default Map (JSON's default behavior) and parse that.
+      return fromJSONMap((Map) 
PERMISSION_JSON_SERIALIZER.fromJSON(jsonString));
+    }
+
+    private static PermissionStatus fromJSONMap(
+        @SuppressWarnings("rawtypes") Map object) {
+      return new PermissionStatus((String) object.get(OWNER_TAG),
+          (String) object.get(GROUP_TAG),
+          // The initial - below is the Unix file type,
+          // which FsPermission needs there but ignores.
+          FsPermission.valueOf("-" + (String) object.get(PERMISSIONS_TAG)));
+    }
+  }
+
+  @VisibleForTesting
+  void setAzureStorageInteractionLayer(StorageInterface 
storageInteractionLayer) {
+    this.storageInteractionLayer = storageInteractionLayer;
+  }
+
+  /**
+   * Check if concurrent reads and writes on the same blob are allowed.
+   * 
+   * @return true if concurrent reads and OOB writes has been configured, false
+   *         otherwise.
+   */
+  private boolean isConcurrentOOBAppendAllowed() {
+    return tolerateOobAppends;
+  }
+
+  /**
+   * Method for the URI and configuration object necessary to create a storage
+   * session with an Azure session. It parses the scheme to ensure it matches
+   * the storage protocol supported by this file system.
+   * 
+   * @param uri
+   *          - URI for target storage blob.
+   * @param conf
+   *          - reference to configuration object.
+   * 
+   * @throws IllegalArgumentException
+   *           if URI or job object is null, or invalid scheme.
+   */
+  @Override
+  public void initialize(URI uri, Configuration conf) throws AzureException {
+
+    if (null == this.storageInteractionLayer) {
+      this.storageInteractionLayer = new StorageInterfaceImpl();
+    }
+
+    // Check that URI exists.
+    //
+    if (null == uri) {
+      throw new IllegalArgumentException(
+          "Cannot initialize WASB file system, URI is null");
+    }
+
+    // Check that configuration object is non-null.
+    //
+    if (null == conf) {
+      throw new IllegalArgumentException(
+          "Cannot initialize WASB file system, URI is null");
+    }
+
+    // Incoming parameters validated. Capture the URI and the job configuration
+    // object.
+    //
+    sessionUri = uri;
+    sessionConfiguration = conf;
+
+    // Start an Azure storage session.
+    //
+    createAzureStorageSession();
+  }
+
+  /**
+   * Method to extract the account name from an Azure URI.
+   * 
+   * @param uri
+   *          -- WASB blob URI
+   * @returns accountName -- the account name for the URI.
+   * @throws URISyntaxException
+   *           if the URI does not have an authority it is badly formed.
+   */
+  private String getAccountFromAuthority(URI uri) throws URISyntaxException {
+
+    // Check to make sure that the authority is valid for the URI.
+    //
+    String authority = uri.getRawAuthority();
+    if (null == authority) {
+      // Badly formed or illegal URI.
+      //
+      throw new URISyntaxException(uri.toString(),
+          "Expected URI with a valid authority");
+    }
+
+    // Check if authority container the delimiter separating the account name
+    // from the
+    // the container.
+    //
+    if (!authority.contains(WASB_AUTHORITY_DELIMITER)) {
+      return authority;
+    }
+
+    // Split off the container name and the authority.
+    //
+    String[] authorityParts = authority.split(WASB_AUTHORITY_DELIMITER, 2);
+
+    // Because the string contains an '@' delimiter, a container must be
+    // specified.
+    //
+    if (authorityParts.length < 2 || "".equals(authorityParts[0])) {
+      // Badly formed WASB authority since there is no container.
+      //
+      final String errMsg = String
+          .format(
+              "URI '%s' has a malformed WASB authority, expected container 
name. "
+                  + "Authority takes the form wasb://[<container 
name>@]<account name>",
+              uri.toString());
+      throw new IllegalArgumentException(errMsg);
+    }
+
+    // Return with the account name. It is possible that this name is NULL.
+    //
+    return authorityParts[1];
+  }
+
+  /**
+   * Method to extract the container name from an Azure URI.
+   * 
+   * @param uri
+   *          -- WASB blob URI
+   * @returns containerName -- the container name for the URI. May be null.
+   * @throws URISyntaxException
+   *           if the uri does not have an authority it is badly formed.
+   */
+  private String getContainerFromAuthority(URI uri) throws URISyntaxException {
+
+    // Check to make sure that the authority is valid for the URI.
+    //
+    String authority = uri.getRawAuthority();
+    if (null == authority) {
+      // Badly formed or illegal URI.
+      //
+      throw new URISyntaxException(uri.toString(),
+          "Expected URI with a valid authority");
+    }
+
+    // The URI has a valid authority. Extract the container name. It is the
+    // second component of the WASB URI authority.
+    if (!authority.contains(WASB_AUTHORITY_DELIMITER)) {
+      // The authority does not have a container name. Use the default 
container
+      // by
+      // setting the container name to the default Azure root container.
+      //
+      return AZURE_ROOT_CONTAINER;
+    }
+
+    // Split off the container name and the authority.
+    String[] authorityParts = authority.split(WASB_AUTHORITY_DELIMITER, 2);
+
+    // Because the string contains an '@' delimiter, a container must be
+    // specified.
+    if (authorityParts.length < 2 || "".equals(authorityParts[0])) {
+      // Badly formed WASB authority since there is no container.
+      final String errMsg = String
+          .format(
+              "URI '%s' has a malformed WASB authority, expected container 
name."
+                  + "Authority takes the form wasb://[<container 
name>@]<account name>",
+              uri.toString());
+      throw new IllegalArgumentException(errMsg);
+    }
+
+    // Set the container name from the first entry for the split parts of the
+    // authority.
+    return authorityParts[0];
+  }
+
+  /**
+   * Get the appropriate return the appropriate scheme for communicating with
+   * Azure depending on whether wasb or wasbs is specified in the target URI.
+   * 
+   * return scheme - HTTPS or HTTP as appropriate.
+   */
+  private String getHTTPScheme() {
+    String sessionScheme = sessionUri.getScheme();
+    // Check if we're on a secure URI scheme: wasbs or the legacy asvs scheme.
+    if (sessionScheme != null
+        && (sessionScheme.equalsIgnoreCase("asvs") || sessionScheme
+            .equalsIgnoreCase("wasbs"))) {
+      return HTTPS_SCHEME;
+    } else {
+      // At this point the scheme should be either null or asv or wasb.
+      // Intentionally I'm not going to validate it though since I don't feel
+      // it's this method's job to ensure a valid URI scheme for this file
+      // system.
+      return HTTP_SCHEME;
+    }
+  }
+
+  /**
+   * Set the configuration parameters for this client storage session with
+   * Azure.
+   * 
+   * @throws AzureException
+   * @throws ConfigurationException
+   * 
+   */
+  private void configureAzureStorageSession() throws AzureException {
+
+    // Assertion: Target session URI already should have been captured.
+    if (sessionUri == null) {
+      throw new AssertionError(
+          "Expected a non-null session URI when configuring storage session");
+    }
+
+    // Assertion: A client session already should have been established with
+    // Azure.
+    if (storageInteractionLayer == null) {
+      throw new AssertionError(String.format(
+          "Cannot configure storage session for URI '%s' "
+              + "if storage session has not been established.",
+          sessionUri.toString()));
+    }
+
+    // Determine whether or not reads are allowed concurrent with OOB writes.
+    tolerateOobAppends = sessionConfiguration.getBoolean(
+        KEY_READ_TOLERATE_CONCURRENT_APPEND,
+        DEFAULT_READ_TOLERATE_CONCURRENT_APPEND);
+
+    // Retrieve configuration for the minimum stream read and write block size.
+    //
+    this.downloadBlockSizeBytes = sessionConfiguration.getInt(
+        KEY_STREAM_MIN_READ_SIZE, DEFAULT_DOWNLOAD_BLOCK_SIZE);
+    this.uploadBlockSizeBytes = sessionConfiguration.getInt(
+        KEY_WRITE_BLOCK_SIZE, DEFAULT_UPLOAD_BLOCK_SIZE);
+
+    // The job may want to specify a timeout to use when engaging the
+    // storage service. The default is currently 90 seconds. It may
+    // be necessary to increase this value for long latencies in larger
+    // jobs. If the timeout specified is greater than zero seconds use
+    // it, otherwise use the default service client timeout.
+    int storageConnectionTimeout = sessionConfiguration.getInt(
+        KEY_STORAGE_CONNECTION_TIMEOUT, 0);
+
+    if (0 < storageConnectionTimeout) {
+      storageInteractionLayer.setTimeoutInMs(storageConnectionTimeout * 1000);
+    }
+
+    // Set the concurrency values equal to the that specified in the
+    // configuration file. If it does not exist, set it to the default
+    // value calculated as double the number of CPU cores on the client
+    // machine. The concurrency value is minimum of double the cores and
+    // the read/write property.
+    int cpuCores = 2 * Runtime.getRuntime().availableProcessors();
+
+    concurrentWrites = sessionConfiguration.getInt(
+        KEY_CONCURRENT_CONNECTION_VALUE_OUT,
+        Math.min(cpuCores, DEFAULT_CONCURRENT_WRITES));
+
+    // Set up the exponential retry policy.
+    minBackoff = sessionConfiguration.getInt(KEY_MIN_BACKOFF_INTERVAL,
+        DEFAULT_MIN_BACKOFF_INTERVAL);
+
+    maxBackoff = sessionConfiguration.getInt(KEY_MAX_BACKOFF_INTERVAL,
+        DEFAULT_MAX_BACKOFF_INTERVAL);
+
+    deltaBackoff = sessionConfiguration.getInt(KEY_BACKOFF_INTERVAL,
+        DEFAULT_BACKOFF_INTERVAL);
+
+    maxRetries = sessionConfiguration.getInt(KEY_MAX_IO_RETRIES,
+        DEFAULT_MAX_RETRY_ATTEMPTS);
+
+    storageInteractionLayer.setRetryPolicyFactory(new RetryExponentialRetry(
+        minBackoff, deltaBackoff, maxBackoff, maxRetries));
+
+    // read the self-throttling config.
+    selfThrottlingEnabled = sessionConfiguration.getBoolean(
+        KEY_SELF_THROTTLE_ENABLE, DEFAULT_SELF_THROTTLE_ENABLE);
+
+    selfThrottlingReadFactor = sessionConfiguration.getFloat(
+        KEY_SELF_THROTTLE_READ_FACTOR, DEFAULT_SELF_THROTTLE_READ_FACTOR);
+
+    selfThrottlingWriteFactor = sessionConfiguration.getFloat(
+        KEY_SELF_THROTTLE_WRITE_FACTOR, DEFAULT_SELF_THROTTLE_WRITE_FACTOR);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String
+          .format(
+              "AzureNativeFileSystemStore init. 
Settings=%d,%b,%d,{%d,%d,%d,%d},{%b,%f,%f}",
+              concurrentWrites, tolerateOobAppends,
+              ((storageConnectionTimeout > 0) ? storageConnectionTimeout
+                  : STORAGE_CONNECTION_TIMEOUT_DEFAULT), minBackoff,
+              deltaBackoff, maxBackoff, maxRetries, selfThrottlingEnabled,
+              selfThrottlingReadFactor, selfThrottlingWriteFactor));
+    }
+  }
+
+  /**
+   * Connect to Azure storage using anonymous credentials.
+   * 
+   * @param uri
+   *          - URI to target blob (R/O access to public blob)
+   * 
+   * @throws StorageException
+   *           raised on errors communicating with Azure storage.
+   * @throws IOException
+   *           raised on errors performing I/O or setting up the session.
+   * @throws URISyntaxExceptions
+   *           raised on creating mal-formed URI's.
+   */
+  private void connectUsingAnonymousCredentials(final URI uri)
+      throws StorageException, IOException, URISyntaxException {
+    // Use an HTTP scheme since the URI specifies a publicly accessible
+    // container. Explicitly create a storage URI corresponding to the URI
+    // parameter for use in creating the service client.
+    String accountName = getAccountFromAuthority(uri);
+    URI storageUri = new URI(getHTTPScheme() + ":" + PATH_DELIMITER
+        + PATH_DELIMITER + accountName);
+
+    // Create the service client with anonymous credentials.
+    String containerName = getContainerFromAuthority(uri);
+    storageInteractionLayer.createBlobClient(storageUri);
+    suppressRetryPolicyInClientIfNeeded();
+
+    // Capture the container reference.
+    container = storageInteractionLayer.getContainerReference(containerName);
+    rootDirectory = container.getDirectoryReference("");
+
+    // Check for container existence, and our ability to access it.
+    try {
+      if (!container.exists(getInstrumentedContext())) {
+        throw new AzureException("Container " + containerName + " in account "
+            + accountName + " not found, and we can't create "
+            + " it using anoynomous credentials.");
+      }
+    } catch (StorageException ex) {
+      throw new AzureException("Unable to access container " + containerName
+          + " in account " + accountName
+          + " using anonymous credentials, and no credentials found for them "
+          + " in the configuration.", ex);
+    }
+
+    // Accessing the storage server unauthenticated using
+    // anonymous credentials.
+    isAnonymousCredentials = true;
+
+    // Configure Azure storage session.
+    configureAzureStorageSession();
+  }
+
+  private void connectUsingCredentials(String accountName,
+      StorageCredentials credentials, String containerName)
+      throws URISyntaxException, StorageException, AzureException {
+
+    if (isStorageEmulatorAccount(accountName)) {
+      isStorageEmulator = true;
+      CloudStorageAccount account = CloudStorageAccount
+          .getDevelopmentStorageAccount();
+      storageInteractionLayer.createBlobClient(account);
+    } else {
+      URI blobEndPoint = new URI(getHTTPScheme() + "://" + accountName);
+      storageInteractionLayer.createBlobClient(blobEndPoint, credentials);
+    }
+    suppressRetryPolicyInClientIfNeeded();
+
+    // Capture the container reference for debugging purposes.
+    container = storageInteractionLayer.getContainerReference(containerName);
+    rootDirectory = container.getDirectoryReference("");
+
+    // Can only create container if using account key credentials
+    canCreateOrModifyContainer = credentials instanceof 
StorageCredentialsAccountAndKey;
+
+    // Configure Azure storage session.
+    configureAzureStorageSession();
+  }
+
+  /**
+   * Connect to Azure storage using account key credentials.
+   */
+  private void connectUsingConnectionStringCredentials(
+      final String accountName, final String containerName,
+      final String accountKey) throws InvalidKeyException, StorageException,
+      IOException, URISyntaxException {
+    // If the account name is "acc.blob.core.windows.net", then the
+    // rawAccountName is just "acc"
+    String rawAccountName = accountName.split("\\.")[0];
+    StorageCredentials credentials = new StorageCredentialsAccountAndKey(
+        rawAccountName, accountKey);
+    connectUsingCredentials(accountName, credentials, containerName);
+  }
+
+  /**
+   * Connect to Azure storage using shared access signature credentials.
+   */
+  private void connectUsingSASCredentials(final String accountName,
+      final String containerName, final String sas) throws InvalidKeyException,
+      StorageException, IOException, URISyntaxException {
+    StorageCredentials credentials = new 
StorageCredentialsSharedAccessSignature(
+        sas);
+    connectingUsingSAS = true;
+    connectUsingCredentials(accountName, credentials, containerName);
+  }
+
+  private boolean isStorageEmulatorAccount(final String accountName) {
+    return accountName.equalsIgnoreCase(sessionConfiguration.get(
+        STORAGE_EMULATOR_ACCOUNT_NAME_PROPERTY_NAME,
+        DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME));
+  }
+
+  static String getAccountKeyFromConfiguration(String accountName,
+      Configuration conf) throws KeyProviderException {
+    String key = null;
+    String keyProviderClass = conf.get(KEY_ACCOUNT_KEYPROVIDER_PREFIX
+        + accountName);
+    KeyProvider keyProvider = null;
+
+    if (keyProviderClass == null) {
+      // No key provider was provided so use the provided key as is.
+      keyProvider = new SimpleKeyProvider();
+    } else {
+      // create an instance of the key provider class and verify it
+      // implements KeyProvider
+      Object keyProviderObject = null;
+      try {
+        Class<?> clazz = conf.getClassByName(keyProviderClass);
+        keyProviderObject = clazz.newInstance();
+      } catch (Exception e) {
+        throw new KeyProviderException("Unable to load key provider class.", 
e);
+      }
+      if (!(keyProviderObject instanceof KeyProvider)) {
+        throw new KeyProviderException(keyProviderClass
+            + " specified in config is not a valid KeyProvider class.");
+      }
+      keyProvider = (KeyProvider) keyProviderObject;
+    }
+    key = keyProvider.getStorageAccountKey(accountName, conf);
+
+    return key;
+  }
+
+  /**
+   * Establish a session with Azure blob storage based on the target URI. The
+   * method determines whether or not the URI target contains an explicit
+   * account or an implicit default cluster-wide account.
+   * 
+   * @throws AzureException
+   * @throws IOException
+   */
+  private void createAzureStorageSession() throws AzureException {
+
+    // Make sure this object was properly initialized with references to
+    // the sessionUri and sessionConfiguration.
+    if (null == sessionUri || null == sessionConfiguration) {
+      throw new AzureException("Filesystem object not initialized properly."
+          + "Unable to start session with Azure Storage server.");
+    }
+
+    // File system object initialized, attempt to establish a session
+    // with the Azure storage service for the target URI string.
+    try {
+      // Inspect the URI authority to determine the account and use the account
+      // to start an Azure blob client session using an account key for the
+      // the account or anonymously.
+      // For all URI's do the following checks in order:
+      // 1. Validate that <account> can be used with the current Hadoop
+      // cluster by checking it exists in the list of configured accounts
+      // for the cluster.
+      // 2. Look up the AccountKey in the list of configured accounts for the
+      // cluster.
+      // 3. If there is no AccountKey, assume anonymous public blob access
+      // when accessing the blob.
+      //
+      // If the URI does not specify a container use the default root container
+      // under the account name.
+
+      // Assertion: Container name on the session Uri should be non-null.
+      if (getContainerFromAuthority(sessionUri) == null) {
+        throw new AssertionError(String.format(
+            "Non-null container expected from session URI: %s.",
+            sessionUri.toString()));
+      }
+
+      // Get the account name.
+      String accountName = getAccountFromAuthority(sessionUri);
+      if (null == accountName) {
+        // Account name is not specified as part of the URI. Throw indicating
+        // an invalid account name.
+        final String errMsg = String.format(
+            "Cannot load WASB file system account name not"
+                + " specified in URI: %s.", sessionUri.toString());
+        throw new AzureException(errMsg);
+      }
+
+      String containerName = getContainerFromAuthority(sessionUri);
+
+      // Check whether this is a storage emulator account.
+      if (isStorageEmulatorAccount(accountName)) {
+        // It is an emulator account, connect to it with no credentials.
+        connectUsingCredentials(accountName, null, containerName);
+        return;
+      }
+
+      // Check whether we have a shared access signature for that container.
+      String propertyValue = sessionConfiguration.get(KEY_ACCOUNT_SAS_PREFIX
+          + containerName + "." + accountName);
+      if (propertyValue != null) {
+        // SAS was found. Connect using that.
+        connectUsingSASCredentials(accountName, containerName, propertyValue);
+        return;
+      }
+
+      // Check whether the account is configured with an account key.
+      propertyValue = getAccountKeyFromConfiguration(accountName,
+          sessionConfiguration);
+      if (propertyValue != null) {
+
+        // Account key was found.
+        // Create the Azure storage session using the account key and 
container.
+        connectUsingConnectionStringCredentials(
+            getAccountFromAuthority(sessionUri),
+            getContainerFromAuthority(sessionUri), propertyValue);
+
+        // Return to caller
+        return;
+      }
+
+      // The account access is not configured for this cluster. Try anonymous
+      // access.
+      connectUsingAnonymousCredentials(sessionUri);
+
+    } catch (Exception e) {
+      // Caught exception while attempting to initialize the Azure File
+      // System store, re-throw the exception.
+      throw new AzureException(e);
+    }
+  }
+
+  private enum ContainerState {
+    /**
+     * We haven't checked the container state yet.
+     */
+    Unknown,
+    /**
+     * We checked and the container doesn't exist.
+     */
+    DoesntExist,
+    /**
+     * The container exists and doesn't have an WASB version stamp on it.
+     */
+    ExistsNoVersion,
+    /**
+     * The container exists and has an unsupported WASB version stamped on it.
+     */
+    ExistsAtWrongVersion,
+    /**
+     * The container exists and has the proper WASB version stamped on it.
+     */
+    ExistsAtRightVersion
+  }
+
+  private enum ContainerAccessType {
+    /**
+     * We're accessing the container for a pure read operation, e.g. read a
+     * file.
+     */
+    PureRead,
+    /**
+     * We're accessing the container purely to write something, e.g. write a
+     * file.
+     */
+    PureWrite,
+    /**
+     * We're accessing the container to read something then write, e.g. rename 
a
+     * file.
+     */
+    ReadThenWrite
+  }
+
+  /**
+   * This should be called from any method that does any modifications to the
+   * underlying container: it makes sure to put the WASB current version in the
+   * container's metadata if it's not already there.
+   */
+  private ContainerState checkContainer(ContainerAccessType accessType)
+      throws StorageException, AzureException {
+    synchronized (containerStateLock) {
+      if (isOkContainerState(accessType)) {
+        return currentKnownContainerState;
+      }
+      if (currentKnownContainerState == ContainerState.ExistsAtWrongVersion) {
+        String containerVersion = retrieveVersionAttribute(container);
+        throw wrongVersionException(containerVersion);
+      }
+      // This means I didn't check it before or it didn't exist or
+      // we need to stamp the version. Since things may have changed by
+      // other machines since then, do the check again and don't depend
+      // on past information.
+
+      // Sanity check: we don't expect this at this point.
+      if (currentKnownContainerState == ContainerState.ExistsAtRightVersion) {
+        throw new AssertionError("Unexpected state: "
+            + currentKnownContainerState);
+      }
+
+      // Download the attributes - doubles as an existence check with just
+      // one service call
+      try {
+        container.downloadAttributes(getInstrumentedContext());
+        currentKnownContainerState = ContainerState.Unknown;
+      } catch (StorageException ex) {
+        if (ex.getErrorCode().equals(
+            StorageErrorCode.RESOURCE_NOT_FOUND.toString())) {
+          currentKnownContainerState = ContainerState.DoesntExist;
+        } else {
+          throw ex;
+        }
+      }
+
+      if (currentKnownContainerState == ContainerState.DoesntExist) {
+        // If the container doesn't exist and we intend to write to it,
+        // create it now.
+        if (needToCreateContainer(accessType)) {
+          storeVersionAttribute(container);
+          container.create(getInstrumentedContext());
+          currentKnownContainerState = ContainerState.ExistsAtRightVersion;
+        }
+      } else {
+        // The container exists, check the version.
+        String containerVersion = retrieveVersionAttribute(container);
+        if (containerVersion != null) {
+          if (containerVersion.equals(FIRST_WASB_VERSION)) {
+            // It's the version from when WASB was called ASV, just
+            // fix the version attribute if needed and proceed.
+            // We should be good otherwise.
+            if (needToStampVersion(accessType)) {
+              storeVersionAttribute(container);
+              container.uploadMetadata(getInstrumentedContext());
+            }
+          } else if (!containerVersion.equals(CURRENT_WASB_VERSION)) {
+            // Don't know this version - throw.
+            currentKnownContainerState = ContainerState.ExistsAtWrongVersion;
+            throw wrongVersionException(containerVersion);
+          } else {
+            // It's our correct version.
+            currentKnownContainerState = ContainerState.ExistsAtRightVersion;
+          }
+        } else {
+          // No version info exists.
+          currentKnownContainerState = ContainerState.ExistsNoVersion;
+          if (needToStampVersion(accessType)) {
+            // Need to stamp the version
+            storeVersionAttribute(container);
+            container.uploadMetadata(getInstrumentedContext());
+            currentKnownContainerState = ContainerState.ExistsAtRightVersion;
+          }
+        }
+      }
+      return currentKnownContainerState;
+    }
+  }
+
+  private AzureException wrongVersionException(String containerVersion) {
+    return new AzureException("The container " + container.getName()
+        + " is at an unsupported version: " + containerVersion
+        + ". Current supported version: " + FIRST_WASB_VERSION);
+  }
+
+  private boolean needToStampVersion(ContainerAccessType accessType) {
+    // We need to stamp the version on the container any time we write to
+    // it and we have the correct credentials to be able to write container
+    // metadata.
+    return accessType != ContainerAccessType.PureRead
+        && canCreateOrModifyContainer;
+  }
+
+  private static boolean needToCreateContainer(ContainerAccessType accessType) 
{
+    // We need to pro-actively create the container (if it doesn't exist) if
+    // we're doing a pure write. No need to create it for pure read or read-
+    // then-write access.
+    return accessType == ContainerAccessType.PureWrite;
+  }
+
+  // Determines whether we have to pull the container information again
+  // or we can work based off what we already have.
+  private boolean isOkContainerState(ContainerAccessType accessType) {
+    switch (currentKnownContainerState) {
+    case Unknown:
+      // When using SAS, we can't discover container attributes
+      // so just live with Unknown state and fail later if it
+      // doesn't exist.
+      return connectingUsingSAS;
+    case DoesntExist:
+      return false; // the container could have been created
+    case ExistsAtRightVersion:
+      return true; // fine to optimize
+    case ExistsAtWrongVersion:
+      return false;
+    case ExistsNoVersion:
+      // If there's no version, it's OK if we don't need to stamp the version
+      // or we can't anyway even if we wanted to.
+      return !needToStampVersion(accessType);
+    default:
+      throw new AssertionError("Unknown access type: " + accessType);
+    }
+  }
+
+  private boolean getUseTransactionalContentMD5() {
+    return sessionConfiguration.getBoolean(KEY_CHECK_BLOCK_MD5, true);
+  }
+
+  private BlobRequestOptions getUploadOptions() {
+    BlobRequestOptions options = new BlobRequestOptions();
+    options.setStoreBlobContentMD5(sessionConfiguration.getBoolean(
+        KEY_STORE_BLOB_MD5, false));
+    options.setUseTransactionalContentMD5(getUseTransactionalContentMD5());
+    options.setConcurrentRequestCount(concurrentWrites);
+
+    options.setRetryPolicyFactory(new RetryExponentialRetry(minBackoff,
+        deltaBackoff, maxBackoff, maxRetries));
+
+    return options;
+  }
+
+  private BlobRequestOptions getDownloadOptions() {
+    BlobRequestOptions options = new BlobRequestOptions();
+    options.setRetryPolicyFactory(new RetryExponentialRetry(minBackoff,
+        deltaBackoff, maxBackoff, maxRetries));
+    options.setUseTransactionalContentMD5(getUseTransactionalContentMD5());
+    return options;
+  }
+
+  @Override
+  public DataOutputStream storefile(String key,
+      PermissionStatus permissionStatus) throws AzureException {
+    try {
+
+      // Check if a session exists, if not create a session with the
+      // Azure storage server.
+      if (null == storageInteractionLayer) {
+        final String errMsg = String.format(
+            "Storage session expected for URI '%s' but does not exist.",
+            sessionUri);
+        throw new AzureException(errMsg);
+      }
+
+      // Check if there is an authenticated account associated with the
+      // file this instance of the WASB file system. If not the file system
+      // has not been authenticated and all access is anonymous.
+      if (!isAuthenticatedAccess()) {
+        // Preemptively raise an exception indicating no uploads are
+        // allowed to anonymous accounts.
+        throw new AzureException(new IOException(
+            "Uploads to public accounts using anonymous "
+                + "access is prohibited."));
+      }
+
+      checkContainer(ContainerAccessType.PureWrite);
+
+      /**
+       * Note: Windows Azure Blob Storage does not allow the creation of
+       * arbitrary directory paths under the default $root directory. This is 
by
+       * design to eliminate ambiguity in specifying a implicit blob address. A
+       * blob in the $root container cannot include a / in its name and must be
+       * careful not to include a trailing '/' when referencing blobs in the
+       * $root container. A '/; in the $root container permits ambiguous blob
+       * names as in the following example involving two containers $root and
+       * mycontainer: http://myaccount.blob.core.windows.net/$root
+       * http://myaccount.blob.core.windows.net/mycontainer If the URL
+       * "mycontainer/somefile.txt were allowed in $root then the URL:
+       * http://myaccount.blob.core.windows.net/mycontainer/myblob.txt could
+       * mean either: (1) container=mycontainer; blob=myblob.txt (2)
+       * container=$root; blob=mycontainer/myblob.txt
+       * 
+       * To avoid this type of ambiguity the Azure blob storage prevents
+       * arbitrary path under $root. For a simple and more consistent user
+       * experience it was decided to eliminate the opportunity for creating
+       * such paths by making the $root container read-only under WASB. 
+       */
+
+      // Check that no attempt is made to write to blobs on default
+      // $root containers.
+      if (AZURE_ROOT_CONTAINER.equals(getContainerFromAuthority(sessionUri))) {
+        // Azure containers are restricted to non-root containers.
+        final String errMsg = String.format(
+            "Writes to '%s' container for URI '%s' are prohibited, "
+                + "only updates on non-root containers permitted.",
+            AZURE_ROOT_CONTAINER, sessionUri.toString());
+        throw new AzureException(errMsg);
+      }
+
+      // Get the block blob reference from the store's container and
+      // return it.
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      storePermissionStatus(blob, permissionStatus);
+
+      // Create the output stream for the Azure blob.
+      OutputStream outputStream = blob.openOutputStream(getUploadOptions(),
+          getInstrumentedContext());
+
+      // Return to caller with DataOutput stream.
+      DataOutputStream dataOutStream = new DataOutputStream(outputStream);
+      return dataOutStream;
+    } catch (Exception e) {
+      // Caught exception while attempting to open the blob output stream.
+      // Re-throw as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Default permission to use when no permission metadata is found.
+   * 
+   * @return The default permission to use.
+   */
+  private static PermissionStatus defaultPermissionNoBlobMetadata() {
+    return new PermissionStatus("", "", FsPermission.getDefault());
+  }
+
+  private static void storeMetadataAttribute(CloudBlockBlobWrapper blob,
+      String key, String value) {
+    HashMap<String, String> metadata = blob.getMetadata();
+    if (null == metadata) {
+      metadata = new HashMap<String, String>();
+    }
+    metadata.put(key, value);
+    blob.setMetadata(metadata);
+  }
+
+  private static String getMetadataAttribute(CloudBlockBlobWrapper blob,
+      String... keyAlternatives) {
+    HashMap<String, String> metadata = blob.getMetadata();
+    if (null == metadata) {
+      return null;
+    }
+    for (String key : keyAlternatives) {
+      if (metadata.containsKey(key)) {
+        return metadata.get(key);
+      }
+    }
+    return null;
+  }
+
+  private static void removeMetadataAttribute(CloudBlockBlobWrapper blob,
+      String key) {
+    HashMap<String, String> metadata = blob.getMetadata();
+    if (metadata != null) {
+      metadata.remove(key);
+      blob.setMetadata(metadata);
+    }
+  }
+
+  private void storePermissionStatus(CloudBlockBlobWrapper blob,
+      PermissionStatus permissionStatus) {
+    storeMetadataAttribute(blob, PERMISSION_METADATA_KEY,
+        PERMISSION_JSON_SERIALIZER.toJSON(permissionStatus));
+    // Remove the old metadata key if present
+    removeMetadataAttribute(blob, OLD_PERMISSION_METADATA_KEY);
+  }
+
+  private PermissionStatus getPermissionStatus(CloudBlockBlobWrapper blob) {
+    String permissionMetadataValue = getMetadataAttribute(blob,
+        PERMISSION_METADATA_KEY, OLD_PERMISSION_METADATA_KEY);
+    if (permissionMetadataValue != null) {
+      return PermissionStatusJsonSerializer
+          .fromJSONString(permissionMetadataValue);
+    } else {
+      return defaultPermissionNoBlobMetadata();
+    }
+  }
+
+  private static void storeFolderAttribute(CloudBlockBlobWrapper blob) {
+    storeMetadataAttribute(blob, IS_FOLDER_METADATA_KEY, "true");
+    // Remove the old metadata key if present
+    removeMetadataAttribute(blob, OLD_IS_FOLDER_METADATA_KEY);
+  }
+
+  private static void storeLinkAttribute(CloudBlockBlobWrapper blob,
+      String linkTarget) {
+    storeMetadataAttribute(blob, LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
+        linkTarget);
+    // Remove the old metadata key if present
+    removeMetadataAttribute(blob,
+        OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
+  }
+
+  private static String getLinkAttributeValue(CloudBlockBlobWrapper blob) {
+    return getMetadataAttribute(blob,
+        LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
+        OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
+  }
+
+  private static boolean retrieveFolderAttribute(CloudBlockBlobWrapper blob) {
+    HashMap<String, String> metadata = blob.getMetadata();
+    return null != metadata
+        && (metadata.containsKey(IS_FOLDER_METADATA_KEY) || metadata
+            .containsKey(OLD_IS_FOLDER_METADATA_KEY));
+  }
+
+  private static void storeVersionAttribute(CloudBlobContainerWrapper 
container) {
+    HashMap<String, String> metadata = container.getMetadata();
+    if (null == metadata) {
+      metadata = new HashMap<String, String>();
+    }
+    metadata.put(VERSION_METADATA_KEY, CURRENT_WASB_VERSION);
+    if (metadata.containsKey(OLD_VERSION_METADATA_KEY)) {
+      metadata.remove(OLD_VERSION_METADATA_KEY);
+    }
+    container.setMetadata(metadata);
+  }
+
+  private static String retrieveVersionAttribute(
+      CloudBlobContainerWrapper container) {
+    HashMap<String, String> metadata = container.getMetadata();
+    if (metadata == null) {
+      return null;
+    } else if (metadata.containsKey(VERSION_METADATA_KEY)) {
+      return metadata.get(VERSION_METADATA_KEY);
+    } else if (metadata.containsKey(OLD_VERSION_METADATA_KEY)) {
+      return metadata.get(OLD_VERSION_METADATA_KEY);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void storeEmptyFolder(String key, PermissionStatus permissionStatus)
+      throws AzureException {
+
+    if (null == storageInteractionLayer) {
+      final String errMsg = String.format(
+          "Storage session expected for URI '%s' but does not exist.",
+          sessionUri);
+      throw new AssertionError(errMsg);
+    }
+
+    // Check if there is an authenticated account associated with the file
+    // this instance of the WASB file system. If not the file system has not
+    // been authenticated and all access is anonymous.
+    if (!isAuthenticatedAccess()) {
+      // Preemptively raise an exception indicating no uploads are
+      // allowed to anonymous accounts.
+      throw new AzureException(
+          "Uploads to to public accounts using anonymous access is 
prohibited.");
+    }
+
+    try {
+      checkContainer(ContainerAccessType.PureWrite);
+
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      storePermissionStatus(blob, permissionStatus);
+      storeFolderAttribute(blob);
+      blob.upload(new ByteArrayInputStream(new byte[0]),
+          getInstrumentedContext());
+    } catch (Exception e) {
+      // Caught exception while attempting upload. Re-throw as an Azure
+      // storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Stores an empty blob that's linking to the temporary file where're we're
+   * uploading the initial data.
+   */
+  @Override
+  public void storeEmptyLinkFile(String key, String tempBlobKey,
+      PermissionStatus permissionStatus) throws AzureException {
+    if (null == storageInteractionLayer) {
+      final String errMsg = String.format(
+          "Storage session expected for URI '%s' but does not exist.",
+          sessionUri);
+      throw new AssertionError(errMsg);
+    }
+    // Check if there is an authenticated account associated with the file
+    // this instance of the WASB file system. If not the file system has not
+    // been authenticated and all access is anonymous.
+    if (!isAuthenticatedAccess()) {
+      // Preemptively raise an exception indicating no uploads are
+      // allowed to anonymous accounts.
+      throw new AzureException(
+          "Uploads to to public accounts using anonymous access is 
prohibited.");
+    }
+
+    try {
+      checkContainer(ContainerAccessType.PureWrite);
+
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      storePermissionStatus(blob, permissionStatus);
+      storeLinkAttribute(blob, tempBlobKey);
+      blob.upload(new ByteArrayInputStream(new byte[0]),
+          getInstrumentedContext());
+    } catch (Exception e) {
+      // Caught exception while attempting upload. Re-throw as an Azure
+      // storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * If the blob with the given key exists and has a link in its metadata to a
+   * temporary file (see storeEmptyLinkFile), this method returns the key to
+   * that temporary file. Otherwise, returns null.
+   */
+  @Override
+  public String getLinkInFileMetadata(String key) throws AzureException {
+    if (null == storageInteractionLayer) {
+      final String errMsg = String.format(
+          "Storage session expected for URI '%s' but does not exist.",
+          sessionUri);
+      throw new AssertionError(errMsg);
+    }
+
+    try {
+      checkContainer(ContainerAccessType.PureRead);
+
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      blob.downloadAttributes(getInstrumentedContext());
+      return getLinkAttributeValue(blob);
+    } catch (Exception e) {
+      // Caught exception while attempting download. Re-throw as an Azure
+      // storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Private method to check for authenticated access.
+   * 
+   * @ returns boolean -- true if access is credentialed and authenticated and
+   * false otherwise.
+   */
+  private boolean isAuthenticatedAccess() throws AzureException {
+
+    if (isAnonymousCredentials) {
+      // Access to this storage account is unauthenticated.
+      return false;
+    }
+    // Access is authenticated.
+    return true;
+  }
+
+  /**
+   * This private method uses the root directory or the original container to
+   * list blobs under the directory or container depending on whether the
+   * original file system object was constructed with a short- or long-form 
URI.
+   * If the root directory is non-null the URI in the file constructor was in
+   * the long form.
+   * 
+   * @param includeMetadata
+   *          if set, the listed items will have their metadata populated
+   *          already.
+   * 
+   * @returns blobItems : iterable collection of blob items.
+   * @throws URISyntaxException
+   * 
+   */
+  private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata)
+      throws StorageException, URISyntaxException {
+    return rootDirectory.listBlobs(
+        null,
+        false,
+        includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet
+            .noneOf(BlobListingDetails.class), null, getInstrumentedContext());
+  }
+
+  /**
+   * This private method uses the root directory or the original container to
+   * list blobs under the directory or container given a specified prefix for
+   * the directory depending on whether the original file system object was
+   * constructed with a short- or long-form URI. If the root directory is
+   * non-null the URI in the file constructor was in the long form.
+   * 
+   * @param aPrefix
+   *          : string name representing the prefix of containing blobs.
+   * @param includeMetadata
+   *          if set, the listed items will have their metadata populated
+   *          already.
+   * 
+   * @returns blobItems : iterable collection of blob items.
+   * @throws URISyntaxException
+   * 
+   */
+  private Iterable<ListBlobItem> listRootBlobs(String aPrefix,
+      boolean includeMetadata) throws StorageException, URISyntaxException {
+
+    return rootDirectory.listBlobs(
+        aPrefix,
+        false,
+        includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet
+            .noneOf(BlobListingDetails.class), null, getInstrumentedContext());
+  }
+
+  /**
+   * This private method uses the root directory or the original container to
+   * list blobs under the directory or container given a specified prefix for
+   * the directory depending on whether the original file system object was
+   * constructed with a short- or long-form URI. It also uses the specified 
flat
+   * or hierarchical option, listing details options, request options, and
+   * operation context.
+   * 
+   * @param aPrefix
+   *          string name representing the prefix of containing blobs.
+   * @param useFlatBlobListing
+   *          - the list is flat if true, or hierarchical otherwise.
+   * @param listingDetails
+   *          - determine whether snapshots, metadata, committed/uncommitted
+   *          data
+   * @param options
+   *          - object specifying additional options for the request. null =
+   *          default options
+   * @param opContext
+   *          - context of the current operation
+   * @returns blobItems : iterable collection of blob items.
+   * @throws URISyntaxException
+   * 
+   */
+  private Iterable<ListBlobItem> listRootBlobs(String aPrefix,
+      boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails,
+      BlobRequestOptions options, OperationContext opContext)
+      throws StorageException, URISyntaxException {
+
+    CloudBlobDirectoryWrapper directory = this.container
+        .getDirectoryReference(aPrefix);
+    return directory.listBlobs(null, useFlatBlobListing, listingDetails,
+        options, opContext);
+  }
+
+  /**
+   * This private method uses the root directory or the original container to
+   * get the block blob reference depending on whether the original file system
+   * object was constructed with a short- or long-form URI. If the root
+   * directory is non-null the URI in the file constructor was in the long 
form.
+   * 
+   * @param aKey
+   *          : a key used to query Azure for the block blob.
+   * @returns blob : a reference to the Azure block blob corresponding to the
+   *          key.
+   * @throws URISyntaxException
+   * 
+   */
+  private CloudBlockBlobWrapper getBlobReference(String aKey)
+      throws StorageException, URISyntaxException {
+
+    CloudBlockBlobWrapper blob = this.container.getBlockBlobReference(aKey);
+
+    blob.setStreamMinimumReadSizeInBytes(downloadBlockSizeBytes);
+    blob.setWriteBlockSizeInBytes(uploadBlockSizeBytes);
+
+    // Return with block blob.
+    return blob;
+  }
+
+  /**
+   * This private method normalizes the key by stripping the container name 
from
+   * the path and returns a path relative to the root directory of the
+   * container.
+   * 
+   * @param keyUri
+   *          - adjust this key to a path relative to the root directory
+   * 
+   * @returns normKey
+   */
+  private String normalizeKey(URI keyUri) {
+    String normKey;
+
+    // Strip the container name from the path and return the path
+    // relative to the root directory of the container.
+    int parts = isStorageEmulator ? 4 : 3;
+    normKey = keyUri.getPath().split("/", parts)[(parts - 1)];
+
+    // Return the fixed key.
+    return normKey;
+  }
+
+  /**
+   * This private method normalizes the key by stripping the container name 
from
+   * the path and returns a path relative to the root directory of the
+   * container.
+   * 
+   * @param blob
+   *          - adjust the key to this blob to a path relative to the root
+   *          directory
+   * 
+   * @returns normKey
+   */
+  private String normalizeKey(CloudBlockBlobWrapper blob) {
+    return normalizeKey(blob.getUri());
+  }
+
+  /**
+   * This private method normalizes the key by stripping the container name 
from
+   * the path and returns a path relative to the root directory of the
+   * container.
+   * 
+   * @param blob
+   *          - adjust the key to this directory to a path relative to the root
+   *          directory
+   * 
+   * @returns normKey
+   */
+  private String normalizeKey(CloudBlobDirectoryWrapper directory) {
+    String dirKey = normalizeKey(directory.getUri());
+    // Strip the last delimiter
+    if (dirKey.endsWith(PATH_DELIMITER)) {
+      dirKey = dirKey.substring(0, dirKey.length() - 1);
+    }
+    return dirKey;
+  }
+
+  /**
+   * Default method to creates a new OperationContext for the Azure Storage
+   * operation that has listeners hooked to it that will update the metrics for
+   * this file system. This method does not bind to receive send request
+   * callbacks by default.
+   * 
+   * @return The OperationContext object to use.
+   */
+  private OperationContext getInstrumentedContext() {
+    // Default is to not bind to receive send callback events.
+    return getInstrumentedContext(false);
+  }
+
+  /**
+   * Creates a new OperationContext for the Azure Storage operation that has
+   * listeners hooked to it that will update the metrics for this file system.
+   * 
+   * @param bindConcurrentOOBIo
+   *          - bind to intercept send request call backs to handle OOB I/O.
+   * 
+   * @return The OperationContext object to use.
+   */
+  private OperationContext getInstrumentedContext(boolean bindConcurrentOOBIo) 
{
+
+    OperationContext operationContext = new OperationContext();
+
+    if (selfThrottlingEnabled) {
+      SelfThrottlingIntercept.hook(operationContext, selfThrottlingReadFactor,
+          selfThrottlingWriteFactor);
+    }
+
+    // Bind operation context to receive send request callbacks on this
+    // operation.
+    // If reads concurrent to OOB writes are allowed, the interception will
+    // reset the conditional header on all Azure blob storage read requests.
+    if (bindConcurrentOOBIo) {
+      SendRequestIntercept.bind(storageInteractionLayer.getCredentials(),
+          operationContext, true);
+    }
+
+    if (testHookOperationContext != null) {
+      operationContext = testHookOperationContext
+          .modifyOperationContext(operationContext);
+    }
+
+    // Return the operation context.
+    return operationContext;
+  }
+
+  @Override
+  public FileMetadata retrieveMetadata(String key) throws IOException {
+
+    // Attempts to check status may occur before opening any streams so first,
+    // check if a session exists, if not create a session with the Azure 
storage
+    // server.
+    if (null == storageInteractionLayer) {
+      final String errMsg = String.format(
+          "Storage session expected for URI '%s' but does not exist.",
+          sessionUri);
+      throw new AssertionError(errMsg);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Retrieving metadata for " + key);
+    }
+
+    try {
+      if (checkContainer(ContainerAccessType.PureRead) == 
ContainerState.DoesntExist) {
+        // The container doesn't exist, so spare some service calls and just
+        // return null now.
+        return null;
+      }
+
+      // Handle the degenerate cases where the key does not exist or the
+      // key is a container.
+      if (key.equals("/")) {
+        // The key refers to root directory of container.
+        // Set the modification time for root to zero.
+        return new FileMetadata(key, 0, defaultPermissionNoBlobMetadata(),
+            BlobMaterialization.Implicit);
+      }
+
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+
+      // Download attributes and return file metadata only if the blob
+      // exists.
+      if (null != blob && blob.exists(getInstrumentedContext())) {
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found " + key
+              + " as an explicit blob. Checking if it's a file or folder.");
+        }
+
+        // The blob exists, so capture the metadata from the blob
+        // properties.
+        blob.downloadAttributes(getInstrumentedContext());
+        BlobProperties properties = blob.getProperties();
+
+        if (retrieveFolderAttribute(blob)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(key + " is a folder blob.");
+          }
+          return new FileMetadata(key, properties.getLastModified().getTime(),
+              getPermissionStatus(blob), BlobMaterialization.Explicit);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(key + " is a normal blob.");
+          }
+
+          return new FileMetadata(
+              key, // Always return denormalized key with metadata.
+              properties.getLength(), properties.getLastModified().getTime(),
+              getPermissionStatus(blob));
+        }
+      }
+
+      // There is no file with that key name, but maybe it is a folder.
+      // Query the underlying folder/container to list the blobs stored
+      // there under that key.
+      Iterable<ListBlobItem> objects = listRootBlobs(key, true,
+          EnumSet.of(BlobListingDetails.METADATA), null,
+          getInstrumentedContext());
+
+      // Check if the directory/container has the blob items.
+      for (ListBlobItem blobItem : objects) {
+        if (blobItem instanceof CloudBlockBlobWrapper) {
+          LOG.debug("Found blob as a directory-using this file under it to 
infer its properties "
+              + blobItem.getUri());
+
+          blob = (CloudBlockBlobWrapper) blobItem;
+          // The key specifies a directory. Create a FileMetadata object which
+          // specifies as such.
+          BlobProperties properties = blob.getProperties();
+
+          return new FileMetadata(key, properties.getLastModified().getTime(),
+              getPermissionStatus(blob), BlobMaterialization.Implicit);
+        }
+      }
+
+      // Return to caller with a null metadata object.
+      return null;
+
+    } catch (Exception e) {
+      // Re-throw the exception as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public DataInputStream retrieve(String key) throws AzureException {
+    InputStream inStream = null;
+    BufferedInputStream inBufStream = null;
+    try {
+      try {
+        // Check if a session exists, if not create a session with the
+        // Azure storage server.
+        if (null == storageInteractionLayer) {
+          final String errMsg = String.format(
+              "Storage session expected for URI '%s' but does not exist.",
+              sessionUri);
+          throw new AssertionError(errMsg);
+        }
+        checkContainer(ContainerAccessType.PureRead);
+
+        // Get blob reference and open the input buffer stream.
+        CloudBlockBlobWrapper blob = getBlobReference(key);
+        inStream = blob.openInputStream(getDownloadOptions(),
+            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+
+        inBufStream = new BufferedInputStream(inStream);
+
+        // Return a data input stream.
+        DataInputStream inDataStream = new DataInputStream(inBufStream);
+        return inDataStream;
+      }
+      catch (Exception e){
+        // close the streams on error.
+        // We use nested try-catch as stream.close() can throw IOException.
+        if(inBufStream != null){
+          inBufStream.close();
+        }
+        if(inStream != null){
+          inStream.close();
+        }
+        throw e;
+      }
+    } catch (Exception e) {
+      // Re-throw as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public DataInputStream retrieve(String key, long startByteOffset)
+      throws AzureException {
+
+    InputStream in = null;
+    DataInputStream inDataStream = null;
+    try {
+      try {
+        // Check if a session exists, if not create a session with the
+        // Azure storage server.
+        if (null == storageInteractionLayer) {
+          final String errMsg = String.format(
+              "Storage session expected for URI '%s' but does not exist.",
+              sessionUri);
+          throw new AssertionError(errMsg);
+        }
+        checkContainer(ContainerAccessType.PureRead);
+
+        // Get blob reference and open the input buffer stream.
+        CloudBlockBlobWrapper blob = getBlobReference(key);
+
+        // Open input stream and seek to the start offset.
+        in = blob.openInputStream(getDownloadOptions(),
+            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+
+        // Create a data input stream.
+        inDataStream = new DataInputStream(in);
+        long skippedBytes = inDataStream.skip(startByteOffset);
+        if (skippedBytes != startByteOffset) {
+          throw new IOException("Couldn't skip the requested number of bytes");
+        }
+        return inDataStream;
+      }
+      catch (Exception e){
+        // close the streams on error.
+        // We use nested try-catch as stream.close() can throw IOException.
+        if(inDataStream != null){
+          inDataStream.close();
+        }
+        if(in != null){
+          inDataStream.close();
+        }
+        throw e;
+      }
+    } catch (Exception e) {
+      // Re-throw as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public PartialListing list(String prefix, final int maxListingCount,
+      final int maxListingDepth) throws IOException {
+    return list(prefix, maxListingCount, maxListingDepth, null);
+  }
+
+  @Override
+  public PartialListing list(String prefix, final int maxListingCount,
+      final int maxListingDepth, String priorLastKey) throws IOException {
+    return list(prefix, PATH_DELIMITER, maxListingCount, maxListingDepth,
+        priorLastKey);
+  }
+
+  @Override
+  public PartialListing listAll(String prefix, final int maxListingCount,
+      final int maxListingDepth, String priorLastKey) throws IOException {
+    return list(prefix, null, maxListingCount, maxListingDepth, priorLastKey);
+  }
+
+  /**
+   * Searches the given list of {@link FileMetadata} objects for a directory
+   * with the given key.
+   * 
+   * @param list
+   *          The list to search.
+   * @param key
+   *          The key to search for.
+   * @return The wanted directory, or null if not found.
+   */
+  private static FileMetadata getDirectoryInList(
+      final Iterable<FileMetadata> list, String key) {
+    for (FileMetadata current : list) {
+      if (current.isDir() && current.getKey().equals(key)) {
+        return current;
+      }
+    }
+    return null;
+  }
+
+  private PartialListing list(String prefix, String delimiter,
+      final int maxListingCount, final int maxListingDepth, String 
priorLastKey)
+      throws IOException {
+    try {
+      checkContainer(ContainerAccessType.PureRead);
+
+      if (0 < prefix.length() && !prefix.endsWith(PATH_DELIMITER)) {
+        prefix += PATH_DELIMITER;
+      }
+
+      Iterable<ListBlobItem> objects;
+      if (prefix.equals("/")) {
+        objects = listRootBlobs(true);
+      } else {
+        objects = listRootBlobs(prefix, true);
+      }
+
+      ArrayList<FileMetadata> fileMetadata = new ArrayList<FileMetadata>();
+      for (ListBlobItem blobItem : objects) {
+        // Check that the maximum listing count is not exhausted.
+        //
+        if (0 < maxListingCount && fileMetadata.size() >= maxListingCount) {
+          break;
+        }
+
+        if (blobItem instanceof CloudBlockBlobWrapper) {
+          String blobKey = null;
+          CloudBlockBlobWrapper blob = (CloudBlockBlobWrapper) blobItem;
+          BlobProperties properties = blob.getProperties();
+
+          // Determine format of the blob name depending on whether an absolute
+          // path is being used or not.
+          blobKey = normalizeKey(blob);
+
+          FileMetadata metadata;
+          if (retrieveFolderAttribute(blob)) {
+            metadata = new FileMetadata(blobKey, properties.getLastModified()
+                .getTime(), getPermissionStatus(blob),
+                BlobMaterialization.Explicit);
+          } else {
+            metadata = new FileMetadata(blobKey, properties.getLength(),
+                properties.getLastModified().getTime(),
+                getPermissionStatus(blob));
+          }
+
+          // Add the metadata to the list, but remove any existing duplicate
+          // entries first that we may have added by finding nested files.
+          FileMetadata existing = getDirectoryInList(fileMetadata, blobKey);
+          if (existing != null) {
+            fileMetadata.remove(existing);
+          }
+          fileMetadata.add(metadata);
+        } else if (blobItem instanceof CloudBlobDirectoryWrapper) {
+          CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) 
blobItem;
+          // Determine format of directory name depending on whether an 
absolute
+          // path is being used or not.
+          //
+          String dirKey = normalizeKey(directory);
+          // Strip the last /
+          if (dirKey.endsWith(PATH_DELIMITER)) {
+            dirKey = dirKey.substring(0, dirKey.length() - 1);
+          }
+
+          // Reached the targeted listing depth. Return metadata for the
+          // directory using default permissions.
+          //
+          // Note: Something smarter should be done about permissions. Maybe
+          // inherit the permissions of the first non-directory blob.
+          // Also, getting a proper value for last-modified is tricky.
+          FileMetadata directoryMetadata = new FileMetadata(dirKey, 0,
+              defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit);
+
+          // Add the directory metadata to the list only if it's not already
+          // there.
+          if (getDirectoryInList(fileMetadata, dirKey) == null) {
+            fileMetadata.add(directoryMetadata);
+          }
+
+          // Currently at a depth of one, decrement the listing depth for
+          // sub-directories.
+          buildUpList(directory, fileMetadata, maxListingCount,
+              maxListingDepth - 1);
+        }
+      }
+      // Note: Original code indicated that this may be a hack.
+      priorLastKey = null;
+      return new PartialListing(priorLastKey,
+          fileMetadata.toArray(new FileMetadata[] {}),
+          0 == fileMetadata.size() ? new String[] {} : new String[] { prefix 
});
+    } catch (Exception e) {
+      // Re-throw as an Azure storage exception.
+      //
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Build up a metadata list of blobs in an Azure blob directory. This method
+   * uses a in-order first traversal of blob directory structures to maintain
+   * the sorted order of the blob names.
+   * 
+   * @param dir
+   *          -- Azure blob directory
+   * 
+   * @param list
+   *          -- a list of file metadata objects for each non-directory blob.
+   * 
+   * @param maxListingLength
+   *          -- maximum length of the built up list.
+   */
+  private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory,
+      ArrayList<FileMetadata> aFileMetadataList, final int maxListingCount,
+      final int maxListingDepth) throws Exception {
+
+    // Push the blob directory onto the stack.
+    LinkedList<Iterator<ListBlobItem>> dirIteratorStack = new 
LinkedList<Iterator<ListBlobItem>>();
+
+    Iterable<ListBlobItem> blobItems = aCloudBlobDirectory.listBlobs(null,
+        false, EnumSet.of(BlobListingDetails.METADATA), null,
+        getInstrumentedContext());
+    Iterator<ListBlobItem> blobItemIterator = blobItems.iterator();
+
+    if (0 == maxListingDepth || 0 == maxListingCount) {
+      // Recurrence depth and listing count are already exhausted. Return
+      // immediately.
+      return;
+    }
+
+    // The directory listing depth is unbounded if the maximum listing depth
+    // is negative.
+    final boolean isUnboundedDepth = (maxListingDepth < 0);
+
+    // Reset the current directory listing depth.
+    int listingDepth = 1;
+
+    // Loop until all directories have been traversed in-order. Loop only
+    // the following conditions are satisfied:
+    // (1) The stack is not empty, and
+    // (2) maxListingCount > 0 implies that the number of items in the
+    // metadata list is less than the max listing count.
+    while (null != blobItemIterator
+        && (maxListingCount <= 0 || aFileMetadataList.size() < 
maxListingCount)) {
+      while (blobItemIterator.hasNext()) {
+        // Check if the count of items on the list exhausts the maximum
+        // listing count.
+        //
+        if (0 < maxListingCount && aFileMetadataList.size() >= 
maxListingCount) {
+          break;
+        }
+
+        ListBlobItem blobItem = blobItemIterator.next();
+
+        // Add the file metadata to the list if this is not a blob
+        // directory item.
+        if (blobItem instanceof CloudBlockBlobWrapper) {
+          String blobKey = null;
+          CloudBlockBlobWrapper blob = (CloudBlockBlobWrapper) blobItem;
+          BlobProperties properties = blob.getProperties();
+
+          // Determine format of the blob name depending on whether an absolute
+          // path is being used or not.
+          blobKey = normalizeKey(blob);
+
+          FileMetadata metadata;
+          if (retrieveFolderAttribute(blob)) {
+            metadata = new FileMetadata(blobKey, properties.getLastModified()
+                .getTime(), getPermissionStatus(blob),
+                BlobMaterialization.Explicit);
+          } else {
+            metadata = new FileMetadata(blobKey, properties.getLength(),
+                properties.getLastModified().getTime(),
+                getPermissionStatus(blob));
+          }
+
+          // Add the directory metadata to the list only if it's not already
+          // there.
+          FileMetadata existing = getDirectoryInList(aFileMetadataList, 
blobKey);
+          if (existing != null) {
+            aFileMetadataList.remove(existing);
+          }
+          aFileMetadataList.add(metadata);
+        } else if (blobItem instanceof CloudBlobDirectoryWrapper) {
+          CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) 
blobItem;
+
+          // This is a directory blob, push the current iterator onto
+          // the stack of iterators and start iterating through the current
+          // directory.
+          if (isUnboundedDepth || maxListingDepth > listingDepth) {
+            // Push the current directory on the stack and increment the 
listing
+            // depth.
+            dirIteratorStack.push(blobItemIterator);
+            ++listingDepth;
+
+            // The current blob item represents the new directory. Get
+            // an iterator for this directory and continue by iterating through
+            // this directory.
+            blobItems = directory.listBlobs(null, false,
+                EnumSet.noneOf(BlobListingDetails.class), null,
+                getInstrumentedContext());
+            blobItemIterator = blobItems.iterator();
+          } else {
+            // Determine format of directory name depending on whether an
+            // absolute path is being used or not.
+            String dirKey = normalizeKey(directory);
+
+            if (getDirectoryInList(aFileMetadataList, dirKey) == null) {
+              // Reached the targeted listing depth. Return metadata for the
+              // directory using default permissions.
+              //
+              // Note: Something smarter should be done about permissions. 
Maybe
+              // inherit the permissions of the first non-directory blob.
+              // Also, getting a proper value for last-modified is tricky.
+              FileMetadata directoryMetadata = new FileMetadata(dirKey, 0,
+                  defaultPermissionNoBlobMetadata(),
+                  BlobMaterialization.Implicit);
+
+              // Add the directory metadata to the list.
+              aFileMetadataList.add(directoryMetadata);
+            }
+          }
+        }
+      }
+
+      // Traversal of directory tree
+
+      // Check if the iterator stack is empty. If it is set the next blob
+      // iterator to null. This will act as a terminator for the for-loop.
+      // Otherwise pop the next iterator from the stack and continue looping.
+      //
+      if (dirIteratorStack.isEmpty()) {
+        blobItemIterator = null;
+      } else {
+        // Pop the next directory item from the stack and decrement the
+        // depth.
+        blobItemIterator = dirIteratorStack.pop();
+        --listingDepth;
+
+        // Assertion: Listing depth should not be less than zero.
+        if (listingDepth < 0) {
+          throw new AssertionError("Non-negative listing depth expected");
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes the given blob, taking special care that if we get a 
blob-not-found
+   * exception upon retrying the operation, we just swallow the error since 
what
+   * most probably happened is that the first operation succeeded on the 
server.
+   * 
+   * @param blob
+   *          The blob to delete.
+   * @throws StorageException
+   */
+  private void safeDelete(CloudBlockBlobWrapper blob) throws StorageException {
+    OperationContext operationContext = getInstrumentedContext();
+    try {
+      blob.delete(operationContext);
+    } catch (StorageException e) {
+      // On exception, check that if:
+      // 1. It's a BlobNotFound exception AND
+      // 2. It got there after one-or-more retries THEN
+      // we swallow the exception.
+      if (e.getErrorCode() != null && e.getErrorCode().equals("BlobNotFound")
+          && operationContext.getRequestResults().size() > 1
+          && operationContext.getRequestResults().get(0).getException() != 
null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Swallowing delete exception on retry: " + e.getMessage());
+        }
+        return;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public void delete(String key) throws IOException {
+    try {
+      if (checkContainer(ContainerAccessType.ReadThenWrite) == 
ContainerState.DoesntExist) {
+        // Container doesn't exist, no need to do anything
+        return;
+      }
+
+      // Get the blob reference an delete it.
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      if (blob.exists(getInstrumentedContext())) {
+        safeDelete(blob);
+      }
+    } catch (Exception e) {
+      // Re-throw as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public void rename(String srcKey, String dstKey) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Moving " + srcKey + " to " + dstKey);
+    }
+
+    try {
+      // Attempts rename may occur before opening any streams so first,
+      // check if a session exists, if not create a session with the Azure
+      // storage server.
+      if (null == storageInteractionLayer) {
+        final String errMsg = String.format(
+            "Storage session expected for URI '%s' but does not exist.",
+            sessionUri);
+        throw new AssertionError(errMsg);
+      }
+
+      checkContainer(ContainerAccessType.ReadThenWrite);
+      // Get the source blob and assert its existence. If the source key
+      // needs to be normalized then normalize it.
+      CloudBlockBlobWrapper srcBlob = getBlobReference(srcKey);
+
+      if (!srcBlob.exists(getInstrumentedContext())) {
+        throw new AzureException("Source blob " + srcKey + " does not exist.");
+      }
+
+      // Get the destination blob. The destination key always needs to be
+      // normalized.
+      CloudBlockBlobWrapper dstBlob = getBlobReference(dstKey);
+
+      // Rename the source blob to the destination blob by copying it to
+      // the destination blob then deleting it.
+      //
+      dstBlob.startCopyFromBlob(srcBlob, getInstrumentedContext());
+      waitForCopyToComplete(dstBlob, getInstrumentedContext());
+
+      safeDelete(srcBlob);
+    } catch (Exception e) {
+      // Re-throw exception as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  private void waitForCopyToComplete(CloudBlockBlobWrapper blob,
+      OperationContext opContext) throws AzureException {
+    boolean copyInProgress = true;
+    int exceptionCount = 0;
+    while (copyInProgress) {
+      try {
+        blob.downloadAttributes(opContext);
+      } catch (StorageException se) {
+        exceptionCount++;
+        if(exceptionCount > 10){
+          throw new AzureException("Too many storage exceptions during 
waitForCopyToComplete", se);
+        }
+      }
+
+      // test for null because mocked filesystem doesn't know about copystates
+      // yet.
+      copyInProgress = (blob.getCopyState() != null && blob.getCopyState()
+          .getStatus() == CopyStatus.PENDING);
+      if (copyInProgress) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Changes the permission status on the given key.
+   */
+  @Override
+  public void changePermissionStatus(String key, PermissionStatus 
newPermission)
+      throws AzureException {
+    try {
+      checkContainer(ContainerAccessType.ReadThenWrite);
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      blob.downloadAttributes(getInstrumentedContext());
+      storePermissionStatus(blob, newPermission);
+      blob.uploadMetadata(getInstrumentedContext());
+    } catch (Exception e) {
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public void purge(String prefix) throws IOException {
+    try {
+
+      // Attempts to purge may occur before opening any streams so first,
+      // check if a session exists, if not create a session with the Azure
+      // storage server.
+      if (null == storageInteractionLayer) {
+        final String errMsg = String.format(
+            "Storage session expected for URI '%s' but does not exist.",
+            sessionUri);
+        throw new AssertionError(errMsg);
+      }
+
+      if (checkContainer(ContainerAccessType.ReadThenWrite) == 
ContainerState.DoesntExist) {
+        // Container doesn't exist, no need to do anything.
+        return;
+      }
+      // Get all blob items with the given prefix from the container and delete
+      // them.
+      Iterable<ListBlobItem> objects = listRootBlobs(prefix, false);
+      for (ListBlobItem blobItem : objects) {
+        ((CloudBlob) blobItem).delete(DeleteSnapshotsOption.NONE, null, null,
+            getInstrumentedContext());
+      }
+    } catch (Exception e) {
+      // Re-throw as an Azure storage exception.
+      //
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public void updateFolderLastModifiedTime(String key, Date lastModified)
+      throws AzureException {
+    try {
+      checkContainer(ContainerAccessType.ReadThenWrite);
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      blob.getProperties().setLastModified(lastModified);
+      blob.uploadProperties(getInstrumentedContext());
+    } catch (Exception e) {
+      // Caught exception while attempting update the properties. Re-throw as 
an
+      // Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public void updateFolderLastModifiedTime(String key) throws AzureException {
+    final Calendar lastModifiedCalendar = Calendar
+        .getInstance(Utility.LOCALE_US);
+    lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE);
+    Date lastModified = lastModifiedCalendar.getTime();
+    updateFolderLastModifiedTime(key, lastModified);
+  }
+
+  @Override
+  public void dump() throws IOException {
+  }
+
+  @Override
+  public void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobMaterialization.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobMaterialization.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobMaterialization.java
new file mode 100644
index 0000000..a1f8242
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobMaterialization.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Indicates whether there are actual blobs indicating the existence of
+ * directories or whether we're inferring their existence from them having 
files
+ * in there.
+ */
[email protected]
+enum BlobMaterialization {
+  /**
+   * Indicates a directory that isn't backed by an actual blob, but its
+   * existence is implied by the fact that there are files in there. For
+   * example, if the blob /a/b exists then it implies the existence of the /a
+   * directory if there's no /a blob indicating it.
+   */
+  Implicit,
+  /**
+   * Indicates that the directory is backed by an actual blob that has the
+   * isFolder metadata on it.
+   */
+  Explicit,
+}

Reply via email to