This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2b60d0c1f44 [HDFS-16971] Add read metrics for remote reads in 
FileSystem Statistics #5534 (#5536)
2b60d0c1f44 is described below

commit 2b60d0c1f440e61b57085abd2d72a30db7c013cf
Author: Melissa You <31492618+melissa...@users.noreply.github.com>
AuthorDate: Thu Apr 13 09:07:42 2023 -0700

    [HDFS-16971] Add read metrics for remote reads in FileSystem Statistics 
#5534 (#5536)
---
 .../main/java/org/apache/hadoop/fs/FileSystem.java | 34 ++++++++++++++++++++++
 .../hadoop/fs/FileSystemStorageStatistics.java     |  5 +++-
 .../hadoop/fs/TestFileSystemStorageStatistics.java |  6 +++-
 .../java/org/apache/hadoop/hdfs/DFSClient.java     | 10 +++++--
 .../org/apache/hadoop/hdfs/DFSInputStream.java     |  9 ++++--
 .../apache/hadoop/hdfs/DFSStripedInputStream.java  |  6 ++--
 .../java/org/apache/hadoop/hdfs/StripeReader.java  |  5 +++-
 7 files changed, 64 insertions(+), 11 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 763af197a1f..5d8f0e575f2 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -3942,6 +3942,7 @@ public abstract class FileSystem extends Configured
       private volatile long bytesReadDistanceOfThreeOrFour;
       private volatile long bytesReadDistanceOfFiveOrLarger;
       private volatile long bytesReadErasureCoded;
