This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f0b75b7e4e HDDS-10383. Introduce a Provider for client-side thread
resources passing (#6222)
f0b75b7e4e is described below
commit f0b75b7e4ee93e89f9e4fc96cb30d59f78746eb5
Author: XiChen <[email protected]>
AuthorDate: Fri Feb 23 02:04:09 2024 +0800
HDDS-10383. Introduce a Provider for client-side thread resources passing
(#6222)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 15 +++++++
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 4 +-
.../hdds/scm/storage/ECBlockOutputStream.java | 7 ++-
.../hdds/scm/storage/RatisBlockOutputStream.java | 11 +++--
.../storage/TestBlockOutputStreamCorrectness.java | 5 ++-
.../ECReconstructionCoordinator.java | 48 +++++++++++++--------
.../ozone/client/io/BlockOutputStreamEntry.java | 19 +++++++-
.../client/io/BlockOutputStreamEntryPool.java | 9 ++++
.../ozone/client/io/ECBlockOutputStreamEntry.java | 3 +-
.../client/io/ECBlockOutputStreamEntryPool.java | 3 +-
.../hadoop/ozone/client/io/KeyOutputStream.java | 12 ++++++
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 50 +++++++++++-----------
12 files changed, 131 insertions(+), 55 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 44af34cb91..65e4665297 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -201,6 +201,13 @@ public class OzoneClientConfig {
// 3 concurrent stripe read should be enough.
private int ecReconstructStripeReadPoolLimit = 10 * 3;
+ @Config(key = "ec.reconstruct.stripe.write.pool.limit",
+ defaultValue = "30",
+ description = "Thread pool max size for parallelly write" +
+ " available ec chunks to reconstruct the whole stripe.",
+ tags = ConfigTag.CLIENT)
+ private int ecReconstructStripeWritePoolLimit = 10 * 3;
+
@Config(key = "checksum.combine.mode",
defaultValue = "COMPOSITE_CRC",
description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] "
@@ -387,6 +394,14 @@ public class OzoneClientConfig {
return ecReconstructStripeReadPoolLimit;
}
+ public void setEcReconstructStripeWritePoolLimit(int poolLimit) {
+ this.ecReconstructStripeWritePoolLimit = poolLimit;
+ }
+
+ public int getEcReconstructStripeWritePoolLimit() {
+ return ecReconstructStripeWritePoolLimit;
+ }
+
public void setFsDefaultBucketLayout(String bucketLayout) {
if (!bucketLayout.isEmpty()) {
this.fsDefaultBucketLayout = bucketLayout;
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index bbc4616695..5ff5da6098 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -145,7 +146,8 @@ public class BlockOutputStream extends OutputStream {
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
- ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
+ ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
+ Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 0abc2274bf..adecc3e4c1 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -44,6 +44,8 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
@@ -75,10 +77,11 @@ public class ECBlockOutputStream extends BlockOutputStream {
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
- ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
+ ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
+ Supplier<ExecutorService> executorServiceSupplier
) throws IOException {
super(blockID, xceiverClientManager,
- pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
+ pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs,
executorServiceSupplier);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index b52fc2af91..6a2758d364 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -37,6 +37,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
/**
* An {@link OutputStream} used by the REST service in combination with the
@@ -65,8 +67,8 @@ public class RatisBlockOutputStream extends BlockOutputStream
/**
* Creates a new BlockOutputStream.
*
- * @param blockID block ID
- * @param bufferPool pool of buffers
+ * @param blockID block ID
+ * @param bufferPool pool of buffers
*/
@SuppressWarnings("checkstyle:ParameterNumber")
public RatisBlockOutputStream(
@@ -76,10 +78,11 @@ public class RatisBlockOutputStream extends
BlockOutputStream
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
- ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
+ ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
+ Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
super(blockID, xceiverClientManager, pipeline,
- bufferPool, config, token, clientMetrics, streamBufferArgs);
+ bufferPool, config, token, clientMetrics, streamBufferArgs,
blockOutputStreamResourceProvider);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index 9b061f5392..d06c9cf684 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -47,6 +47,7 @@ import
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
@@ -108,7 +109,9 @@ class TestBlockOutputStreamCorrectness {
bufferPool,
config,
null,
- ContainerClientMetrics.acquire(), streamBufferArgs);
+ ContainerClientMetrics.acquire(),
+ streamBufferArgs,
+ () -> newFixedThreadPool(10));
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 234439a00c..a45c158448 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
-import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.SecurityConfig;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.IOUtils;
@@ -50,6 +50,7 @@ import
org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +71,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -101,12 +101,15 @@ public class ECReconstructionCoordinator implements
Closeable {
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
+ // TODO: Adjust to the appropriate value when the ec-reconstruct-writer thread pool is used.
+ private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;
+
private final ECContainerOperationClient containerOperationClient;
private final ByteBufferPool byteBufferPool;
- private final ExecutorService ecReconstructExecutor;
-
+ private final ExecutorService ecReconstructReadExecutor;
+ private final MemoizedSupplier<ExecutorService> ecReconstructWriteExecutor;
private final BlockInputStreamFactory blockInputStreamFactory;
private final TokenHelper tokenHelper;
private final ContainerClientMetrics clientMetrics;
@@ -123,20 +126,18 @@ public class ECReconstructionCoordinator implements
Closeable {
this.containerOperationClient = new ECContainerOperationClient(conf,
certificateClient);
this.byteBufferPool = new ElasticByteBufferPool();
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d")
- .build();
ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
- this.ecReconstructExecutor =
- new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
- ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
- 60,
- TimeUnit.SECONDS,
- new SynchronousQueue<>(),
- threadFactory,
- new ThreadPoolExecutor.CallerRunsPolicy());
+ this.ecReconstructReadExecutor = createThreadPoolExecutor(
+ EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
+ ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
+ threadNamePrefix + "ec-reconstruct-reader-TID-%d");
+ this.ecReconstructWriteExecutor = MemoizedSupplier.valueOf(
+ () -> createThreadPoolExecutor(
+ EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE,
+ ozoneClientConfig.getEcReconstructStripeWritePoolLimit(),
+ threadNamePrefix + "ec-reconstruct-writer-TID-%d"));
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
- .getInstance(byteBufferPool, () -> ecReconstructExecutor);
+ .getInstance(byteBufferPool, () -> ecReconstructReadExecutor);
tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
this.clientMetrics = ContainerClientMetrics.acquire();
this.metrics = metrics;
@@ -232,7 +233,7 @@ public class ECReconstructionCoordinator implements
Closeable {
containerOperationClient.singleNodePipeline(datanodeDetails,
repConfig, replicaIndex),
BufferPool.empty(), ozoneClientConfig,
- blockLocationInfo.getToken(), clientMetrics, streamBufferArgs);
+ blockLocationInfo.getToken(), clientMetrics, streamBufferArgs,
ecReconstructWriteExecutor);
}
@VisibleForTesting
@@ -272,7 +273,7 @@ public class ECReconstructionCoordinator implements
Closeable {
repConfig, blockLocationInfo, true,
this.containerOperationClient.getXceiverClientManager(), null,
this.blockInputStreamFactory, byteBufferPool,
- this.ecReconstructExecutor)) {
+ this.ecReconstructReadExecutor)) {
ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
@@ -457,6 +458,9 @@ public class ECReconstructionCoordinator implements
Closeable {
if (containerOperationClient != null) {
containerOperationClient.close();
}
+ if (ecReconstructWriteExecutor.isInitialized()) {
+ ecReconstructWriteExecutor.get().shutdownNow();
+ }
}
private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
@@ -590,4 +594,12 @@ public class ECReconstructionCoordinator implements
Closeable {
.map(StateContext::getTermOfLeaderSCM)
.orElse(OptionalLong.empty());
}
+
+ private static ExecutorService createThreadPoolExecutor(
+ int corePoolSize, int maximumPoolSize, String threadNameFormat) {
+ return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+ 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
+ new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ }
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index c0221d07a5..ba3850ff39 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hdds.client.BlockID;
@@ -64,6 +66,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private final BufferPool bufferPool;
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;
+ private final Supplier<ExecutorService> executorServiceSupplier;
BlockOutputStreamEntry(Builder b) {
this.config = b.config;
@@ -78,6 +81,7 @@ public class BlockOutputStreamEntry extends OutputStream {
this.bufferPool = b.bufferPool;
this.clientMetrics = b.clientMetrics;
this.streamBufferArgs = b.streamBufferArgs;
+ this.executorServiceSupplier = b.executorServiceSupplier;
}
@Override
@@ -104,13 +108,18 @@ public class BlockOutputStreamEntry extends OutputStream {
*/
void createOutputStream() throws IOException {
outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
- pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
+ pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs,
+ executorServiceSupplier);
}
ContainerClientMetrics getClientMetrics() {
return clientMetrics;
}
+ Supplier<ExecutorService> getExecutorServiceSupplier() {
+ return executorServiceSupplier;
+ }
+
StreamBufferArgs getStreamBufferArgs() {
return streamBufferArgs;
}
@@ -357,6 +366,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private OzoneClientConfig config;
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;
+ private Supplier<ExecutorService> executorServiceSupplier;
public Pipeline getPipeline() {
return pipeline;
@@ -406,15 +416,22 @@ public class BlockOutputStreamEntry extends OutputStream {
this.token = bToken;
return this;
}
+
public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
this.clientMetrics = clientMetrics;
return this;
}
+
public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
this.streamBufferArgs = streamBufferArgs;
return this;
}
+ public Builder setExecutorServiceSupplier(Supplier<ExecutorService>
executorServiceSupplier) {
+ this.executorServiceSupplier = executorServiceSupplier;
+ return this;
+ }
+
public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(this);
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 4d6026f925..51383e8717 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
@@ -83,6 +85,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
private final ExcludeList excludeList;
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;
+ private final Supplier<ExecutorService> executorServiceSupplier;
public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
this.config = b.getClientConfig();
@@ -109,6 +112,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
ByteStringConversion
.createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled()));
this.clientMetrics = b.getClientMetrics();
+ this.executorServiceSupplier = b.getExecutorServiceSupplier();
}
ExcludeList createExcludeList() {
@@ -159,6 +163,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
.setToken(subKeyInfo.getToken())
.setClientMetrics(clientMetrics)
.setStreamBufferArgs(streamBufferArgs)
+ .setExecutorServiceSupplier(executorServiceSupplier)
.build();
}
@@ -229,6 +234,10 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
return streamBufferArgs;
}
+ public Supplier<ExecutorService> getExecutorServiceSupplier() {
+ return executorServiceSupplier;
+ }
+
/**
* Discards the subsequent pre allocated blocks and removes the streamEntries
* from the streamEntries list for the container which is closed.
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 7f6ce87d60..241754a57f 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -85,7 +85,8 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
streams[i] =
new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
createSingleECBlockPipeline(getPipeline(), nodes.get(i), i +
1),
- getBufferPool(), getConf(), getToken(), getClientMetrics(),
getStreamBufferArgs());
+ getBufferPool(), getConf(), getToken(), getClientMetrics(),
getStreamBufferArgs(),
+ getExecutorServiceSupplier());
}
blockOutputStreams = streams;
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index e278097a49..6eb9aed0d3 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -48,7 +48,8 @@ public class ECBlockOutputStreamEntryPool extends
BlockOutputStreamEntryPool {
.setBufferPool(getBufferPool())
.setToken(subKeyInfo.getToken())
.setClientMetrics(getClientMetrics())
- .setStreamBufferArgs(getStreamBufferArgs());
+ .setStreamBufferArgs(getStreamBufferArgs())
+ .setExecutorServiceSupplier(getExecutorServiceSupplier());
return b.build();
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 9ea17cf8b2..d9e735cd7c 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -24,7 +24,9 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FSExceptionMessages;
@@ -586,6 +588,7 @@ public class KeyOutputStream extends OutputStream
private ContainerClientMetrics clientMetrics;
private boolean atomicKeyCreation = false;
private StreamBufferArgs streamBufferArgs;
+ private Supplier<ExecutorService> executorServiceSupplier;
public String getMultipartUploadID() {
return multipartUploadID;
@@ -699,6 +702,15 @@ public class KeyOutputStream extends OutputStream
return atomicKeyCreation;
}
+ public Builder setExecutorServiceSupplier(Supplier<ExecutorService>
executorServiceSupplier) {
+ this.executorServiceSupplier = executorServiceSupplier;
+ return this;
+ }
+
+ public Supplier<ExecutorService> getExecutorServiceSupplier() {
+ return executorServiceSupplier;
+ }
+
public KeyOutputStream build() {
return new KeyOutputStream(this);
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 3e71262040..74b22e7ca4 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -145,6 +145,7 @@ import
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -195,6 +196,9 @@ public class RpcClient implements ClientProtocol {
// for reconstruction.
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
+ // TODO: Adjust to the appropriate value when the writeThreadPool is used.
+ private static final int WRITE_POOL_MIN_SIZE = 0;
+
private final ConfigurationSource conf;
private final OzoneManagerClientProtocol ozoneManagerClient;
private final XceiverClientFactory xceiverClientManager;
@@ -213,8 +217,9 @@ public class RpcClient implements ClientProtocol {
private final ByteBufferPool byteBufferPool;
private final BlockInputStreamFactory blockInputStreamFactory;
private final OzoneManagerVersion omVersion;
- private volatile ExecutorService ecReconstructExecutor;
+ private final MemoizedSupplier<ExecutorService> ecReconstructExecutor;
private final ContainerClientMetrics clientMetrics;
+ private final MemoizedSupplier<ExecutorService> writeExecutor;
private final AtomicBoolean isS3GRequest = new AtomicBoolean(false);
/**
@@ -237,6 +242,11 @@ public class RpcClient implements ClientProtocol {
this.groupRights = aclConfig.getGroupDefaultRights();
this.clientConfig = conf.getObject(OzoneClientConfig.class);
+ this.ecReconstructExecutor = MemoizedSupplier.valueOf(() ->
createThreadPoolExecutor(
+ EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
clientConfig.getEcReconstructStripeReadPoolLimit(),
+ "ec-reconstruct-reader-TID-%d"));
+ this.writeExecutor = MemoizedSupplier.valueOf(() ->
createThreadPoolExecutor(
+ WRITE_POOL_MIN_SIZE, Integer.MAX_VALUE, "client-write-TID-%d"));
OmTransport omTransport = createOmTransport(omServiceId);
OzoneManagerProtocolClientSideTranslatorPB
@@ -311,7 +321,7 @@ public class RpcClient implements ClientProtocol {
}).build();
this.byteBufferPool = new ElasticByteBufferPool();
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
- .getInstance(byteBufferPool, this::getECReconstructExecutor);
+ .getInstance(byteBufferPool, ecReconstructExecutor);
this.clientMetrics = ContainerClientMetrics.acquire();
}
@@ -1777,9 +1787,11 @@ public class RpcClient implements ClientProtocol {
@Override
public void close() throws IOException {
- if (ecReconstructExecutor != null) {
- ecReconstructExecutor.shutdownNow();
- ecReconstructExecutor = null;
+ if (ecReconstructExecutor.isInitialized()) {
+ ecReconstructExecutor.get().shutdownNow();
+ }
+ if (writeExecutor.isInitialized()) {
+ writeExecutor.get().shutdownNow();
}
IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager);
keyProviderCache.invalidateAll();
@@ -2400,6 +2412,7 @@ public class RpcClient implements ClientProtocol {
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get())
.setClientMetrics(clientMetrics)
+ .setExecutorServiceSupplier(writeExecutor)
.setStreamBufferArgs(streamBufferArgs);
}
@@ -2521,26 +2534,11 @@ public class RpcClient implements ClientProtocol {
ozoneManagerClient.setTimes(builder.build(), mtime, atime);
}
- public ExecutorService getECReconstructExecutor() {
- // local ref to a volatile to ensure access
- // to a completed initialized object
- ExecutorService executor = ecReconstructExecutor;
- if (executor == null) {
- synchronized (this) {
- executor = ecReconstructExecutor;
- if (executor == null) {
- ecReconstructExecutor = new ThreadPoolExecutor(
- EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
- clientConfig.getEcReconstructStripeReadPoolLimit(),
- 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
- new ThreadFactoryBuilder()
- .setNameFormat("ec-reconstruct-reader-TID-%d")
- .build(),
- new ThreadPoolExecutor.CallerRunsPolicy());
- executor = ecReconstructExecutor;
- }
- }
- }
- return executor;
+ private static ExecutorService createThreadPoolExecutor(
+ int corePoolSize, int maximumPoolSize, String threadNameFormat) {
+ return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+ 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
+ new
ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]