This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 5c3f9aa HDDS-5355. In ContainerStateMachine, share the executor threads between the containers. (#2350)
5c3f9aa is described below
commit 5c3f9aa46df711d1db84a89b7aa89e4de49344d1
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Jun 22 17:54:00 2021 +0800
HDDS-5355. In ContainerStateMachine, share the executor threads between the containers. (#2350)
---
.../server/ratis/ContainerStateMachine.java | 74 ++++++++++++----------
1 file changed, 40 insertions(+), 34 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 424c08a..1184f42 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -27,12 +27,14 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsUtils;
@@ -83,6 +85,8 @@ import
org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
+import org.apache.ratis.util.TaskQueue;
+import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,7 +141,8 @@ public class ContainerStateMachine extends BaseStateMachine {
// keeps track of the containers created per pipeline
private final Map<Long, Long> container2BCSIDMap;
- private final ExecutorService[] executors;
+ private final ConcurrentMap<Long, TaskQueue> containerTaskQueues;
+ private final ExecutorService executor;
private final List<ThreadPoolExecutor> chunkExecutors;
private final Map<Long, Long> applyTransactionCompletionMap;
private final Cache<Long, ByteString> stateMachineDataCache;
@@ -186,15 +191,9 @@ public class ContainerStateMachine extends BaseStateMachine {
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
stateMachineHealthy = new AtomicBoolean(true);
- this.executors = new ExecutorService[numContainerOpExecutors];
- for (int i = 0; i < numContainerOpExecutors; i++) {
- final int index = i;
- this.executors[index] = Executors.newSingleThreadExecutor(r -> {
- Thread t = new Thread(r);
- t.setName("RatisApplyTransactionExecutor " + index);
- return t;
- });
- }
+
+ this.executor = Executors.newFixedThreadPool(numContainerOpExecutors);
+ this.containerTaskQueues = new ConcurrentHashMap<>();
}
@Override
@@ -409,12 +408,6 @@ public class ContainerStateMachine extends BaseStateMachine {
return dispatchCommand(requestProto, context);
}
- private ExecutorService getCommandExecutor(
- ContainerCommandRequestProto requestProto) {
- int executorId = (int)(requestProto.getContainerID() % executors.length);
- return executors[executorId];
- }
-
private CompletableFuture<Message> handleWriteChunk(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
@@ -717,6 +710,24 @@ public class ContainerStateMachine extends BaseStateMachine {
updateLastApplied();
}
+ private CompletableFuture<ContainerCommandResponseProto> submitTask(
+ ContainerCommandRequestProto request, DispatcherContext.Builder context,
+ Consumer<Exception> exceptionHandler) {
+ final long containerId = request.getContainerID();
+ final TaskQueue queue = containerTaskQueues.computeIfAbsent(
+ containerId, id -> new TaskQueue("container" + id));
+ final CheckedSupplier<ContainerCommandResponseProto, Exception> task
+ = () -> {
+ try {
+ return runCommand(request, context.build());
+ } catch (Exception e) {
+ exceptionHandler.accept(e);
+ throw e;
+ }
+ };
+ return queue.submit(task, executor);
+ }
+
/*
* ApplyTransaction calls in Ratis are sequential.
*/
@@ -753,22 +764,19 @@ public class ContainerStateMachine extends BaseStateMachine {
}
CompletableFuture<Message> applyTransactionFuture =
new CompletableFuture<>();
+ final Consumer<Exception> exceptionHandler = e -> {
+ LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
+ + "{} exception {}", gid, requestProto.getCmdType(),
+ index, e);
+ stateMachineHealthy.compareAndSet(true, false);
+ metrics.incNumApplyTransactionsFails();
+ applyTransactionFuture.completeExceptionally(e);
+ };
+
// Ensure the command gets executed in a separate thread than
// stateMachineUpdater thread which is calling applyTransaction here.
- CompletableFuture<ContainerCommandResponseProto> future =
- CompletableFuture.supplyAsync(() -> {
- try {
- return runCommand(requestProto, builder.build());
- } catch (Exception e) {
- LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
- + "{} exception {}", gid, requestProto.getCmdType(),
- index, e);
- stateMachineHealthy.compareAndSet(true, false);
- metrics.incNumApplyTransactionsFails();
- applyTransactionFuture.completeExceptionally(e);
- throw e;
- }
- }, getCommandExecutor(requestProto));
+ final CompletableFuture<ContainerCommandResponseProto> future
+ = submitTask(requestProto, builder, exceptionHandler);
future.thenApply(r -> {
if (trx.getServerRole() == RaftPeerRole.LEADER
&& trx.getStateMachineContext() != null) {
@@ -899,11 +907,9 @@ public class ContainerStateMachine extends BaseStateMachine {
}
@Override
- public void close() throws IOException {
+ public void close() {
evictStateMachineCache();
- for (ExecutorService executor : executors) {
- executor.shutdown();
- }
+ executor.shutdown();
metrics.unRegister();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]