This is an automated email from the ASF dual-hosted git repository. stevel pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 21cf507db37 HADOOP-17450. Add Public IOStatistics API -missed backport (#5590) 21cf507db37 is described below commit 21cf507db371419bfd8d064771baf3c3e3b45580 Author: Steve Loughran <ste...@cloudera.com> AuthorDate: Tue Apr 25 15:02:56 2023 +0100 HADOOP-17450. Add Public IOStatistics API -missed backport (#5590) This cherrypicks SemaphoredDelegatingExecutor HADOOP-17450 changes from trunk somehow they didn't get into the main IOStatistics backport to branch-3.3 --- .../hadoop/util/SemaphoredDelegatingExecutor.java | 41 ++++++++++++++++++---- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java index 45b9a98c68a..c4c11e57b37 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java @@ -22,6 +22,8 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Forwarding import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import java.util.Collection; import java.util.List; @@ -33,6 +35,10 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED; + /** * This ExecutorService blocks the 
submission of new tasks when its queue is * already full by using a semaphore. Task submissions require permits, task @@ -53,20 +59,39 @@ public class SemaphoredDelegatingExecutor extends private final Semaphore queueingPermits; private final ExecutorService executorDelegatee; private final int permitCount; + private final DurationTrackerFactory trackerFactory; /** * Instantiate. * @param executorDelegatee Executor to delegate to * @param permitCount number of permits into the queue permitted * @param fair should the semaphore be "fair" + * @param trackerFactory duration tracker factory. */ public SemaphoredDelegatingExecutor( ExecutorService executorDelegatee, int permitCount, - boolean fair) { + boolean fair, + DurationTrackerFactory trackerFactory) { this.permitCount = permitCount; queueingPermits = new Semaphore(permitCount, fair); - this.executorDelegatee = executorDelegatee; + this.executorDelegatee = requireNonNull(executorDelegatee); + this.trackerFactory = trackerFactory != null + ? trackerFactory + : stubDurationTrackerFactory(); + } + + /** + * Instantiate without collecting executor acquisition duration information. 
+ * @param executorDelegatee Executor to delegate to + * @param permitCount number of permits into the queue permitted + * @param fair should the semaphore be "fair" + */ + public SemaphoredDelegatingExecutor( + ExecutorService executorDelegatee, + int permitCount, + boolean fair) { + this(executorDelegatee, permitCount, fair, null); } @Override @@ -102,7 +127,8 @@ public class SemaphoredDelegatingExecutor extends @Override public <T> Future<T> submit(Callable<T> task) { - try { + try (DurationTracker ignored = + trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -113,7 +139,8 @@ public class SemaphoredDelegatingExecutor extends @Override public <T> Future<T> submit(Runnable task, T result) { - try { + try (DurationTracker ignored = + trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -124,7 +151,8 @@ public class SemaphoredDelegatingExecutor extends @Override public Future<?> submit(Runnable task) { - try { + try (DurationTracker ignored = + trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -135,7 +163,8 @@ public class SemaphoredDelegatingExecutor extends @Override public void execute(Runnable command) { - try { + try (DurationTracker ignored = + trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org