This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f0b75b7e4e HDDS-10383. Introduce a Provider for client-side thread 
resources passing (#6222)
f0b75b7e4e is described below

commit f0b75b7e4ee93e89f9e4fc96cb30d59f78746eb5
Author: XiChen <[email protected]>
AuthorDate: Fri Feb 23 02:04:09 2024 +0800

    HDDS-10383. Introduce a Provider for client-side thread resources passing 
(#6222)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  | 15 +++++++
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  4 +-
 .../hdds/scm/storage/ECBlockOutputStream.java      |  7 ++-
 .../hdds/scm/storage/RatisBlockOutputStream.java   | 11 +++--
 .../storage/TestBlockOutputStreamCorrectness.java  |  5 ++-
 .../ECReconstructionCoordinator.java               | 48 +++++++++++++--------
 .../ozone/client/io/BlockOutputStreamEntry.java    | 19 +++++++-
 .../client/io/BlockOutputStreamEntryPool.java      |  9 ++++
 .../ozone/client/io/ECBlockOutputStreamEntry.java  |  3 +-
 .../client/io/ECBlockOutputStreamEntryPool.java    |  3 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 12 ++++++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 50 +++++++++++-----------
 12 files changed, 131 insertions(+), 55 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 44af34cb91..65e4665297 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -201,6 +201,13 @@ public class OzoneClientConfig {
   // 3 concurrent stripe read should be enough.
   private int ecReconstructStripeReadPoolLimit = 10 * 3;
 
+  @Config(key = "ec.reconstruct.stripe.write.pool.limit",
+      defaultValue = "30",
+      description = "Thread pool max size for parallelly write" +
+          " available ec chunks to reconstruct the whole stripe.",
+      tags = ConfigTag.CLIENT)
+  private int ecReconstructStripeWritePoolLimit = 10 * 3;
+
   @Config(key = "checksum.combine.mode",
       defaultValue = "COMPOSITE_CRC",
       description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] "
@@ -387,6 +394,14 @@ public class OzoneClientConfig {
     return ecReconstructStripeReadPoolLimit;
   }
 
+  public void setEcReconstructStripeWritePoolLimit(int poolLimit) {
+    this.ecReconstructStripeWritePoolLimit = poolLimit;
+  }
+
+  public int getEcReconstructStripeWritePoolLimit() {
+    return ecReconstructStripeWritePoolLimit;
+  }
+
   public void setFsDefaultBucketLayout(String bucketLayout) {
     if (!bucketLayout.isEmpty()) {
       this.fsDefaultBucketLayout = bucketLayout;
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index bbc4616695..5ff5da6098 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -145,7 +146,8 @@ public class BlockOutputStream extends OutputStream {
       BufferPool bufferPool,
       OzoneClientConfig config,
       Token<? extends TokenIdentifier> token,
-      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
+      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
+      Supplier<ExecutorService> blockOutputStreamResourceProvider
   ) throws IOException {
     this.xceiverClientFactory = xceiverClientManager;
     this.config = config;
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 0abc2274bf..adecc3e4c1 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -44,6 +44,8 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
@@ -75,10 +77,11 @@ public class ECBlockOutputStream extends BlockOutputStream {
       BufferPool bufferPool,
       OzoneClientConfig config,
       Token<? extends TokenIdentifier> token,
-      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
+      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
+      Supplier<ExecutorService> executorServiceSupplier
   ) throws IOException {
     super(blockID, xceiverClientManager,
-        pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
+        pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, 
executorServiceSupplier);
     // In EC stream, there will be only one node in pipeline.
     this.datanodeDetails = pipeline.getClosestNode();
   }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index b52fc2af91..6a2758d364 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -37,6 +37,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 /**
  * An {@link OutputStream} used by the REST service in combination with the
@@ -65,8 +67,8 @@ public class RatisBlockOutputStream extends BlockOutputStream
   /**
    * Creates a new BlockOutputStream.
    *
-   * @param blockID              block ID
-   * @param bufferPool           pool of buffers
+   * @param blockID    block ID
+   * @param bufferPool pool of buffers
    */
   @SuppressWarnings("checkstyle:ParameterNumber")
   public RatisBlockOutputStream(
@@ -76,10 +78,11 @@ public class RatisBlockOutputStream extends 
BlockOutputStream
       BufferPool bufferPool,
       OzoneClientConfig config,
       Token<? extends TokenIdentifier> token,
-      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
+      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
+      Supplier<ExecutorService> blockOutputStreamResourceProvider
   ) throws IOException {
     super(blockID, xceiverClientManager, pipeline,
-        bufferPool, config, token, clientMetrics, streamBufferArgs);
+        bufferPool, config, token, clientMetrics, streamBufferArgs, 
blockOutputStreamResourceProvider);
     this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
   }
 
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index 9b061f5392..d06c9cf684 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -47,6 +47,7 @@ import 
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
@@ -108,7 +109,9 @@ class TestBlockOutputStreamCorrectness {
         bufferPool,
         config,
         null,
-        ContainerClientMetrics.acquire(), streamBufferArgs);
+        ContainerClientMetrics.acquire(),
+        streamBufferArgs,
+        () -> newFixedThreadPool(10));
   }
 
   /**
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 234439a00c..a45c158448 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
-import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
 import org.apache.hadoop.hdds.security.SecurityConfig;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
 import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.IOUtils;
@@ -50,6 +50,7 @@ import 
org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +71,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -101,12 +101,15 @@ public class ECReconstructionCoordinator implements 
Closeable {
 
   private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
 
+  // TODO: Adjusts to the appropriate value when the ec-reconstruct-writer 
thread pool is used.
+  private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;
+
   private final ECContainerOperationClient containerOperationClient;
 
   private final ByteBufferPool byteBufferPool;
 
-  private final ExecutorService ecReconstructExecutor;
-
+  private final ExecutorService ecReconstructReadExecutor;
+  private final MemoizedSupplier<ExecutorService> ecReconstructWriteExecutor;
   private final BlockInputStreamFactory blockInputStreamFactory;
   private final TokenHelper tokenHelper;
   private final ContainerClientMetrics clientMetrics;
@@ -123,20 +126,18 @@ public class ECReconstructionCoordinator implements 
Closeable {
     this.containerOperationClient = new ECContainerOperationClient(conf,
         certificateClient);
     this.byteBufferPool = new ElasticByteBufferPool();
-    ThreadFactory threadFactory = new ThreadFactoryBuilder()
-        .setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d")
-        .build();
     ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
-    this.ecReconstructExecutor =
-        new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
-            ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
-            60,
-            TimeUnit.SECONDS,
-            new SynchronousQueue<>(),
-            threadFactory,
-            new ThreadPoolExecutor.CallerRunsPolicy());
+    this.ecReconstructReadExecutor = createThreadPoolExecutor(
+        EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
+        ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
+        threadNamePrefix + "ec-reconstruct-reader-TID-%d");
+    this.ecReconstructWriteExecutor = MemoizedSupplier.valueOf(
+        () -> createThreadPoolExecutor(
+            EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE,
+            ozoneClientConfig.getEcReconstructStripeWritePoolLimit(),
+            threadNamePrefix + "ec-reconstruct-writer-TID-%d"));
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
-        .getInstance(byteBufferPool, () -> ecReconstructExecutor);
+        .getInstance(byteBufferPool, () -> ecReconstructReadExecutor);
     tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
     this.clientMetrics = ContainerClientMetrics.acquire();
     this.metrics = metrics;
@@ -232,7 +233,7 @@ public class ECReconstructionCoordinator implements 
Closeable {
         containerOperationClient.singleNodePipeline(datanodeDetails,
             repConfig, replicaIndex),
         BufferPool.empty(), ozoneClientConfig,
-        blockLocationInfo.getToken(), clientMetrics, streamBufferArgs);
+        blockLocationInfo.getToken(), clientMetrics, streamBufferArgs, 
ecReconstructWriteExecutor);
   }
 
   @VisibleForTesting
@@ -272,7 +273,7 @@ public class ECReconstructionCoordinator implements 
Closeable {
         repConfig, blockLocationInfo, true,
         this.containerOperationClient.getXceiverClientManager(), null,
         this.blockInputStreamFactory, byteBufferPool,
-        this.ecReconstructExecutor)) {
+        this.ecReconstructReadExecutor)) {
 
       ECBlockOutputStream[] targetBlockStreams =
           new ECBlockOutputStream[toReconstructIndexes.size()];
@@ -457,6 +458,9 @@ public class ECReconstructionCoordinator implements 
Closeable {
     if (containerOperationClient != null) {
       containerOperationClient.close();
     }
+    if (ecReconstructWriteExecutor.isInitialized()) {
+      ecReconstructWriteExecutor.get().shutdownNow();
+    }
   }
 
   private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
@@ -590,4 +594,12 @@ public class ECReconstructionCoordinator implements 
Closeable {
         .map(StateContext::getTermOfLeaderSCM)
         .orElse(OptionalLong.empty());
   }
+
+  private static ExecutorService createThreadPoolExecutor(
+      int corePoolSize, int maximumPoolSize, String threadNameFormat) {
+    return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+        60, TimeUnit.SECONDS, new SynchronousQueue<>(),
+        new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+  }
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index c0221d07a5..ba3850ff39 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -64,6 +66,7 @@ public class BlockOutputStreamEntry extends OutputStream {
   private final BufferPool bufferPool;
   private final ContainerClientMetrics clientMetrics;
   private final StreamBufferArgs streamBufferArgs;
+  private final Supplier<ExecutorService> executorServiceSupplier;
 
   BlockOutputStreamEntry(Builder b) {
     this.config = b.config;
@@ -78,6 +81,7 @@ public class BlockOutputStreamEntry extends OutputStream {
     this.bufferPool = b.bufferPool;
     this.clientMetrics = b.clientMetrics;
     this.streamBufferArgs = b.streamBufferArgs;
+    this.executorServiceSupplier = b.executorServiceSupplier;
   }
 
   @Override
@@ -104,13 +108,18 @@ public class BlockOutputStreamEntry extends OutputStream {
    */
   void createOutputStream() throws IOException {
     outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
-        pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
+        pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs,
+        executorServiceSupplier);
   }
 
   ContainerClientMetrics getClientMetrics() {
     return clientMetrics;
   }
 
+  Supplier<ExecutorService> getExecutorServiceSupplier() {
+    return executorServiceSupplier;
+  }
+
   StreamBufferArgs getStreamBufferArgs() {
     return streamBufferArgs;
   }
@@ -357,6 +366,7 @@ public class BlockOutputStreamEntry extends OutputStream {
     private OzoneClientConfig config;
     private ContainerClientMetrics clientMetrics;
     private StreamBufferArgs streamBufferArgs;
+    private Supplier<ExecutorService> executorServiceSupplier;
 
     public Pipeline getPipeline() {
       return pipeline;
@@ -406,15 +416,22 @@ public class BlockOutputStreamEntry extends OutputStream {
       this.token = bToken;
       return this;
     }
+
     public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
       this.clientMetrics = clientMetrics;
       return this;
     }
+
     public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
       this.streamBufferArgs = streamBufferArgs;
       return this;
     }
 
+    public Builder setExecutorServiceSupplier(Supplier<ExecutorService> 
executorServiceSupplier) {
+      this.executorServiceSupplier = executorServiceSupplier;
+      return this;
+    }
+
     public BlockOutputStreamEntry build() {
       return new BlockOutputStreamEntry(this);
     }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 4d6026f925..51383e8717 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
@@ -83,6 +85,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
   private final ExcludeList excludeList;
   private final ContainerClientMetrics clientMetrics;
   private final StreamBufferArgs streamBufferArgs;
+  private final Supplier<ExecutorService> executorServiceSupplier;
 
   public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
     this.config = b.getClientConfig();
@@ -109,6 +112,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
             ByteStringConversion
                 
.createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled()));
     this.clientMetrics = b.getClientMetrics();
