This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 1c2c6785a0977f005b555bf222c84b5690391e0f
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Fri Sep 9 03:32:20 2022 -0700

    HADOOP-18186. s3a prefetching to use SemaphoredDelegatingExecutor for submitting work (#4796)
    
    
    Contributed by Viraj Jasani
---
 .../java/org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 17 +++++++++++++----
 .../hadoop/fs/s3a/ITestS3APrefetchingInputStream.java   |  9 +++++++++
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index dd9f3368c59..29cd158641f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -786,9 +786,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         DEFAULT_KEEPALIVE_TIME, 0);
     int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
 
+    int activeTasksForBoundedThreadPool = maxThreads;
+    int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
     boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
-        maxThreads,
-        maxThreads + totalTasks + numPrefetchThreads,
+        activeTasksForBoundedThreadPool,
+        waitingTasksForBoundedThreadPool,
         keepAliveTime, TimeUnit.SECONDS,
         name + "-bounded");
     unboundedThreadPool = new ThreadPoolExecutor(
@@ -800,8 +802,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     unboundedThreadPool.allowCoreThreadTimeOut(true);
     executorCapacity = intOption(conf,
         EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
-    if (this.prefetchEnabled) {
-      this.futurePool = new ExecutorServiceFuturePool(boundedThreadPool);
+    if (prefetchEnabled) {
+      final S3AInputStreamStatistics s3AInputStreamStatistics =
+          statisticsContext.newInputStreamStatistics();
+      futurePool = new ExecutorServiceFuturePool(
+          new SemaphoredDelegatingExecutor(
+              boundedThreadPool,
+              activeTasksForBoundedThreadPool + waitingTasksForBoundedThreadPool,
+              true,
+              s3AInputStreamStatistics));
     }
   }
 
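The change above routes prefetch work through a SemaphoredDelegatingExecutor wrapped around the bounded pool, so submissions are gated by (active + waiting) permits and tracked against the stream statistics. A minimal sketch of that wrapping pattern, separate from the patch itself and using illustrative pool sizes and names:

// Sketch only: wrap a bounded pool in a SemaphoredDelegatingExecutor so that
// submit() blocks once all permits are held, instead of growing the queue.
// Pool sizes and the thread-name prefix are illustrative, not S3A defaults.
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;

public class SemaphoredPrefetchSketch {
  public static void main(String[] args) throws Exception {
    // bounded pool: 4 active threads, room for 16 waiting tasks
    BlockingThreadPoolExecutorService bounded =
        BlockingThreadPoolExecutorService.newInstance(
            4, 16, 60, TimeUnit.SECONDS, "sketch-bounded");
    // gate submissions behind (active + waiting) permits with fair ordering;
    // the patch additionally passes the stream statistics object so acquisition
    // time is recorded as action_executor_acquired
    SemaphoredDelegatingExecutor gated =
        new SemaphoredDelegatingExecutor(bounded, 4 + 16, true);
    Runnable task = () -> System.out.println("prefetch block read");
    gated.submit(task);
    gated.shutdown();
    gated.awaitTermination(30, TimeUnit.SECONDS);
  }
}
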
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index 36d049cedf1..24f74b3a021 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -38,8 +38,10 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
@@ -130,6 +132,8 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
     }
     // Verify that once stream is closed, all memory is freed
     verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+    assertThatStatisticMaximum(ioStats,
+        StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
   }
 
   @Test
@@ -159,6 +163,8 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
     }
     verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
     verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+    assertThatStatisticMaximum(ioStats,
+        StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
   }
 
   @Test
@@ -183,6 +189,9 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
       verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
       // The buffer pool is not used
       verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+      // no prefetch ops, so no action_executor_acquired
+      assertThatStatisticMaximum(ioStats,
+          StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
     }
   }
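
The new assertions check the action_executor_acquired duration statistic: its .max value is greater than 0 once prefetch tasks have passed through the semaphored executor, and stays at -1 (never updated) when no prefetch work was submitted. A hedged sketch of reading the same statistic from application code, with a placeholder path and read size:

// Sketch only: read the executor-acquisition maximum from a prefetching
// stream's IOStatistics. The path argument and read size are placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;

public class ExecutorAcquiredStatSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(Constants.PREFETCH_ENABLED_KEY, true);
    Path path = new Path(args[0]);                // e.g. an s3a:// object
    try (FileSystem fs = path.getFileSystem(conf);
         FSDataInputStream in = fs.open(path)) {
      in.read(new byte[1024 * 1024]);             // trigger some prefetching
      IOStatistics stats = in.getIOStatistics();
      // maximum recorded wait to acquire an executor slot; -1 if never updated
      Long max = stats.maximums().get(
          StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED
              + StoreStatisticNames.SUFFIX_MAX);
      System.out.println("action_executor_acquired.max = " + max);
    }
  }
}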
 

