This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c3f9aa  HDDS-5355. In ContainerStateMachine, share the executor threads between the containers. (#2350)
5c3f9aa is described below

commit 5c3f9aa46df711d1db84a89b7aa89e4de49344d1
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Jun 22 17:54:00 2021 +0800

    HDDS-5355. In ContainerStateMachine, share the executor threads between the containers. (#2350)
---
 .../server/ratis/ContainerStateMachine.java        | 74 ++++++++++++----------
 1 file changed, 40 insertions(+), 34 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 424c08a..1184f42 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -27,12 +27,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.HddsUtils;
@@ -83,6 +85,8 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
+import org.apache.ratis.util.TaskQueue;
+import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,7 +141,8 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   // keeps track of the containers created per pipeline
   private final Map<Long, Long> container2BCSIDMap;
-  private final ExecutorService[] executors;
+  private final ConcurrentMap<Long, TaskQueue> containerTaskQueues;
+  private final ExecutorService executor;
   private final List<ThreadPoolExecutor> chunkExecutors;
   private final Map<Long, Long> applyTransactionCompletionMap;
   private final Cache<Long, ByteString> stateMachineDataCache;
@@ -186,15 +191,9 @@ public class ContainerStateMachine extends BaseStateMachine {
             DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
     applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
     stateMachineHealthy = new AtomicBoolean(true);
-    this.executors = new ExecutorService[numContainerOpExecutors];
-    for (int i = 0; i < numContainerOpExecutors; i++) {
-      final int index = i;
-      this.executors[index] = Executors.newSingleThreadExecutor(r -> {
-        Thread t = new Thread(r);
-        t.setName("RatisApplyTransactionExecutor " + index);
-        return t;
-      });
-    }
+
+    this.executor = Executors.newFixedThreadPool(numContainerOpExecutors);
+    this.containerTaskQueues = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -409,12 +408,6 @@ public class ContainerStateMachine extends BaseStateMachine {
     return dispatchCommand(requestProto, context);
   }
 
-  private ExecutorService getCommandExecutor(
-      ContainerCommandRequestProto requestProto) {
-    int executorId = (int)(requestProto.getContainerID() % executors.length);
-    return executors[executorId];
-  }
-
   private CompletableFuture<Message> handleWriteChunk(
       ContainerCommandRequestProto requestProto, long entryIndex, long term,
       long startTime) {
@@ -717,6 +710,24 @@ public class ContainerStateMachine extends BaseStateMachine {
     updateLastApplied();
   }
 
+  private CompletableFuture<ContainerCommandResponseProto> submitTask(
+      ContainerCommandRequestProto request, DispatcherContext.Builder context,
+      Consumer<Exception> exceptionHandler) {
+    final long containerId = request.getContainerID();
+    final TaskQueue queue = containerTaskQueues.computeIfAbsent(
+        containerId, id -> new TaskQueue("container" + id));
+    final CheckedSupplier<ContainerCommandResponseProto, Exception> task
+        = () -> {
+          try {
+            return runCommand(request, context.build());
+          } catch (Exception e) {
+            exceptionHandler.accept(e);
+            throw e;
+          }
+        };
+    return queue.submit(task, executor);
+  }
+
   /*
    * ApplyTransaction calls in Ratis are sequential.
    */
@@ -753,22 +764,19 @@ public class ContainerStateMachine extends BaseStateMachine {
       }
       CompletableFuture<Message> applyTransactionFuture =
           new CompletableFuture<>();
+      final Consumer<Exception> exceptionHandler = e -> {
+        LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
+                + "{} exception {}", gid, requestProto.getCmdType(),
+            index, e);
+        stateMachineHealthy.compareAndSet(true, false);
+        metrics.incNumApplyTransactionsFails();
+        applyTransactionFuture.completeExceptionally(e);
+      };
+
       // Ensure the command gets executed in a separate thread than
       // stateMachineUpdater thread which is calling applyTransaction here.
-      CompletableFuture<ContainerCommandResponseProto> future =
-          CompletableFuture.supplyAsync(() -> {
-            try {
-              return runCommand(requestProto, builder.build());
-            } catch (Exception e) {
-              LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
-                      + "{} exception {}", gid, requestProto.getCmdType(),
-                  index, e);
-              stateMachineHealthy.compareAndSet(true, false);
-              metrics.incNumApplyTransactionsFails();
-              applyTransactionFuture.completeExceptionally(e);
-              throw e;
-            }
-          }, getCommandExecutor(requestProto));
+      final CompletableFuture<ContainerCommandResponseProto> future
+          = submitTask(requestProto, builder, exceptionHandler);
       future.thenApply(r -> {
         if (trx.getServerRole() == RaftPeerRole.LEADER
             && trx.getStateMachineContext() != null) {
@@ -899,11 +907,9 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     evictStateMachineCache();
-    for (ExecutorService executor : executors) {
-      executor.shutdown();
-    }
+    executor.shutdown();
     metrics.unRegister();
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to