HADOOP-10728. Metrics system for Windows Azure Storage Filesystem. Contributed 
by Dexter Bradshaw, Mostafa Elhemali, Xi Fang, Johannes Klein, David Lao, Mike 
Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys, Alexander 
Stojanovic, Brian Swan, and Min Wei.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1605187 
13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit 0d91576ec31f63402f2db6107a04155368e2632d)

Conflicts:
        hadoop-common-project/hadoop-common/CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b928b8c7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b928b8c7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b928b8c7

Branch: refs/heads/branch-2
Commit: b928b8c775ce3cce4d6a818651f935b2f33badaf
Parents: 80d005c
Author: Chris Nauroth <[email protected]>
Authored: Tue Jun 24 20:52:44 2014 +0000
Committer: cnauroth <[email protected]>
Committed: Wed Dec 17 14:57:12 2014 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   5 +
 hadoop-tools/hadoop-azure/.gitignore            |   3 +-
 hadoop-tools/hadoop-azure/README.txt            |  20 +-
 hadoop-tools/hadoop-azure/pom.xml               |  22 +-
 .../fs/azure/AzureNativeFileSystemStore.java    |  49 +-
 .../hadoop/fs/azure/NativeAzureFileSystem.java  |  70 ++-
 .../hadoop/fs/azure/NativeFileSystemStore.java  |   3 +-
 .../metrics/AzureFileSystemInstrumentation.java | 395 ++++++++++++++
 .../metrics/AzureFileSystemMetricsSystem.java   |  64 +++
 .../fs/azure/metrics/BandwidthGaugeUpdater.java | 289 ++++++++++
 .../fs/azure/metrics/ErrorMetricUpdater.java    |  82 +++
 .../metrics/ResponseReceivedMetricUpdater.java  | 147 +++++
 .../fs/azure/metrics/RollingWindowAverage.java  | 103 ++++
 .../apache/hadoop/fs/azure/metrics/package.html |  28 +
 .../fs/azure/AzureBlobStorageTestAccount.java   |  91 +++-
 .../TestAzureFileSystemErrorConditions.java     |   2 -
 .../fs/azure/metrics/AzureMetricsTestUtil.java  |  82 +++
 .../TestAzureFileSystemInstrumentation.java     | 546 +++++++++++++++++++
 .../metrics/TestBandwidthGaugeUpdater.java      | 125 +++++
 .../TestNativeAzureFileSystemMetricsSystem.java |  77 +++
 .../azure/metrics/TestRollingWindowAverage.java |  42 ++
 .../src/test/resources/azure-test.xml           |   1 +
 ...hadoop-metrics2-azure-file-system.properties |  18 +
 23 files changed, 2230 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index ff07174..d1e84cb 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -18,6 +18,11 @@ Release 2.7.0 - UNRELEASED
     Mike Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys,
     Alexander Stojanovic, Brian Swan, and Min Wei via cnauroth)
 
+    HADOOP-10728. Metrics system for Windows Azure Storage Filesystem.
+    (Dexter Bradshaw, Mostafa Elhemali, Xi Fang, Johannes Klein, David Lao,
+    Mike Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys,
+    Alexander Stojanovic, Brian Swan, and Min Wei via cnauroth)
+
   IMPROVEMENTS
 
     HADOOP-11156. DelegateToFileSystem should implement

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/.gitignore
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/.gitignore 
b/hadoop-tools/hadoop-azure/.gitignore
index 09c10b1..837b481 100644
--- a/hadoop-tools/hadoop-azure/.gitignore
+++ b/hadoop-tools/hadoop-azure/.gitignore
@@ -1 +1,2 @@
-.checkstyle
\ No newline at end of file
+.checkstyle
+bin/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/README.txt
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/README.txt 
b/hadoop-tools/hadoop-azure/README.txt
index 4a06747..73306d3 100644
--- a/hadoop-tools/hadoop-azure/README.txt
+++ b/hadoop-tools/hadoop-azure/README.txt
@@ -12,9 +12,13 @@ Unit tests
 =============
 Most of the tests will run without additional configuration.
 For complete testing, configuration in src/test/resources is required:
-  src/test/resources/azure-test.xml
-  src/test/resources/log4j.properties
+  
+  src/test/resources/azure-test.xml -> Defines Azure storage dependencies, 
including account information 
 
+The other files in src/test/resources do not normally need alteration:
+  log4j.properties -> Test logging setup
+  hadoop-metrics2-azure-file-system.properties -> used to wire up 
instrumentation for testing
+  
 From command-line
 ------------------
 Basic execution:
@@ -59,6 +63,12 @@ Enable the Azure emulator tests by setting
   fs.azure.test.emulator -> true 
 in src\test\resources\azure-test.xml
 
+Known issues:
+  Symptom: When running tests for emulator, you see the following failure 
message
+           com.microsoft.windowsazure.storage.StorageException: The value for 
one of the HTTP headers is not in the correct format.
+  Issue:   The emulator can get into a confused state.  
+  Fix:     Restart the Azure Emulator.  Ensure it is v3.2 or later.
+ 
 Running tests against live Azure storage 
 -------------------------------------------------------------------------
 In order to run WASB unit tests against a live Azure Storage account, add 
credentials to 
@@ -101,4 +111,8 @@ Eclipse:
 NOTE:
 - After any change to the checkstyle rules xml, use 
window|preferences|checkstyle|{refresh}|OK
 
- 
\ No newline at end of file
+=============
+Javadoc
+============= 
+Command-line
+> mvn javadoc:javadoc
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/pom.xml 
b/hadoop-tools/hadoop-azure/pom.xml
index 376deb1..97c10fc 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -37,22 +37,6 @@
   </properties>
 
   <build>
-  
-    <testResources>
-      <testResource>
-        <directory>src/test/resources</directory>
-        <includes>
-          <include>log4j.properties</include>
-        </includes>
-      </testResource>
-      <testResource>
-        <directory>src/test/resources</directory>
-        <includes>
-          <include>azure-test.xml</include>
-        </includes>
-      </testResource>
-    </testResources>
-  
     <plugins>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
@@ -198,5 +182,11 @@
       <type>test-jar</type>
     </dependency>
     
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index c5b9afe..5afbbbe 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -17,7 +17,6 @@
  */
 
 package org.apache.hadoop.fs.azure;
-
 import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
 
 import java.io.BufferedInputStream;
@@ -46,6 +45,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+import org.apache.hadoop.fs.azure.metrics.BandwidthGaugeUpdater;
+import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater;
+import org.apache.hadoop.fs.azure.metrics.ResponseReceivedMetricUpdater;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.mortbay.util.ajax.JSON;
@@ -69,8 +72,15 @@ import 
com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption;
 import com.microsoft.windowsazure.storage.blob.ListBlobItem;
 import com.microsoft.windowsazure.storage.core.Utility;
 
+
+/**
+ * Core implementation of Windows Azure Filesystem for Hadoop.
+ * Provides the bridging logic between Hadoop's abstract filesystem and Azure 
Storage 
+ *
+ */
 @InterfaceAudience.Private
