This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 21cf507db37 HADOOP-17450. Add Public IOStatistics API -missed backport 
(#5590)
21cf507db37 is described below

commit 21cf507db371419bfd8d064771baf3c3e3b45580
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Tue Apr 25 15:02:56 2023 +0100

    HADOOP-17450. Add Public IOStatistics API -missed backport (#5590)
    
    
    This cherrypicks the SemaphoredDelegatingExecutor HADOOP-17450 changes
    from trunk; somehow they didn't get into the main IOStatistics backport
    to branch-3.3.
---
 .../hadoop/util/SemaphoredDelegatingExecutor.java  | 41 ++++++++++++++++++----
 1 file changed, 35 insertions(+), 6 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
index 45b9a98c68a..c4c11e57b37 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
@@ -22,6 +22,8 @@ import 
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Forwarding
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 
 import java.util.Collection;
 import java.util.List;
@@ -33,6 +35,10 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
+
 /**
  * This ExecutorService blocks the submission of new tasks when its queue is
  * already full by using a semaphore. Task submissions require permits, task
@@ -53,20 +59,39 @@ public class SemaphoredDelegatingExecutor extends
   private final Semaphore queueingPermits;
   private final ExecutorService executorDelegatee;
   private final int permitCount;
+  private final DurationTrackerFactory trackerFactory;
 
   /**
    * Instantiate.
    * @param executorDelegatee Executor to delegate to
    * @param permitCount number of permits into the queue permitted
    * @param fair should the semaphore be "fair"
+   * @param trackerFactory duration tracker factory.
    */
   public SemaphoredDelegatingExecutor(
       ExecutorService executorDelegatee,
       int permitCount,
-      boolean fair) {
+      boolean fair,
+      DurationTrackerFactory trackerFactory) {
     this.permitCount = permitCount;
     queueingPermits = new Semaphore(permitCount, fair);
-    this.executorDelegatee = executorDelegatee;
+    this.executorDelegatee = requireNonNull(executorDelegatee);
+    this.trackerFactory = trackerFactory != null
+        ? trackerFactory
+        : stubDurationTrackerFactory();
+  }
+
+  /**
+   * Instantiate without collecting executor acquisition duration information.
+   * @param executorDelegatee Executor to delegate to
+   * @param permitCount number of permits into the queue permitted
+   * @param fair should the semaphore be "fair"
+   */
+  public SemaphoredDelegatingExecutor(
+      ExecutorService executorDelegatee,
+      int permitCount,
+      boolean fair) {
+    this(executorDelegatee, permitCount, fair, null);
   }
 
   @Override
@@ -102,7 +127,8 @@ public class SemaphoredDelegatingExecutor extends
 
   @Override
   public <T> Future<T> submit(Callable<T> task) {
-    try {
+    try (DurationTracker ignored =
+             trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
       queueingPermits.acquire();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -113,7 +139,8 @@ public class SemaphoredDelegatingExecutor extends
 
   @Override
   public <T> Future<T> submit(Runnable task, T result) {
-    try {
+    try (DurationTracker ignored =
+             trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
       queueingPermits.acquire();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -124,7 +151,8 @@ public class SemaphoredDelegatingExecutor extends
 
   @Override
   public Future<?> submit(Runnable task) {
-    try {
+    try (DurationTracker ignored =
+             trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
       queueingPermits.acquire();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -135,7 +163,8 @@ public class SemaphoredDelegatingExecutor extends
 
   @Override
   public void execute(Runnable command) {
-    try {
+    try (DurationTracker ignored =
+             trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
       queueingPermits.acquire();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to