+      private volatile long remoteReadTimeMS;
 
       /**
        * Add another StatisticsData object to this one.
@@ -3959,6 +3960,7 @@ public abstract class FileSystem extends Configured
         this.bytesReadDistanceOfFiveOrLarger +=
             other.bytesReadDistanceOfFiveOrLarger;
         this.bytesReadErasureCoded += other.bytesReadErasureCoded;
+        this.remoteReadTimeMS += other.remoteReadTimeMS;
       }
 
       /**
@@ -3977,6 +3979,7 @@ public abstract class FileSystem extends Configured
         this.bytesReadDistanceOfFiveOrLarger =
             -this.bytesReadDistanceOfFiveOrLarger;
         this.bytesReadErasureCoded = -this.bytesReadErasureCoded;
+        this.remoteReadTimeMS = -this.remoteReadTimeMS;
       }
 
       @Override
@@ -4025,6 +4028,10 @@ public abstract class FileSystem extends Configured
       public long getBytesReadErasureCoded() {
         return bytesReadErasureCoded;
       }
+
+      public long getRemoteReadTimeMS() {
+        return remoteReadTimeMS;
+      }
     }
 
     private interface StatisticsAggregator<T> {
@@ -4252,6 +4259,14 @@ public abstract class FileSystem extends Configured
       }
     }
 
+    /**
+     * Increment the time taken to read bytes from remote in the statistics.
+     * @param durationMS time taken in ms to read bytes from remote
+     */
+    public void increaseRemoteReadTime(final long durationMS) {
+      getThreadStatistics().remoteReadTimeMS += durationMS;
+    }
+
     /**
      * Apply the given aggregator to all StatisticsData objects associated with
      * this Statistics object.
@@ -4399,6 +4414,25 @@ public abstract class FileSystem extends Configured
       return bytesRead;
     }
 
+    /**
+     * Get total time taken in ms for bytes read from remote.
+     * @return time taken in ms for remote bytes read.
+     */
+    public long getRemoteReadTime() {
+      return visitAll(new StatisticsAggregator<Long>() {
+        private long remoteReadTimeMS = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          remoteReadTimeMS += data.remoteReadTimeMS;
+        }
+
+        public Long aggregate() {
+          return remoteReadTimeMS;
+        }
+      });
+    }
+
     /**
      * Get all statistics data.
      * MR or other frameworks can use the method to get all statistics at once.
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java
index 62806d61b54..9e62e63775a 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java
@@ -47,7 +47,8 @@ public class FileSystemStorageStatistics extends 
StorageStatistics {
       "bytesReadDistanceOfOneOrTwo",
       "bytesReadDistanceOfThreeOrFour",
       "bytesReadDistanceOfFiveOrLarger",
-      "bytesReadErasureCoded"
+      "bytesReadErasureCoded",
+      "remoteReadTimeMS"
   };
 
   private static class LongStatisticIterator
@@ -107,6 +108,8 @@ public class FileSystemStorageStatistics extends 
StorageStatistics {
       return data.getBytesReadDistanceOfFiveOrLarger();
     case "bytesReadErasureCoded":
       return data.getBytesReadErasureCoded();
+    case "remoteReadTimeMS":
+      return data.getRemoteReadTimeMS();
     default:
       return null;
     }
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java
index 2b4e686e592..e99f0f2348b 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java
@@ -52,7 +52,8 @@ public class TestFileSystemStorageStatistics {
       "bytesReadDistanceOfOneOrTwo",
       "bytesReadDistanceOfThreeOrFour",
       "bytesReadDistanceOfFiveOrLarger",
-      "bytesReadErasureCoded"
+      "bytesReadErasureCoded",
+      "remoteReadTimeMS"
   };
 
   private FileSystem.Statistics statistics =
@@ -74,6 +75,7 @@ public class TestFileSystemStorageStatistics {
     statistics.incrementBytesReadByDistance(1, RandomUtils.nextInt(0, 100));
     statistics.incrementBytesReadByDistance(3, RandomUtils.nextInt(0, 100));
     statistics.incrementBytesReadErasureCoded(RandomUtils.nextInt(0, 100));
+    statistics.increaseRemoteReadTime(RandomUtils.nextInt(0, 100));
   }
 
   @Test
@@ -128,6 +130,8 @@ public class TestFileSystemStorageStatistics {
       return statistics.getBytesReadByDistance(5);
     case "bytesReadErasureCoded":
       return statistics.getBytesReadErasureCoded();
+    case "remoteReadTimeMS":
+      return statistics.getRemoteReadTime();
     default:
       return 0;
     }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index acfca6799f4..8faeebe8e85 100755
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -3090,10 +3090,14 @@ public class DFSClient implements java.io.Closeable, 
RemotePeerFactory,
     }
   }
 
-  void updateFileSystemReadStats(int distance, int nRead) {
+  void updateFileSystemReadStats(int distance, int readBytes, long readTimeMS) 
{
     if (stats != null) {
-      stats.incrementBytesRead(nRead);
-      stats.incrementBytesReadByDistance(distance, nRead);
+      stats.incrementBytesRead(readBytes);
+      stats.incrementBytesReadByDistance(distance, readBytes);
+      if (distance > 0) {
+        //remote read
+        stats.increaseRemoteReadTime(readTimeMS);
+      }
     }
   }
 
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index a8d80016072..b5be33206e7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -851,8 +851,9 @@ public class DFSInputStream extends FSInputStream
                   locatedBlocks.getFileLength() - pos);
             }
           }
+          long beginReadMS = Time.monotonicNow();
           int result = readBuffer(strategy, realLen, corruptedBlocks);
-
+          long readTimeMS = Time.monotonicNow() - beginReadMS;
           if (result >= 0) {
             pos += result;
           } else {
@@ -861,7 +862,7 @@ public class DFSInputStream extends FSInputStream
           }
           updateReadStatistics(readStatistics, result, blockReader);
           dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-              result);
+              result, readTimeMS);
           if (readStatistics.getBlockType() == BlockType.STRIPED) {
             dfsClient.updateFileSystemECReadStats(result);
           }
@@ -1184,6 +1185,7 @@ public class DFSInputStream extends FSInputStream
         ByteBuffer tmp = buf.duplicate();
         tmp.limit(tmp.position() + len);
         tmp = tmp.slice();
+        long beginReadMS = Time.monotonicNow();
         int nread = 0;
         int ret;
         while (true) {
@@ -1193,11 +1195,12 @@ public class DFSInputStream extends FSInputStream
           }
           nread += ret;
         }
+        long readTimeMS = Time.monotonicNow() - beginReadMS;
         buf.position(buf.position() + nread);
 
         IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
         dfsClient.updateFileSystemReadStats(
-            reader.getNetworkDistance(), nread);
+            reader.getNetworkDistance(), nread, readTimeMS);
         if (readStatistics.getBlockType() == BlockType.STRIPED) {
           dfsClient.updateFileSystemECReadStats(nread);
         }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 5ae51709593..6c1bafbef9d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -331,15 +331,17 @@ public class DFSStripedInputStream extends DFSInputStream 
{
    * its ThreadLocal.
    *
    * @param stats striped read stats
+   * @param readTimeMS read time metrics in ms
+   *
    */
-  void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
+  void updateReadStats(final StripedBlockUtil.BlockReadStats stats, long 
readTimeMS) {
     if (stats == null) {
       return;
     }
     updateReadStatistics(readStatistics, stats.getBytesRead(),
         stats.isShortCircuit(), stats.getNetworkDistance());
     dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
-        stats.getBytesRead());
+        stats.getBytesRead(), readTimeMS);
     assert readStatistics.getBlockType() == BlockType.STRIPED;
     dfsClient.updateFileSystemECReadStats(stats.getBytesRead());
   }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index 3fc87c7952a..f2d6732a459 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -351,9 +351,12 @@ abstract class StripeReader {
     // first read failure
     while (!futures.isEmpty()) {
       try {
+        long beginReadMS = Time.monotonicNow();
         StripingChunkReadResult r = StripedBlockUtil
             .getNextCompletedStripedRead(service, futures, 0);
-        dfsStripedInputStream.updateReadStats(r.getReadStats());
+        long readTimeMS = Time.monotonicNow() - beginReadMS;
+
+        dfsStripedInputStream.updateReadStats(r.getReadStats(), readTimeMS);
         DFSClient.LOG.debug("Read task returned: {}, for stripe {}",
             r, alignedStripe);
         StripingChunk returnedChunk = alignedStripe.chunks[r.index];


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

Reply via email to