-class AzureNativeFileSystemStore implements NativeFileSystemStore {
+@VisibleForTesting
+public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   
   /**
    * Configuration knob on whether we do block-level MD5 validation on
@@ -169,6 +179,8 @@ class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
   private boolean isAnonymousCredentials = false;
   // Set to true if we are connecting using shared access signatures.
   private boolean connectingUsingSAS = false;
+  private AzureFileSystemInstrumentation instrumentation;
+  private BandwidthGaugeUpdater bandwidthGaugeUpdater;
   private static final JSON PERMISSION_JSON_SERIALIZER = 
createPermissionJsonSerializer();
 
   private boolean suppressRetryPolicy = false;
@@ -301,6 +313,11 @@ class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
     this.storageInteractionLayer = storageInteractionLayer;
   }
 
+  @VisibleForTesting
+  public BandwidthGaugeUpdater getBandwidthGaugeUpdater() {
+    return bandwidthGaugeUpdater;
+  }
+  
   /**
    * Check if concurrent reads and writes on the same blob are allowed.
    * 
@@ -325,12 +342,18 @@ class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
    *           if URI or job object is null, or invalid scheme.
    */
   @Override
-  public void initialize(URI uri, Configuration conf) throws AzureException {
+  public void initialize(URI uri, Configuration conf, 
AzureFileSystemInstrumentation instrumentation) throws AzureException {
 
     if (null == this.storageInteractionLayer) {
       this.storageInteractionLayer = new StorageInterfaceImpl();
     }
 
+    this.instrumentation = instrumentation;
+    this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation);
+    if (null == this.storageInteractionLayer) {
+      this.storageInteractionLayer = new StorageInterfaceImpl();
+    }
+    
     // Check that URI exists.
     //
     if (null == uri) {
@@ -775,8 +798,10 @@ class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
         throw new AzureException(errMsg);
       }
 
+      instrumentation.setAccountName(accountName);
       String containerName = getContainerFromAuthority(sessionUri);
-
+      instrumentation.setContainerName(containerName);
+      
       // Check whether this is a storage emulator account.
       if (isStorageEmulatorAccount(accountName)) {
         // It is an emulator account, connect to it with no credentials.
@@ -1522,6 +1547,11 @@ class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
           selfThrottlingWriteFactor);
     }
 
+    ResponseReceivedMetricUpdater.hook(
+        operationContext,
+        instrumentation,
+        bandwidthGaugeUpdater);
+    
     // Bind operation context to receive send request callbacks on this
     // operation.
     // If reads concurrent to OOB writes are allowed, the interception will
@@ -1535,6 +1565,8 @@ class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
       operationContext = testHookOperationContext
           .modifyOperationContext(operationContext);
     }
+    
+    ErrorMetricUpdater.hook(operationContext, instrumentation);
 
     // Return the operation context.
     return operationContext;
@@ -2218,5 +2250,14 @@ class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
 
   @Override
   public void close() {
+    bandwidthGaugeUpdater.close();
+  }
+  
+  // Finalizer to ensure complete shutdown
+  @Override
+  protected void finalize() throws Throwable {
+    LOG.debug("finalize() called");
+    close();
+    super.finalize();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 30e6b30..2b69573 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -31,12 +31,13 @@ import java.util.Date;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BufferedFSInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -45,12 +46,14 @@ import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 
-
 import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.windowsazure.storage.core.Utility;
 
@@ -369,8 +372,12 @@ public class NativeAzureFileSystem extends FileSystem {
   private AzureNativeFileSystemStore actualStore;
   private Path workingDir;
   private long blockSize = MAX_AZURE_BLOCK_SIZE;
+  private AzureFileSystemInstrumentation instrumentation;
   private static boolean suppressRetryPolicy = false;
+  // A counter to create unique (within-process) names for my metrics sources.
+  private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
 
+  
   public NativeAzureFileSystem() {
     // set store in initialize()
   }
@@ -397,6 +404,20 @@ public class NativeAzureFileSystem extends FileSystem {
   }
 
   /**
+   * Creates a new metrics source name that's unique within this process.
+   */
+  @VisibleForTesting
+  public static String newMetricsSourceName() {
+    int number = metricsSourceNameCounter.incrementAndGet();
+    final String baseName = "AzureFileSystemMetrics";
+    if (number == 1) { // No need for a suffix for the first one
+      return baseName;
+    } else {
+      return baseName + number;
+    }
+  }
+  
+  /**
    * Checks if the given URI scheme is a scheme that's affiliated with the 
Azure
    * File System.
    * 
@@ -459,7 +480,16 @@ public class NativeAzureFileSystem extends FileSystem {
       store = createDefaultStore(conf);
     }
 
-    store.initialize(uri, conf);
+    // Make sure the metrics system is available before interacting with Azure
+    AzureFileSystemMetricsSystem.fileSystemStarted();
+    String sourceName = newMetricsSourceName(),
+        sourceDesc = "Azure Storage Volume File System metrics";
+    instrumentation = DefaultMetricsSystem.instance().register(sourceName,
+        sourceDesc, new AzureFileSystemInstrumentation(conf));
+    AzureFileSystemMetricsSystem.registerSource(sourceName, sourceDesc,
+        instrumentation);
+
+    store.initialize(uri, conf, instrumentation);
     setConf(conf);
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
     this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
@@ -535,9 +565,19 @@ public class NativeAzureFileSystem extends FileSystem {
    * @return The store object.
    */
   @VisibleForTesting
-  AzureNativeFileSystemStore getStore() {
+  public AzureNativeFileSystemStore getStore() {
     return actualStore;
   }
+  
+  /**
+   * Gets the metrics source for this file system.
+   * This is mainly here for unit testing purposes.
+   *
+   * @return the metrics source.
+   */
+  public AzureFileSystemInstrumentation getInstrumentation() {
+    return instrumentation;
+  }
 
   /** This optional operation is not yet supported. */
   @Override
@@ -622,6 +662,10 @@ public class NativeAzureFileSystem extends FileSystem {
     // Construct the data output stream from the buffered output stream.
     FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, 
statistics);
 
+    
+    // Increment the counter
+    instrumentation.fileCreated();
+    
     // Return data output stream to caller.
     return fsOut;
   }
@@ -682,6 +726,7 @@ public class NativeAzureFileSystem extends FileSystem {
           store.updateFolderLastModifiedTime(parentKey);
         }
       }
+      instrumentation.fileDeleted();
       store.delete(key);
     } else {
       // The path specifies a folder. Recursively delete all entries under the
@@ -724,6 +769,7 @@ public class NativeAzureFileSystem extends FileSystem {
             p.getKey().lastIndexOf(PATH_DELIMITER));
         if (!p.isDir()) {
           store.delete(key + suffix);
+          instrumentation.fileDeleted();
         } else {
           // Recursively delete contents of the sub-folders. Notice this also
           // deletes the blob for the directory.
@@ -740,6 +786,7 @@ public class NativeAzureFileSystem extends FileSystem {
         String parentKey = pathToKey(parent);
         store.updateFolderLastModifiedTime(parentKey);
       }
+      instrumentation.directoryDeleted();
     }
 
     // File or directory was successfully deleted.
@@ -972,6 +1019,8 @@ public class NativeAzureFileSystem extends FileSystem {
       store.updateFolderLastModifiedTime(key, lastModified);
     }
 
+    instrumentation.directoryCreated();
+    
     // otherwise throws exception
     return true;
   }
