This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 93c4704b3324c505dfc9fb2cb4c63b85babaf9fc
Author: Mukund Thakur <mtha...@cloudera.com>
AuthorDate: Fri Aug 12 01:42:00 2022 +0530

    HADOOP-18392. Propagate vectored s3a input stream stats to file system stats. (#4704)
    
    part of HADOOP-18103.
    
    Contributed By: Mukund Thakur
---
 .../contract/AbstractContractVectoredReadTest.java |  23 +-
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |  17 ++
 .../contract/s3a/ITestS3AContractVectoredRead.java | 266 +++++++++++----------
 3 files changed, 172 insertions(+), 134 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 379b992fba1..c76f1839b77 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -24,11 +24,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.IntFunction;
 
 import org.assertj.core.api.Assertions;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -43,13 +42,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
-import org.apache.hadoop.test.LambdaTestUtils;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 
 @RunWith(Parameterized.class)
 public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
@@ -281,16 +281,11 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
       in.readVectored(fileRanges, allocate);
       for (FileRange res : fileRanges) {
         CompletableFuture<ByteBuffer> data = res.getData();
-        try {
-          ByteBuffer buffer = data.get();
-          // Shouldn't reach here.
-          Assert.fail("EOFException must be thrown while reading EOF");
-        } catch (ExecutionException ex) {
-          // ignore as expected.
-        } catch (Exception ex) {
-          LOG.error("Exception while running vectored read ", ex);
-          Assert.fail("Exception while running vectored read " + ex);
-        }
+        interceptFuture(EOFException.class,
+                "",
+                ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+                TimeUnit.SECONDS,
+                data);
       }
     }
   }
@@ -410,7 +405,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
             fs.openFile(path(VECTORED_READ_FILE_NAME))
                     .build();
     try (FSDataInputStream in = builder.get()) {
-      LambdaTestUtils.intercept(clazz,
+      intercept(clazz,
           () -> in.readVectored(fileRanges, allocate));
     }
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index dfe9fdf2d8d..2dc88eeb85e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -308,6 +308,23 @@ public enum Statistic {
       StreamStatisticNames.STREAM_READ_OPERATIONS,
       "Count of read() operations in an input stream",
       TYPE_COUNTER),
+  STREAM_READ_VECTORED_OPERATIONS(
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          "Count of readVectored() operations in an input stream.",
+          TYPE_COUNTER),
+  STREAM_READ_VECTORED_READ_BYTES_DISCARDED(
+          StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+          "Count of bytes discarded during readVectored() operation." +
+                  " in an input stream",
+          TYPE_COUNTER),
+  STREAM_READ_VECTORED_INCOMING_RANGES(
+          StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+          "Count of incoming file ranges during readVectored() operation.",
+          TYPE_COUNTER),
+  STREAM_READ_VECTORED_COMBINED_RANGES(
+          StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+          "Count of combined file ranges during readVectored() operation.",
+          TYPE_COUNTER),
   STREAM_READ_REMOTE_STREAM_ABORTED(
       StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
       "Duration of aborting a remote stream during stream IO",
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 84a90ba441a..4c357e288c8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -176,146 +176,172 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
    * */
   @Test
   public void testNormalReadVsVectoredReadStatsCollection() throws Exception {
-    FileSystem fs = getTestFileSystemWithReadAheadDisabled();
-    List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
-    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
-    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
-    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
-    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
-
-    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
-    CompletableFuture<FSDataInputStream> builder =
-            fs.openFile(path(VECTORED_READ_FILE_NAME))
-                    .withFileStatus(fileStatus)
-                    .build();
-    try (FSDataInputStream in = builder.get()) {
-      in.readVectored(fileRanges, getAllocate());
-      validateVectoredReadResult(fileRanges, DATASET);
-      returnBuffersToPoolPostRead(fileRanges, getPool());
-
-      // audit the io statistics for this stream
-      IOStatistics st = in.getIOStatistics();
-      LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
-
-      // the vectored io operation must be tracked
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
-              1);
-
-      // the vectored io operation is being called with 5 input ranges.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
-              5);
-
-      // 5 input ranges got combined in 3 as some of them are close.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
-              3);
-
-      // number of bytes discarded will be based on the above input ranges.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
-              5944);
-
-      verifyStatisticCounterValue(st,
-              StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
-              3);
-
-      // read bytes should match the sum of requested length for each input ranges.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_BYTES,
-              1424);
-
-    }
 
-    CompletableFuture<FSDataInputStream> builder1 =
-            fs.openFile(path(VECTORED_READ_FILE_NAME))
-                    .withFileStatus(fileStatus)
-                    .build();
+    try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
+      List<FileRange> fileRanges = new ArrayList<>();
+      fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+      fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+      fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+      fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+      fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+      FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+      CompletableFuture<FSDataInputStream> builder =
+              fs.openFile(path(VECTORED_READ_FILE_NAME))
+                      .withFileStatus(fileStatus)
+                      .build();
+      try (FSDataInputStream in = builder.get()) {
+        in.readVectored(fileRanges, getAllocate());
+        validateVectoredReadResult(fileRanges, DATASET);
+        returnBuffersToPoolPostRead(fileRanges, getPool());
+
+        // audit the io statistics for this stream
+        IOStatistics st = in.getIOStatistics();
+        LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
+
+        // the vectored io operation must be tracked
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+                1);
+
+        // the vectored io operation is being called with 5 input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+                5);
+
+        // 5 input ranges got combined in 3 as some of them are close.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+                3);
+
+        // number of bytes discarded will be based on the above input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+                5944);
+
+        verifyStatisticCounterValue(st,
+                StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+                3);
+
+        // read bytes should match the sum of requested length for each input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_BYTES,
+                1424);
 