+    this.executorServiceSupplier = b.getExecutorServiceSupplier();
   }
 
   ExcludeList createExcludeList() {
@@ -159,6 +163,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
             .setToken(subKeyInfo.getToken())
             .setClientMetrics(clientMetrics)
             .setStreamBufferArgs(streamBufferArgs)
+            .setExecutorServiceSupplier(executorServiceSupplier)
             .build();
   }
 
@@ -229,6 +234,10 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
     return streamBufferArgs;
   }
 
+  public Supplier<ExecutorService> getExecutorServiceSupplier() {
+    return executorServiceSupplier;
+  }
+
   /**
    * Discards the subsequent pre allocated blocks and removes the streamEntries
    * from the streamEntries list for the container which is closed.
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 7f6ce87d60..241754a57f 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -85,7 +85,8 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry {
         streams[i] =
             new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
                 createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 
1),
-                getBufferPool(), getConf(), getToken(), getClientMetrics(), 
getStreamBufferArgs());
+                getBufferPool(), getConf(), getToken(), getClientMetrics(), 
getStreamBufferArgs(),
+                getExecutorServiceSupplier());
       }
       blockOutputStreams = streams;
     }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index e278097a49..6eb9aed0d3 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -48,7 +48,8 @@ public class ECBlockOutputStreamEntryPool extends 
BlockOutputStreamEntryPool {
             .setBufferPool(getBufferPool())
             .setToken(subKeyInfo.getToken())
             .setClientMetrics(getClientMetrics())
-            .setStreamBufferArgs(getStreamBufferArgs());
+            .setStreamBufferArgs(getStreamBufferArgs())
+            .setExecutorServiceSupplier(getExecutorServiceSupplier());
     return b.build();
   }
 
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 9ea17cf8b2..d9e735cd7c 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -24,7 +24,9 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
@@ -586,6 +588,7 @@ public class KeyOutputStream extends OutputStream
     private ContainerClientMetrics clientMetrics;
     private boolean atomicKeyCreation = false;
     private StreamBufferArgs streamBufferArgs;
+    private Supplier<ExecutorService> executorServiceSupplier;
 
     public String getMultipartUploadID() {
       return multipartUploadID;
@@ -699,6 +702,15 @@ public class KeyOutputStream extends OutputStream
       return atomicKeyCreation;
     }
 
+    public Builder setExecutorServiceSupplier(Supplier<ExecutorService> 
executorServiceSupplier) {
+      this.executorServiceSupplier = executorServiceSupplier;
+      return this;
+    }
+
+    public Supplier<ExecutorService> getExecutorServiceSupplier() {
+      return executorServiceSupplier;
+    }
+
     public KeyOutputStream build() {
       return new KeyOutputStream(this);
     }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 3e71262040..74b22e7ca4 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -145,6 +145,7 @@ import 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -195,6 +196,9 @@ public class RpcClient implements ClientProtocol {
   // for reconstruction.
   private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
 
+  // TODO: Adjusts to the appropriate value when the writeThreadPool is used.
+  private static final int WRITE_POOL_MIN_SIZE = 0;
+
   private final ConfigurationSource conf;
   private final OzoneManagerClientProtocol ozoneManagerClient;
   private final XceiverClientFactory xceiverClientManager;
@@ -213,8 +217,9 @@ public class RpcClient implements ClientProtocol {
   private final ByteBufferPool byteBufferPool;
   private final BlockInputStreamFactory blockInputStreamFactory;
   private final OzoneManagerVersion omVersion;
-  private volatile ExecutorService ecReconstructExecutor;
+  private final MemoizedSupplier<ExecutorService> ecReconstructExecutor;
   private final ContainerClientMetrics clientMetrics;
+  private final MemoizedSupplier<ExecutorService> writeExecutor;
   private final AtomicBoolean isS3GRequest = new AtomicBoolean(false);
 
   /**
@@ -237,6 +242,11 @@ public class RpcClient implements ClientProtocol {
     this.groupRights = aclConfig.getGroupDefaultRights();
 
     this.clientConfig = conf.getObject(OzoneClientConfig.class);
+    this.ecReconstructExecutor = MemoizedSupplier.valueOf(() -> 
createThreadPoolExecutor(
+        EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, 
clientConfig.getEcReconstructStripeReadPoolLimit(),
+        "ec-reconstruct-reader-TID-%d"));
+    this.writeExecutor = MemoizedSupplier.valueOf(() -> 
createThreadPoolExecutor(
+        WRITE_POOL_MIN_SIZE, Integer.MAX_VALUE, "client-write-TID-%d"));
 
     OmTransport omTransport = createOmTransport(omServiceId);
     OzoneManagerProtocolClientSideTranslatorPB
@@ -311,7 +321,7 @@ public class RpcClient implements ClientProtocol {
         }).build();
     this.byteBufferPool = new ElasticByteBufferPool();
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
-        .getInstance(byteBufferPool, this::getECReconstructExecutor);
+        .getInstance(byteBufferPool, ecReconstructExecutor);
     this.clientMetrics = ContainerClientMetrics.acquire();
   }
 
@@ -1777,9 +1787,11 @@ public class RpcClient implements ClientProtocol {
 
   @Override
   public void close() throws IOException {
-    if (ecReconstructExecutor != null) {
-      ecReconstructExecutor.shutdownNow();
-      ecReconstructExecutor = null;
+    if (ecReconstructExecutor.isInitialized()) {
+      ecReconstructExecutor.get().shutdownNow();
+    }
+    if (writeExecutor.isInitialized()) {
+      writeExecutor.get().shutdownNow();
     }
     IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager);
     keyProviderCache.invalidateAll();
@@ -2400,6 +2412,7 @@ public class RpcClient implements ClientProtocol {
         .setConfig(clientConfig)
         .setAtomicKeyCreation(isS3GRequest.get())
         .setClientMetrics(clientMetrics)
+        .setExecutorServiceSupplier(writeExecutor)
         .setStreamBufferArgs(streamBufferArgs);
   }
 
@@ -2521,26 +2534,11 @@ public class RpcClient implements ClientProtocol {
     ozoneManagerClient.setTimes(builder.build(), mtime, atime);
   }
 
-  public ExecutorService getECReconstructExecutor() {
-    // local ref to a volatile to ensure access
-    // to a completed initialized object
-    ExecutorService executor = ecReconstructExecutor;
-    if (executor == null) {
-      synchronized (this) {
-        executor = ecReconstructExecutor;
-        if (executor == null) {
-          ecReconstructExecutor = new ThreadPoolExecutor(
-              EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
-              clientConfig.getEcReconstructStripeReadPoolLimit(),
-              60, TimeUnit.SECONDS, new SynchronousQueue<>(),
-              new ThreadFactoryBuilder()
-                  .setNameFormat("ec-reconstruct-reader-TID-%d")
-                  .build(),
-              new ThreadPoolExecutor.CallerRunsPolicy());
-          executor = ecReconstructExecutor;
-        }
-      }
-    }
-    return executor;
+  private static ExecutorService createThreadPoolExecutor(
+       int corePoolSize, int maximumPoolSize, String threadNameFormat) {
+    return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+            60, TimeUnit.SECONDS, new SynchronousQueue<>(),
+               new 
ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
+               new ThreadPoolExecutor.CallerRunsPolicy());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to