@@ -1293,6 +1342,19 @@ public class NativeAzureFileSystem extends FileSystem {
     super.close();
     // Close the store
     store.close();
+    
+    // Notify the metrics system that this file system is closed, which may
+    // trigger one final metrics push to get the accurate final file system
+    // metrics out.
+
+    long startTime = System.currentTimeMillis();
+
+    AzureFileSystemMetricsSystem.fileSystemClosed();
+
+    if (LOG.isDebugEnabled()) {
+        LOG.debug("Submitting metrics when file system closed took "
+                + (System.currentTimeMillis() - startTime) + " ms.");
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 0fb3c22..4e1d0b6 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -26,6 +26,7 @@ import java.util.Date;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -38,7 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 interface NativeFileSystemStore {
 
-  void initialize(URI uri, Configuration conf) throws IOException;
+  void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation 
instrumentation) throws IOException;
 
   void storeEmptyFolder(String key, PermissionStatus permissionStatus)
       throws AzureException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
new file mode 100644
index 0000000..e389d7c
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * A metrics source for the WASB file system to track all the metrics we care
+ * about for getting a clear picture of the performance/reliability/interaction
+ * of the Hadoop cluster with Azure Storage.
+ */
+@Metrics(about="Metrics for WASB", context="azureFileSystem")
[email protected]
[email protected]
+public final class AzureFileSystemInstrumentation implements MetricsSource {
+  
+  public static final String METRIC_TAG_FILESYSTEM_ID = "wasbFileSystemId";
+  public static final String METRIC_TAG_ACCOUNT_NAME = "accountName";
+  public static final String METRIC_TAG_CONTAINTER_NAME = "containerName";
+  
+  public static final String WASB_WEB_RESPONSES = "wasb_web_responses";
+  public static final String WASB_BYTES_WRITTEN =
+      "wasb_bytes_written_last_second";
+  public static final String WASB_BYTES_READ =
+      "wasb_bytes_read_last_second";
+  public static final String WASB_RAW_BYTES_UPLOADED =
+      "wasb_raw_bytes_uploaded";
+  public static final String WASB_RAW_BYTES_DOWNLOADED =
+      "wasb_raw_bytes_downloaded";
+  public static final String WASB_FILES_CREATED = "wasb_files_created";
+  public static final String WASB_FILES_DELETED = "wasb_files_deleted";
+  public static final String WASB_DIRECTORIES_CREATED = 
"wasb_directories_created";
+  public static final String WASB_DIRECTORIES_DELETED = 
"wasb_directories_deleted";
+  public static final String WASB_UPLOAD_RATE =
+      "wasb_maximum_upload_bytes_per_second";
+  public static final String WASB_DOWNLOAD_RATE =
+      "wasb_maximum_download_bytes_per_second";
+  public static final String WASB_UPLOAD_LATENCY =
+      "wasb_average_block_upload_latency_ms";
+  public static final String WASB_DOWNLOAD_LATENCY =
+      "wasb_average_block_download_latency_ms";
+  public static final String WASB_CLIENT_ERRORS = "wasb_client_errors";
+  public static final String WASB_SERVER_ERRORS = "wasb_server_errors";
+
+  /**
+   * Config key for how big the rolling window size for latency metrics should
+   * be (in seconds).
+   */
+  private static final String KEY_ROLLING_WINDOW_SIZE = 
"fs.azure.metrics.rolling.window.size";
+
+  private final MetricsRegistry registry =
+      new MetricsRegistry("azureFileSystem")
+      .setContext("azureFileSystem");
+  private final MutableCounterLong numberOfWebResponses =
+      registry.newCounter(
+          WASB_WEB_RESPONSES,
+          "Total number of web responses obtained from Azure Storage",
+          0L);
+  private AtomicLong inMemoryNumberOfWebResponses = new AtomicLong(0);
+  private final MutableCounterLong numberOfFilesCreated =
+      registry.newCounter(
+          WASB_FILES_CREATED,
+          "Total number of files created through the WASB file system.",
+          0L);
+  private final MutableCounterLong numberOfFilesDeleted =
+      registry.newCounter(
+          WASB_FILES_DELETED,
+          "Total number of files deleted through the WASB file system.",
+          0L);
+  private final MutableCounterLong numberOfDirectoriesCreated =
+      registry.newCounter(
+          WASB_DIRECTORIES_CREATED,
+          "Total number of directories created through the WASB file system.",
+          0L);
+  private final MutableCounterLong numberOfDirectoriesDeleted =
+      registry.newCounter(
+          WASB_DIRECTORIES_DELETED,
+          "Total number of directories deleted through the WASB file system.",
+          0L);
+  private final MutableGaugeLong bytesWrittenInLastSecond =
+      registry.newGauge(
+          WASB_BYTES_WRITTEN,
+          "Total number of bytes written to Azure Storage during the last 
second.",
+          0L);
+  private final MutableGaugeLong bytesReadInLastSecond =
+      registry.newGauge(
+          WASB_BYTES_READ,
+          "Total number of bytes read from Azure Storage during the last 
second.",
+          0L);
+  private final MutableGaugeLong maximumUploadBytesPerSecond =
+      registry.newGauge(
+          WASB_UPLOAD_RATE,
+          "The maximum upload rate encountered to Azure Storage in 
bytes/second.",
+          0L);
+  private final MutableGaugeLong maximumDownloadBytesPerSecond =
+      registry.newGauge(
+          WASB_DOWNLOAD_RATE,
+          "The maximum download rate encountered to Azure Storage in 
bytes/second.",
+          0L);
+  private final MutableCounterLong rawBytesUploaded =
+      registry.newCounter(
+          WASB_RAW_BYTES_UPLOADED,
+          "Total number of raw bytes (including overhead) uploaded to Azure" 
+          + " Storage.",
+          0L);
+  private final MutableCounterLong rawBytesDownloaded =
+      registry.newCounter(
+          WASB_RAW_BYTES_DOWNLOADED,
+          "Total number of raw bytes (including overhead) downloaded from 
Azure" 
+          + " Storage.",
+          0L);
+  private final MutableCounterLong clientErrors =
+      registry.newCounter(
+          WASB_CLIENT_ERRORS,
+          "Total number of client-side errors by WASB (excluding 404).",
+          0L);
+  private final MutableCounterLong serverErrors =
+      registry.newCounter(
+          WASB_SERVER_ERRORS,
+          "Total number of server-caused errors by WASB.",
+          0L);
+  private final MutableGaugeLong averageBlockUploadLatencyMs;
+  private final MutableGaugeLong averageBlockDownloadLatencyMs;
+  private long currentMaximumUploadBytesPerSecond;
+  private long currentMaximumDownloadBytesPerSecond;
+  private static final int DEFAULT_LATENCY_ROLLING_AVERAGE_WINDOW =
+      5; // seconds
+  private final RollingWindowAverage currentBlockUploadLatency;
+  private final RollingWindowAverage currentBlockDownloadLatency;
+  private UUID fileSystemInstanceId;
+
+  public AzureFileSystemInstrumentation(Configuration conf) {
+    fileSystemInstanceId = UUID.randomUUID();
+    registry.tag("wasbFileSystemId",
+        "A unique identifier for the file ",
+        fileSystemInstanceId.toString());
+    final int rollingWindowSizeInSeconds =
+        conf.getInt(KEY_ROLLING_WINDOW_SIZE,
+            DEFAULT_LATENCY_ROLLING_AVERAGE_WINDOW);
+    averageBlockUploadLatencyMs =
+        registry.newGauge(
+            WASB_UPLOAD_LATENCY,
+            String.format("The average latency in milliseconds of uploading a 
single block" 
+            + ". The average latency is calculated over a %d-second rolling" 
+            + " window.", rollingWindowSizeInSeconds),
+            0L);
+    averageBlockDownloadLatencyMs =
+        registry.newGauge(
+            WASB_DOWNLOAD_LATENCY,
+            String.format("The average latency in milliseconds of downloading 
a single block" 
+            + ". The average latency is calculated over a %d-second rolling" 
+            + " window.", rollingWindowSizeInSeconds),
+            0L);
+    currentBlockUploadLatency =
+        new RollingWindowAverage(rollingWindowSizeInSeconds * 1000);
+    currentBlockDownloadLatency =
+        new RollingWindowAverage(rollingWindowSizeInSeconds * 1000);
+  }
+
+  /**
+   * The unique identifier for this file system in the metrics.
+   */
+  public UUID getFileSystemInstanceId() {
+    return fileSystemInstanceId;
+  }
+  
+  /**
+   * Get the metrics registry information.
+   */
+  public MetricsInfo getMetricsRegistryInfo() {
+    return registry.info();
+  }
+
+  /**
+   * Sets the account name to tag all the metrics with.
+   * @param accountName The account name.
+   */
+  public void setAccountName(String accountName) {
+    registry.tag("accountName",
+        "Name of the Azure Storage account that these metrics are going 
against",
+        accountName);
+  }
+
+  /**
+   * Sets the container name to tag all the metrics with.
+   * @param containerName The container name.
+   */
+  public void setContainerName(String containerName) {
+    registry.tag("containerName",
+        "Name of the Azure Storage container that these metrics are going 
against",
+        containerName);
+  }
+
+  /**
+   * Indicate that we just got a web response from Azure Storage. This should
+   * be called for every web request/response we do (to get accurate metrics
+   * of how we're hitting the storage service).
+   */
+  public void webResponse() {
+    numberOfWebResponses.incr();
+    inMemoryNumberOfWebResponses.incrementAndGet();
+  }
+
+  /**
+   * Gets the current number of web responses obtained from Azure Storage.
+   * @return The number of web responses.
+   */
+  public long getCurrentWebResponses() {
+    return inMemoryNumberOfWebResponses.get();
+  }
+
+  /**
+   * Indicate that we just created a file through WASB.
+   * Only bumps the metrics2 counter; has no other side effects.
+   */
+  public void fileCreated() {
+    numberOfFilesCreated.incr();
+  }
+
+  /**
+   * Indicate that we just deleted a file through WASB.
+   */
+  public void fileDeleted() {
+    numberOfFilesDeleted.incr();
+  }
+
+  /**
+   * Indicate that we just created a directory through WASB.
+   */
+  public void directoryCreated() {
+    numberOfDirectoriesCreated.incr();
+  }
+
+  /**
+   * Indicate that we just deleted a directory through WASB.
+   */
+  public void directoryDeleted() {
+    numberOfDirectoriesDeleted.incr();
+  }
+
+  /**
+   * Sets the current gauge value for how many bytes were written in the last
+   *  second. Driven periodically by BandwidthGaugeUpdater's update cycle.
+   * @param currentBytesWritten The number of bytes.
+   */
+  public void updateBytesWrittenInLastSecond(long currentBytesWritten) {
+    bytesWrittenInLastSecond.set(currentBytesWritten);
+  }
+
+  /**
+   * Sets the current gauge value for how many bytes were read in the last
+   *  second. Driven periodically by BandwidthGaugeUpdater's update cycle.
+   * @param currentBytesRead The number of bytes.
+   */
+  public void updateBytesReadInLastSecond(long currentBytesRead) {
+    bytesReadInLastSecond.set(currentBytesRead);
+  }
+
+  /**
+   * Record the current bytes-per-second upload rate seen.
+   * Synchronized: the check-then-set on the running maximum must be atomic.
+   * @param bytesPerSecond The bytes per second.
+   */
+  public synchronized void currentUploadBytesPerSecond(long bytesPerSecond) {
+    if (bytesPerSecond > currentMaximumUploadBytesPerSecond) {
+      currentMaximumUploadBytesPerSecond = bytesPerSecond;
+      maximumUploadBytesPerSecond.set(bytesPerSecond);
+    }
+  }
+
+  /**
+   * Record the current bytes-per-second download rate seen.
+   * Synchronized: the check-then-set on the running maximum must be atomic.
+   * @param bytesPerSecond The bytes per second.
+   */
+  public synchronized void currentDownloadBytesPerSecond(long bytesPerSecond) {
+    if (bytesPerSecond > currentMaximumDownloadBytesPerSecond) {
+      currentMaximumDownloadBytesPerSecond = bytesPerSecond;
+      maximumDownloadBytesPerSecond.set(bytesPerSecond);
+    }
+  }
+
+  /**
+   * Indicate that we just uploaded some data to Azure storage.
+   * @param numberOfBytes The raw number of bytes uploaded (including
+   * overhead).
+   */
+  public void rawBytesUploaded(long numberOfBytes) {
+    rawBytesUploaded.incr(numberOfBytes);
+  }
+
+  /**
+   * Indicate that we just downloaded some data to Azure storage.
+   * @param numberOfBytes The raw number of bytes downloaded (including
+   * overhead).
+   */
+  public void rawBytesDownloaded(long numberOfBytes) {
+    rawBytesDownloaded.incr(numberOfBytes);
+  }
+
+  /**
+   * Indicate that we just uploaded a block and record its latency.
+   * The point feeds the rolling average published via getMetrics().
+   * @param latency The latency in milliseconds.
+   */
+  public void blockUploaded(long latency) {
+    currentBlockUploadLatency.addPoint(latency);
+  }
+
+  /**
+   * Indicate that we just downloaded a block and record its latency.
+   * The point feeds the rolling average published via getMetrics().
+   * @param latency The latency in milliseconds.
+   */
+  public void blockDownloaded(long latency) {
+    currentBlockDownloadLatency.addPoint(latency);
+  }
+
+  /**
+   * Indicate that we just encountered a client-side error.
+   * Incremented by ErrorMetricUpdater for 4xx responses (excluding 404).
+   */
+  public void clientErrorEncountered() {
+    clientErrors.incr();
+  }
+
+  /**
+   * Indicate that we just encountered a server-caused error.
+   * Incremented by ErrorMetricUpdater for 5xx responses.
+   */
+  public void serverErrorEncountered() {
+    serverErrors.incr();
+  }
+
+  /**
+   * Get the current rolling average of the upload latency, computed over
+   * the configured rolling window.
+   * @return rolling average of upload latency in milliseconds.
+   */
+  public long getBlockUploadLatency() {
+    return currentBlockUploadLatency.getCurrentAverage();
+  }
+
+  /**
+   * Get the current rolling average of the download latency, computed over
+   * the configured rolling window.
+   * @return rolling average of download latency in milliseconds.
+   */
+  public long getBlockDownloadLatency() {
+    return currentBlockDownloadLatency.getCurrentAverage();
+  }
+
+  /**
+   * Get the current maximum upload bandwidth.
+   * Synchronized: the backing field is written under synchronization in
+   * currentUploadBytesPerSecond(), so reads need the same lock for
+   * visibility.
+   * @return maximum upload bandwidth in bytes per second.
+   */
+  public synchronized long getCurrentMaximumUploadBandwidth() {
+    return currentMaximumUploadBytesPerSecond;
+  }
+
+  /**
+   * Get the current maximum download bandwidth.
+   * Synchronized for the same visibility reason as the upload getter.
+   * @return maximum download bandwidth in bytes per second.
+   */
+  public synchronized long getCurrentMaximumDownloadBandwidth() {
+    return currentMaximumDownloadBytesPerSecond;
+  }
+
+  /**
+   * Snapshot callback from the metrics system. Refreshes the latency gauges
+   * from the rolling averages so the record reflects the latest window, then
+   * hands the registry snapshot to the collector.
+   */
+  @Override
+  public void getMetrics(MetricsCollector builder, boolean all) {
+    averageBlockDownloadLatencyMs.set(
+        currentBlockDownloadLatency.getCurrentAverage());
+    averageBlockUploadLatencyMs.set(
+        currentBlockUploadLatency.getCurrentAverage());
+    registry.snapshot(builder.addRecord(registry.info().name()), true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java
new file mode 100644
index 0000000..a5f29c1
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+
+/**
+ * Shared metrics system for all Azure file system instances in this process.
+ * Reference-counts file system instances so the underlying metrics system is
+ * created on the first start and torn down when the last instance closes.
+ */
[email protected]
+public final class AzureFileSystemMetricsSystem {
+  private static MetricsSystemImpl instance;
+  private static int numFileSystems;
+  
+  // Static-only holder; never instantiated.
+  private AzureFileSystemMetricsSystem(){
+  
+  }
+  
+  /**
+   * Signals that a file system instance has started; boots the shared
+   * metrics system on the first call.
+   */
+  public static synchronized void fileSystemStarted() {
+    if (numFileSystems == 0) {
+      instance = new MetricsSystemImpl();
+      instance.init("azure-file-system");
+    }
+    numFileSystems++;
+  }
+  
+  /**
+   * Signals that a file system instance has closed; publishes any pending
+   * metrics and shuts the shared system down when the last instance closes.
+   * Unbalanced calls (close without a matching start) are ignored instead of
+   * driving the reference count negative.
+   */
+  public static synchronized void fileSystemClosed() {
+    if (numFileSystems == 0) {
+      // Unbalanced close; nothing to do.
+      return;
+    }
+    if (instance != null) {
+      instance.publishMetricsNow();
+    }
+    numFileSystems--;
+    if (numFileSystems == 0 && instance != null) {
+      instance.stop();
+      instance.shutdown();
+      instance = null;
+    }
+  }
+  
+  /**
+   * Registers a metrics source with the shared metrics system. Synchronized
+   * with start/close since they all touch the shared static instance.
+   * @param name Base name of the source; suffixed to be globally unique.
+   * @param desc Human-readable description of the source.
+   * @param source The source to register.
+   */
+  public static synchronized void registerSource(String name, String desc,
+      MetricsSource source) {
+    // Register the source with the name appended with -WasbSystem
+    // so that the name is globally unique.
+    instance.register(name + "-WasbSystem", desc, source);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java
new file mode 100644
index 0000000..699fde7
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import java.util.ArrayList;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Internal implementation class to help calculate the current bytes
+ * uploaded/downloaded and the maximum bandwidth gauges.
+ */
[email protected]
+public final class BandwidthGaugeUpdater {
+  public static final Log LOG = LogFactory
+      .getLog(BandwidthGaugeUpdater.class);
+
+  public static final String THREAD_NAME =
+      "AzureNativeFilesystemStore-UploadBandwidthUpdater";
+
+  private static final int DEFAULT_WINDOW_SIZE_MS = 1000;
+  private static final int PROCESS_QUEUE_INITIAL_CAPACITY = 1000;
+  // Size (in milliseconds) of the window bandwidth is calculated over.
+  private int windowSizeMs;
+  // Pending transfer notifications; swapped out wholesale by triggerUpdate()
+  // so producers only hold the lock briefly.
+  private ArrayList<BlockTransferWindow> allBlocksWritten =
+      createNewToProcessQueue();
+  private ArrayList<BlockTransferWindow> allBlocksRead =
+      createNewToProcessQueue();
+  private final Object blocksWrittenLock = new Object();
+  private final Object blocksReadLock = new Object();
+  private final AzureFileSystemInstrumentation instrumentation;
+  private Thread uploadBandwidthUpdater;
+  private volatile boolean suppressAutoUpdate = false;
+
+  /**
+   * Create a new updater object with default values.
+   * @param instrumentation The metrics source to update.
+   */
+  public BandwidthGaugeUpdater(
+      AzureFileSystemInstrumentation instrumentation) {
+    this(instrumentation, DEFAULT_WINDOW_SIZE_MS, false);
+  }
+
+  /**
+   * Create a new updater object with some overrides (used in unit tests).
+   * @param instrumentation The metrics source to update.
+   * @param windowSizeMs The window size to use for calculating bandwidth
+   *                    (in milliseconds).
+   * @param manualUpdateTrigger If true, then this object won't create the
+   *                            auto-update threads, and will wait for manual
+   *                            calls to triggerUpdate to occur.
+   */
+  public BandwidthGaugeUpdater(AzureFileSystemInstrumentation instrumentation,
+      int windowSizeMs, boolean manualUpdateTrigger) {
+    this.windowSizeMs = windowSizeMs;
+    this.instrumentation = instrumentation;
+    if (!manualUpdateTrigger) {
+      uploadBandwidthUpdater =
+          new Thread(new UploadBandwidthUpdater(), THREAD_NAME);
+      uploadBandwidthUpdater.setDaemon(true);
+      uploadBandwidthUpdater.start();
+    }
+  }
+
+  /**
+   * Indicate that a block has been uploaded.
+   * @param startDate The exact time the upload started.
+   * @param endDate The exact time the upload ended.
+   * @param length The number of bytes uploaded in the block.
+   */
+  public void blockUploaded(Date startDate, Date endDate, long length) {
+    synchronized (blocksWrittenLock) {
+      allBlocksWritten.add(
+          new BlockTransferWindow(startDate, endDate, length));
+    }
+  }
+
+  /**
+   * Indicate that a block has been downloaded.
+   * @param startDate The exact time the download started.
+   * @param endDate The exact time the download ended.
+   * @param length The number of bytes downloaded in the block.
+   */
+  public void blockDownloaded(Date startDate, Date endDate, long length) {
+    synchronized (blocksReadLock) {
+      allBlocksRead.add(new BlockTransferWindow(startDate, endDate, length));
+    }
+  }
+
+  /**
+   * Creates a new ArrayList to hold incoming block transfer notifications
+   * before they're processed.
+   * @return The newly created ArrayList.
+   */
+  private static ArrayList<BlockTransferWindow> createNewToProcessQueue() {
+    return new ArrayList<BlockTransferWindow>(PROCESS_QUEUE_INITIAL_CAPACITY);
+  }
+
+  /**
+   * Update the metrics source gauge for how many bytes were transferred
+   * during the last time window.
+   * @param updateWrite If true, update the write (upload) counter.
+   *                    Otherwise update the read (download) counter.
+   * @param bytes The number of bytes transferred.
+   */
+  private void updateBytesTransferred(boolean updateWrite, long bytes) {
+    if (updateWrite) {
+      instrumentation.updateBytesWrittenInLastSecond(bytes);
+    }
+    else {
+      instrumentation.updateBytesReadInLastSecond(bytes);
+    }
+  }
+
+  /**
+   * Update the metrics source gauge for what the current transfer rate
+   * is.
+   * @param updateWrite If true, update the write (upload) counter.
+   *                    Otherwise update the read (download) counter.
+   * @param bytesPerSecond The number of bytes per second we're seeing.
+   */
+  private void updateBytesTransferRate(boolean updateWrite,
+      long bytesPerSecond) {
+    if (updateWrite) {
+      instrumentation.currentUploadBytesPerSecond(bytesPerSecond);
+    }
+    else {
+      instrumentation.currentDownloadBytesPerSecond(bytesPerSecond);
+    }
+  }
+
+  /**
+   * For unit test purposes, suppresses auto-update of the metrics
+   * from the dedicated thread.
+   */
+  public void suppressAutoUpdate() {
+    suppressAutoUpdate = true;
+  }
+
+  /**
+   * Resumes auto-update (undo suppressAutoUpdate).
+   */
+  public void resumeAutoUpdate() {
+    suppressAutoUpdate = false;
+  }
+
+  /**
+   * Triggers the update of the metrics gauge based on all the blocks
+   * uploaded/downloaded so far. This is typically done periodically in a
+   * dedicated update thread, but exposing as public for unit test purposes.
+   * 
+   * @param updateWrite If true, we'll update the write (upload) metrics.
+   *                    Otherwise we'll update the read (download) ones.
+   */
+  public void triggerUpdate(boolean updateWrite) {
+    // Swap out the pending queue under the lock so producers aren't blocked
+    // while we do the (potentially longer) processing below.
+    ArrayList<BlockTransferWindow> toProcess = null;
+    synchronized (updateWrite ? blocksWrittenLock : blocksReadLock) {
+      if (updateWrite && !allBlocksWritten.isEmpty()) {
+        toProcess = allBlocksWritten;
+        allBlocksWritten = createNewToProcessQueue();
+      } else if (!updateWrite && !allBlocksRead.isEmpty()) {
+        toProcess = allBlocksRead;
+        allBlocksRead = createNewToProcessQueue();
+      }
+    }
+
+    // Check to see if we have any blocks to process.
+    if (toProcess == null) {
+      // Nothing to process, set the current bytes and rate to zero.
+      updateBytesTransferred(updateWrite, 0);
+      updateBytesTransferRate(updateWrite, 0);
+      return;
+    }
+
+    // The cut-off time for when we want to calculate rates is one
+    // window size ago from now.
+    long cutoffTime = new Date().getTime() - windowSizeMs;
+
+    // Go through all the blocks we're processing, and calculate the
+    // total number of bytes processed as well as the maximum transfer
+    // rate we experienced for any single block during our time window.
+    long maxSingleBlockTransferRate = 0;
+    long bytesInLastSecond = 0;
+    for (BlockTransferWindow currentWindow : toProcess) {
+      long windowDuration = currentWindow.getEndDate().getTime()
+          - currentWindow.getStartDate().getTime();
+      if (windowDuration == 0) {
+        // Edge case, assume it took 1 ms but we were too fast
+        windowDuration = 1;
+      }
+      if (currentWindow.getStartDate().getTime() > cutoffTime) {
+        // This block was transferred fully within our time window,
+        // just add its bytes to the total.
+        bytesInLastSecond += currentWindow.bytesTransferred;
+      } else if (currentWindow.getEndDate().getTime() > cutoffTime) {
+        // This block started its transfer before our time window,
+        // interpolate to estimate how many bytes from that block
+        // were actually transferred during our time window.
+        long adjustedBytes = (currentWindow.getBytesTransferred()
+            * (currentWindow.getEndDate().getTime() - cutoffTime))
+            / windowDuration;
+        bytesInLastSecond += adjustedBytes;
+      }
+      // Calculate the transfer rate for this block.
+      long currentBlockTransferRate =
+          (currentWindow.getBytesTransferred() * 1000) / windowDuration;
+      maxSingleBlockTransferRate =
+          Math.max(maxSingleBlockTransferRate, currentBlockTransferRate);
+    }
+    updateBytesTransferred(updateWrite, bytesInLastSecond);
+    // The transfer rate we saw in the last second is a tricky concept to
+    // define: If we saw two blocks, one 2 MB block transferred in 0.2 seconds,
+    // and one 4 MB block transferred in 0.2 seconds, then the maximum rate
+    // is 20 MB/s (the 4 MB block), the average of the two blocks is 15 MB/s,
+    // and the aggregate rate is 6 MB/s (total of 6 MB transferred in one
+    // second). As a first cut, I'm taking the definition to be the maximum
+    // of aggregate or of any single block's rate (so in the example case it's
+    // 6 MB/s).
+    long aggregateTransferRate = bytesInLastSecond;
+    long maxObservedTransferRate =
+        Math.max(aggregateTransferRate, maxSingleBlockTransferRate);
+    updateBytesTransferRate(updateWrite, maxObservedTransferRate);
+  }
+
+  /**
+   * A single block transfer.
+   */
+  private static final class BlockTransferWindow {
+    private final Date startDate;
+    private final Date endDate;
+    private final long bytesTransferred;
+
+    public BlockTransferWindow(Date startDate, Date endDate,
+        long bytesTransferred) {
+      this.startDate = startDate;
+      this.endDate = endDate;
+      this.bytesTransferred = bytesTransferred;
+    }
+
+    public Date getStartDate() { return startDate; }
+    public Date getEndDate() { return endDate; }
+    public long getBytesTransferred() { return bytesTransferred; }
+  }
+
+  /**
+   * The auto-update thread: once per window, recomputes the bandwidth
+   * gauges from the queued block transfers.
+   */
+  private final class UploadBandwidthUpdater implements Runnable {
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          Thread.sleep(windowSizeMs);
+          if (!suppressAutoUpdate) {
+            triggerUpdate(true);
+            triggerUpdate(false);
+          }
+        }
+      } catch (InterruptedException e) {
+        // Interrupted (typically by close()): restore the interrupt status
+        // and let the thread exit.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Stops the auto-update thread, if any, and waits for it to die.
+   */
+  public void close() {
+    if (uploadBandwidthUpdater != null) {
+      // Interrupt and join the updater thread in death.
+      uploadBandwidthUpdater.interrupt();
+      try {
+        uploadBandwidthUpdater.join();
+      } catch (InterruptedException e) {
+        // Preserve the caller's interrupt status instead of swallowing it.
+        Thread.currentThread().interrupt();
+      }
+      uploadBandwidthUpdater = null;
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java
new file mode 100644
index 0000000..d33e8c4
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND; //404
+import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;  //400
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; //500
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.RequestResult;
+import com.microsoft.windowsazure.storage.ResponseReceivedEvent;
+import com.microsoft.windowsazure.storage.StorageEvent;
+
+
+/**
+ * An event listener to the ResponseReceived event from Azure Storage that will
+ * update error metrics appropriately when it gets that event.
+ */
[email protected]
+public final class ErrorMetricUpdater extends 
StorageEvent<ResponseReceivedEvent> {
+  private final AzureFileSystemInstrumentation instrumentation;
+  private final OperationContext operationContext;
+
+  private ErrorMetricUpdater(OperationContext operationContext,
+      AzureFileSystemInstrumentation instrumentation) {
+    this.instrumentation = instrumentation;
+    this.operationContext = operationContext;
+  }
+
+  /**
+   * Hooks a new listener to the given operationContext that will update the
+   * error metrics for the WASB file system appropriately in response to
+   * ResponseReceived events.
+   *
+   * @param operationContext The operationContext to hook.
+   * @param instrumentation The metrics source to update.
+   */
+  public static void hook(
+      OperationContext operationContext,
+      AzureFileSystemInstrumentation instrumentation) {
+    ErrorMetricUpdater listener =
+        new ErrorMetricUpdater(operationContext,
+            instrumentation);
+    operationContext.getResponseReceivedEventHandler().addListener(listener);
+  }
+
+  @Override
+  public void eventOccurred(ResponseReceivedEvent eventArg) {
+    RequestResult currentResult = operationContext.getLastResult();
+    int statusCode = currentResult.getStatusCode();
+    // Check if it's a client-side error: a 4xx status
+    // We exclude 404 because it happens frequently during the normal
+    // course of operation (each call to exists() would generate that
+    // if it's not found).
+    if (statusCode >= HTTP_BAD_REQUEST && statusCode < HTTP_INTERNAL_ERROR 
+        && statusCode != HTTP_NOT_FOUND) {
+      instrumentation.clientErrorEncountered();
+    } else if (statusCode >= HTTP_INTERNAL_ERROR) {
+      // It's a server error: a 5xx status. Could be an Azure Storage
+      // bug or (more likely) throttling.
+      instrumentation.serverErrorEncountered();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
new file mode 100644
index 0000000..e3f5d44
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import java.net.HttpURLConnection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.microsoft.windowsazure.storage.Constants.HeaderConstants;
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.RequestResult;
+import com.microsoft.windowsazure.storage.ResponseReceivedEvent;
+import com.microsoft.windowsazure.storage.StorageEvent;
+
+
+/**
+ * An event listener to the ResponseReceived event from Azure Storage that will
+ * update metrics appropriately.
+ *
+ */
[email protected]
+public final class ResponseReceivedMetricUpdater extends 
StorageEvent<ResponseReceivedEvent> {
+
+  public static final Log LOG = 
LogFactory.getLog(ResponseReceivedMetricUpdater.class);
+
+  private final AzureFileSystemInstrumentation instrumentation;
+  private final BandwidthGaugeUpdater blockUploadGaugeUpdater;
+  
+  private ResponseReceivedMetricUpdater(OperationContext operationContext,
+      AzureFileSystemInstrumentation instrumentation,
+      BandwidthGaugeUpdater blockUploadGaugeUpdater) {
+    this.instrumentation = instrumentation;
+    this.blockUploadGaugeUpdater = blockUploadGaugeUpdater;
+  }
+
+  /**
+   * Hooks a new listener to the given operationContext that will update the
+   * metrics for the WASB file system appropriately in response to
+   * ResponseReceived events.
+   *
+   * @param operationContext The operationContext to hook.
+   * @param instrumentation The metrics source to update.
+   * @param blockUploadGaugeUpdater The blockUploadGaugeUpdater to use.
+   */
+  public static void hook(
+      OperationContext operationContext,
+      AzureFileSystemInstrumentation instrumentation,
+      BandwidthGaugeUpdater blockUploadGaugeUpdater) {
+    ResponseReceivedMetricUpdater listener =
+        new ResponseReceivedMetricUpdater(operationContext,
+            instrumentation, blockUploadGaugeUpdater);
+    operationContext.getResponseReceivedEventHandler().addListener(listener);
+  }
+
+  /**
+   * Get the content length of the request in the given HTTP connection.
+   * @param connection The connection.
+   * @return The content length, or zero if not found.
+   */
+  private long getRequestContentLength(HttpURLConnection connection) {
+    String lengthString = connection.getRequestProperty(
+        HeaderConstants.CONTENT_LENGTH);
+    if (lengthString != null){
+      return Long.parseLong(lengthString);
+    }
+    else{
+      return 0;
+    }
+  }
+
+  /**
+   * Gets the content length of the response in the given HTTP connection.
+   * @param connection The connection.
+   * @return The content length.
+   */
+  private long getResponseContentLength(HttpURLConnection connection) {
+    return connection.getContentLength();
+  }
+
+  /**
+   * Handle the response-received event from Azure SDK.
+   */
+  @Override
+  public void eventOccurred(ResponseReceivedEvent eventArg) {
+    instrumentation.webResponse();
+    if (!(eventArg.getConnectionObject() instanceof HttpURLConnection)) {
+      // Typically this shouldn't happen, but just let it pass
+      return;
+    }
+    HttpURLConnection connection =
+        (HttpURLConnection) eventArg.getConnectionObject();
+    RequestResult currentResult = eventArg.getRequestResult();
+    if (currentResult == null) {
+      // Again, typically shouldn't happen, but let it pass
+      return;
+    }
+
+    long requestLatency = currentResult.getStopDate().getTime() 
+        - currentResult.getStartDate().getTime();
+
+    if (currentResult.getStatusCode() == HttpURLConnection.HTTP_CREATED 
+        && connection.getRequestMethod().equalsIgnoreCase("PUT")) {
+      // If it's a PUT with an HTTP_CREATED status then it's a successful
+      // block upload.
+      long length = getRequestContentLength(connection);
+      if (length > 0) {
+        blockUploadGaugeUpdater.blockUploaded(
+            currentResult.getStartDate(),
+            currentResult.getStopDate(),
+            length);
+        instrumentation.rawBytesUploaded(length);
+        instrumentation.blockUploaded(requestLatency);
+      }
+    } else if (currentResult.getStatusCode() == HttpURLConnection.HTTP_PARTIAL 
+        && connection.getRequestMethod().equalsIgnoreCase("GET")) {
+      // If it's a GET with an HTTP_PARTIAL status then it's a successful
+      // block download.
+      long length = getResponseContentLength(connection);
+      if (length > 0) {
+        blockUploadGaugeUpdater.blockDownloaded(
+            currentResult.getStartDate(),
+            currentResult.getStopDate(),
+            length);
+        instrumentation.rawBytesDownloaded(length);
+        instrumentation.blockDownloaded(requestLatency);
+      }
+    } 
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java
new file mode 100644
index 0000000..184907a
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import java.util.ArrayDeque;
+import java.util.Date;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Helper class to calculate rolling-window averages.
+ * Used to calculate rolling-window metrics in AzureNativeFileSystem.
+ */
+@InterfaceAudience.Private
/**
 * Helper class to calculate rolling-window averages.
 * Used to calculate rolling-window metrics in AzureNativeFileSystem.
 *
 * Thread-safe: all public methods are synchronized on this instance.
 */
final class RollingWindowAverage {
  // Data points in arrival order (oldest first); pruned as they age out.
  private final ArrayDeque<DataPoint> currentPoints =
      new ArrayDeque<DataPoint>();
  private final long windowSizeMs;

  /**
   * Create a new rolling-window average for the given window size.
   * @param windowSizeMs The size of the window in milliseconds.
   * @throws IllegalArgumentException if {@code windowSizeMs} is not positive.
   */
  public RollingWindowAverage(long windowSizeMs) {
    if (windowSizeMs <= 0) {
      throw new IllegalArgumentException(
          "Window size must be positive: " + windowSizeMs);
    }
    this.windowSizeMs = windowSizeMs;
  }

  /**
   * Add a new data point that just happened.
   * @param value The value of the data point.
   */
  public synchronized void addPoint(long value) {
    currentPoints.addLast(new DataPoint(now(), value));
    cleanupOldPoints();
  }

  /**
   * Get the current average.
   * @return The current average (integer division), or 0 if no points
   *         remain inside the window.
   */
  public synchronized long getCurrentAverage() {
    cleanupOldPoints();
    if (currentPoints.isEmpty()) {
      return 0;
    }
    long sum = 0;
    for (DataPoint current : currentPoints) {
      sum += current.getValue();
    }
    return sum / currentPoints.size();
  }

  // Current wall-clock time in milliseconds since the epoch.
  // Using primitive millis avoids allocating a java.util.Date per point.
  private static long now() {
    return System.currentTimeMillis();
  }

  /**
   * Clean up points that don't count any more (are before our
   * rolling window) from our current queue of points.
   */
  private void cleanupOldPoints() {
    long cutoffTime = now() - windowSizeMs;
    while (!currentPoints.isEmpty()
        && currentPoints.peekFirst().getEventTime() < cutoffTime) {
      currentPoints.removeFirst();
    }
  }

  /**
   * A single data point.
   */
  private static class DataPoint {
    private final long eventTime; // ms since epoch when the point was added
    private final long value;

    public DataPoint(long eventTime, long value) {
      this.eventTime = eventTime;
      this.value = value;
    }

    public long getEventTime() {
      return eventTime;
    }

    public long getValue() {
      return value;
    }
  }
}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html
new file mode 100644
index 0000000..5e8d6a8
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html
@@ -0,0 +1,28 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+
+<p>
+Infrastructure for a Metrics2 source that provides information on Windows 
+Azure Filesystem for Hadoop instances. 
+</p>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
index 8133954..02738e7 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
@@ -27,9 +27,18 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.GregorianCalendar;
 import java.util.TimeZone;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 
 import com.microsoft.windowsazure.storage.AccessCondition;
 import com.microsoft.windowsazure.storage.CloudStorageAccount;
@@ -76,7 +85,10 @@ public final class AzureBlobStorageTestAccount {
   private NativeAzureFileSystem fs;
   private AzureNativeFileSystemStore storage;
   private MockStorageInterface mockStorage;
-
+  private static final ConcurrentLinkedQueue<MetricsRecord> allMetrics =
+      new ConcurrentLinkedQueue<MetricsRecord>();
+  
+  
   private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
       CloudStorageAccount account, CloudBlobContainer container) {
     this.account = account;
@@ -124,6 +136,10 @@ public final class AzureBlobStorageTestAccount {
     this.fs = fs;
     this.mockStorage = mockStorage;
   }
+  
+  private static void addRecord(MetricsRecord record) {
+    allMetrics.add(record);
+  }
 
   public static String getMockContainerUri() {
     return String.format("http://%s/%s";,
@@ -141,6 +157,47 @@ public final class AzureBlobStorageTestAccount {
     // Remove the first SEPARATOR
     return toMockUri(path.toUri().getRawPath().substring(1)); 
   }
+  
+  public Number getLatestMetricValue(String metricName, Number defaultValue)
+      throws IndexOutOfBoundsException{
+    boolean found = false;
+    Number ret = null;
+    for (MetricsRecord currentRecord : allMetrics) {
+      // First check if this record is coming for my file system.
+      if (wasGeneratedByMe(currentRecord)) {
+        for (AbstractMetric currentMetric : currentRecord.metrics()) {
+          if (currentMetric.name().equalsIgnoreCase(metricName)) {
+            found = true;
+            ret = currentMetric.value();
+            break;
+          }
+        }
+      }
+    }
+    if (!found) {
+      if (defaultValue != null) {
+        return defaultValue;
+      }
+      throw new IndexOutOfBoundsException(metricName);
+    }
+    return ret;
+  }
+
+  /**
+   * Checks if the given record was generated by my WASB file system instance.
+   * @param currentRecord The metrics record to check.
+   * @return
+   */
+  private boolean wasGeneratedByMe(MetricsRecord currentRecord) {
+    String myFsId = 
fs.getInstrumentation().getFileSystemInstanceId().toString();
+    for (MetricsTag currentTag : currentRecord.tags()) {
+      if (currentTag.name().equalsIgnoreCase("wasbFileSystemId")) {
+        return currentTag.value().equals(myFsId);
+      }
+    }
+    return false;
+  }
+
 
   /**
    * Gets the blob reference to the given blob key.
@@ -236,7 +293,6 @@ public final class AzureBlobStorageTestAccount {
 
   public static AzureBlobStorageTestAccount createOutOfBandStore(
       int uploadBlockSize, int downloadBlockSize) throws Exception {
-
     CloudBlobContainer container = null;
     Configuration conf = createTestConfiguration();
     CloudStorageAccount account = createTestAccount(conf);
@@ -262,11 +318,25 @@ public final class AzureBlobStorageTestAccount {
     // Set account URI and initialize Azure file system.
     URI accountUri = createAccountUri(accountName, containerName);
 
+    // Set up instrumentation.
+    //
+    AzureFileSystemMetricsSystem.fileSystemStarted();
+    String sourceName = NativeAzureFileSystem.newMetricsSourceName();
+    String sourceDesc = "Azure Storage Volume File System metrics";
+
+    AzureFileSystemInstrumentation instrumentation =
+        DefaultMetricsSystem.instance().register(sourceName,
+                sourceDesc, new AzureFileSystemInstrumentation(conf));
+
+    AzureFileSystemMetricsSystem.registerSource(
+        sourceName, sourceDesc, instrumentation);
+    
+    
     // Create a new AzureNativeFileSystemStore object.
     AzureNativeFileSystemStore testStorage = new AzureNativeFileSystemStore();
 
     // Initialize the store with the throttling feedback interfaces.
-    testStorage.initialize(accountUri, conf);
+    testStorage.initialize(accountUri, conf, instrumentation);
 
     // Create test account initializing the appropriate member variables.
     AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(
@@ -722,5 +792,20 @@ public final class AzureBlobStorageTestAccount {
   public MockStorageInterface getMockStorage() {
     return mockStorage;
   }
+  
+  public static class StandardCollector implements MetricsSink {
+    @Override
+    public void init(SubsetConfiguration conf) {
+    }
+
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      addRecord(record);
+    }
+
+    @Override
+    public void flush() {
+    }
+  }
  
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
index 88d976c..6e89822 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
@@ -33,11 +33,9 @@ import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.junit.Test;
 
 import com.microsoft.windowsazure.storage.OperationContext;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b928b8c7/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java
new file mode 100644
index 0000000..1269417
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import static 
org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_BYTES_READ;
+import static 
org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_BYTES_WRITTEN;
+import static 
org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_RAW_BYTES_DOWNLOADED;
+import static 
org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_RAW_BYTES_UPLOADED;
+import static 
org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_WEB_RESPONSES;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+public final class AzureMetricsTestUtil {
+  public static long getLongGaugeValue(AzureFileSystemInstrumentation 
instrumentation,
+      String gaugeName) {
+         return getLongGauge(gaugeName, getMetrics(instrumentation));
+  }
+  
+  /**
+   * Gets the current value of the given counter.
+   */
+  public static long getLongCounterValue(AzureFileSystemInstrumentation 
instrumentation,
+      String counterName) {
+    return getLongCounter(counterName, getMetrics(instrumentation));
+  }
+
+
+  /**
+   * Gets the current value of the wasb_bytes_written_last_second counter.
+   */
+  public static long getCurrentBytesWritten(AzureFileSystemInstrumentation 
instrumentation) {
+    return getLongGaugeValue(instrumentation, WASB_BYTES_WRITTEN);
+  }
+
+  /**
+   * Gets the current value of the wasb_bytes_read_last_second counter.
+   */
+  public static long getCurrentBytesRead(AzureFileSystemInstrumentation 
instrumentation) {
+    return getLongGaugeValue(instrumentation, WASB_BYTES_READ);
+  }
+
+  /**
+   * Gets the current value of the wasb_raw_bytes_uploaded counter.
+   */
+  public static long getCurrentTotalBytesWritten(
+      AzureFileSystemInstrumentation instrumentation) {
+    return getLongCounterValue(instrumentation, WASB_RAW_BYTES_UPLOADED);
+  }
+
+  /**
+   * Gets the current value of the wasb_raw_bytes_downloaded counter.
+   */
+  public static long getCurrentTotalBytesRead(
+      AzureFileSystemInstrumentation instrumentation) {
+    return getLongCounterValue(instrumentation, WASB_RAW_BYTES_DOWNLOADED);
+  }
+
+  /**
+   * Gets the current value of the asv_web_responses counter.
+   */
+  public static long getCurrentWebResponses(
+      AzureFileSystemInstrumentation instrumentation) {
+    return getLongCounter(WASB_WEB_RESPONSES, getMetrics(instrumentation));
+  }
+}

Reply via email to