This is an automated email from the ASF dual-hosted git repository. stevel pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit f07be3bec28ebb5b2e4ec7e9c3851ff108ba3e82 Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Wed Sep 21 11:52:41 2022 -0700 HADOOP-18455. S3A prefetching executor should be closed (#4879) follow-on patch to HADOOP-18186. Contributed by: Viraj Jasani --- .../fs/impl/prefetch/ExecutorServiceFuturePool.java | 20 +++++++++++++++++++- .../hadoop/util/concurrent/HadoopExecutors.java | 5 ++--- .../hadoop-aws/dev-support/findbugs-exclude.xml | 4 ++++ .../java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 10 ++++------ 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/ExecutorServiceFuturePool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/ExecutorServiceFuturePool.java index 9ef50e50d7e..645de280394 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/ExecutorServiceFuturePool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/ExecutorServiceFuturePool.java @@ -22,8 +22,13 @@ package org.apache.hadoop.fs.impl.prefetch; import java.util.Locale; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.slf4j.Logger; + +import org.apache.hadoop.util.concurrent.HadoopExecutors; + /** * A FuturePool implementation backed by a java.util.concurrent.ExecutorService. * @@ -37,7 +42,8 @@ import java.util.function.Supplier; * */ public class ExecutorServiceFuturePool { - private ExecutorService executor; + + private final ExecutorService executor; public ExecutorServiceFuturePool(ExecutorService executor) { this.executor = executor; @@ -64,6 +70,18 @@ public class ExecutorServiceFuturePool { return (Future<Void>) executor.submit(r::run); } + /** + * Utility to shutdown the {@link ExecutorService} used by this class. Will wait up to a + * certain timeout for the ExecutorService to gracefully shutdown. + * + * @param logger Logger + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + */ + public void shutdown(Logger logger, long timeout, TimeUnit unit) { + HadoopExecutors.shutdown(executor, logger, timeout, unit); + } + public String toString() { return String.format(Locale.ROOT, "ExecutorServiceFuturePool(executor=%s)", executor); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java index 0bbceb59c31..6e2838bfe9c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java @@ -115,9 +115,8 @@ public final class HadoopExecutors { try { executorService.shutdown(); - logger.debug( - "Gracefully shutting down executor service. Waiting max {} {}", - timeout, unit); + logger.debug("Gracefully shutting down executor service {}. Waiting max {} {}", + executorService, timeout, unit); if (!executorService.awaitTermination(timeout, unit)) { logger.debug( "Executor service has not shutdown yet. Forcing. " diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml index e823840fd71..b8e419f3c88 100644 --- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml @@ -59,6 +59,10 @@ <Method name="openFileWithOptions"/> <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/> </Match> + <Match> + <Class name="org.apache.hadoop.fs.s3a.S3AFileSystem"/> + <Bug pattern="IS2_INCONSISTENT_SYNC"/> + </Match> <Match> <Class name="org.apache.hadoop.fs.s3a.s3guard.S3GuardTool$BucketInfo"/> <Method name="run"/> diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 29cd158641f..bdb444f8f0c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -653,17 +653,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, // amazon client exception: stop all services then throw the translation cleanupWithLogger(LOG, span); stopAllServices(); - if (this.futurePool != null) { - this.futurePool = null; - } throw translateException("initializing ", new Path(name), e); } catch (IOException | RuntimeException e) { // other exceptions: stop the services. cleanupWithLogger(LOG, span); stopAllServices(); - if (this.futurePool != null) { - this.futurePool = null; - } throw e; } } @@ -4056,6 +4050,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, HadoopExecutors.shutdown(unboundedThreadPool, LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); unboundedThreadPool = null; + if (futurePool != null) { + futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); + futurePool = null; + } // other services are shutdown. cleanupWithLogger(LOG, instrumentation, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org