-    try (FSDataInputStream in = builder1.get()) {
-      for (FileRange range : fileRanges) {
-        byte[] temp = new byte[range.getLength()];
-        in.readFully((int) range.getOffset(), temp, 0, range.getLength());
       }
 
-      // audit the statistics for this stream
-      IOStatistics st = in.getIOStatistics();
-      LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
-
-      verifyStatisticCounterValue(st,
+      CompletableFuture<FSDataInputStream> builder1 =
+              fs.openFile(path(VECTORED_READ_FILE_NAME))
+                      .withFileStatus(fileStatus)
+                      .build();
+
+      try (FSDataInputStream in = builder1.get()) {
+        for (FileRange range : fileRanges) {
+          byte[] temp = new byte[range.getLength()];
+          in.readFully((int) range.getOffset(), temp, 0, range.getLength());
+        }
+
+        // audit the statistics for this stream
+        IOStatistics st = in.getIOStatistics();
+        LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
+
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+                0);
+
+        // all other counter values consistent.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+                0);
+        verifyStatisticCounterValue(st,
+                StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+                5);
+
+        // read bytes should match the sum of requested length for each input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_BYTES,
+                1424);
+      }
+      // validate stats are getting merged at fs instance level.
+      IOStatistics fsStats = fs.getIOStatistics();
+      // only 1 vectored io call is made in this fs instance.
+      verifyStatisticCounterValue(fsStats,
               StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
-              0);
-
-      // all other counter values consistent.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
-              0);
-      verifyStatisticCounterValue(st,
+              1);
+      // 8 get requests were made in this fs instance.
+      verifyStatisticCounterValue(fsStats,
               StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
-              5);
+              8);
 
-      // read bytes should match the sum of requested length for each input ranges.
-      verifyStatisticCounterValue(st,
+      verifyStatisticCounterValue(fsStats,
               StreamStatisticNames.STREAM_READ_BYTES,
-              1424);
+              2848);
     }
   }
 
   @Test
   public void testMultiVectoredReadStatsCollection() throws Exception {
-    FileSystem fs = getTestFileSystemWithReadAheadDisabled();
-    List<FileRange> ranges1 = getConsecutiveRanges();
-    List<FileRange> ranges2 = getConsecutiveRanges();
-    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
-    CompletableFuture<FSDataInputStream> builder =
-            fs.openFile(path(VECTORED_READ_FILE_NAME))
-                    .withFileStatus(fileStatus)
-                    .build();
-    try (FSDataInputStream in = builder.get()) {
-      in.readVectored(ranges1, getAllocate());
-      in.readVectored(ranges2, getAllocate());
-      validateVectoredReadResult(ranges1, DATASET);
-      validateVectoredReadResult(ranges2, DATASET);
-      returnBuffersToPoolPostRead(ranges1, getPool());
-      returnBuffersToPoolPostRead(ranges2, getPool());
-
-      // audit the io statistics for this stream
-      IOStatistics st = in.getIOStatistics();
-
-      // 2 vectored io calls are made above.
-      verifyStatisticCounterValue(st,
+    try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
+      List<FileRange> ranges1 = getConsecutiveRanges();
+      List<FileRange> ranges2 = getConsecutiveRanges();
+      FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+      CompletableFuture<FSDataInputStream> builder =
+              fs.openFile(path(VECTORED_READ_FILE_NAME))
+                      .withFileStatus(fileStatus)
+                      .build();
+      try (FSDataInputStream in = builder.get()) {
+        in.readVectored(ranges1, getAllocate());
+        in.readVectored(ranges2, getAllocate());
+        validateVectoredReadResult(ranges1, DATASET);
+        validateVectoredReadResult(ranges2, DATASET);
+        returnBuffersToPoolPostRead(ranges1, getPool());
+        returnBuffersToPoolPostRead(ranges2, getPool());
+
+        // audit the io statistics for this stream
+        IOStatistics st = in.getIOStatistics();
+
+        // 2 vectored io calls are made above.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+                2);
+
+        // 2 vectored io operation is being called with 2 input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+                4);
+
+        // 2 ranges are getting merged in 1 during both vectored io operation.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+                2);
+
+        // number of bytes discarded will be 0 as the ranges are consecutive.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+                0);
+        // only 2 http get request will be made because ranges in both range list will be merged
+        // to 1 because they are consecutive.
+        verifyStatisticCounterValue(st,
+                StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+                2);
+        // read bytes should match the sum of requested length for each input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_BYTES,
+                2000);
+      }
+      IOStatistics fsStats = fs.getIOStatistics();
+      // 2 vectored io calls are made in this fs instance.
+      verifyStatisticCounterValue(fsStats,
               StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
               2);
-
-      // 2 vectored io operation is being called with 2 input ranges.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
-              4);
-
-      // 2 ranges are getting merged in 1 during both vectored io operation.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
-              2);
-
-      // number of bytes discarded will be 0 as the ranges are consecutive.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
-              0);
-      // only 2 http get request will be made because ranges in both range list will be merged
-      // to 1 because they are consecutive.
-      verifyStatisticCounterValue(st,
+      // 2 get requests were made in this fs instance.
+      verifyStatisticCounterValue(fsStats,
               StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
               2);
-      // read bytes should match the sum of requested length for each input ranges.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_BYTES,
-              2000);
     }
   }
 
-  private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
+  private S3AFileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
     Configuration conf = getFileSystem().getConf();
     // also resetting the min seek and max size values is important
     // as this same test suite has test which overrides these params.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to