adoroszlai commented on code in PR #4713:
URL: https://github.com/apache/ozone/pull/4713#discussion_r1204058780


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java:
##########
@@ -163,6 +176,21 @@ public long getAverageRunTime() {
 
   @Override
   public int getQueuedCount() {
-    return 0;
+    return queuedCount.get();
+  }
+
+  @Override
+  public void stop() {
+    if (executor != null) {
+      try {
+        executor.shutdown();
+        if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
+          executor.shutdownNow();
+        }
+      } catch (InterruptedException ie) {
+        // Ignore, we don't really care about the failure.
+        Thread.currentThread().interrupt();
+      }
+    }
   }

Review Comment:
   The executor is also shut down in `DatanodeStateMachine`, so I think we can omit this.
   
   ```suggestion
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java:
##########
@@ -34,6 +34,9 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;

Review Comment:
   ```suggestion
   import java.util.concurrent.Executor;
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java:
##########
@@ -44,13 +47,16 @@ public class ClosePipelineCommandHandler implements CommandHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
 
-  private AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicInteger queuedCount = new AtomicInteger(0);
   private long totalTime;
+  private final ExecutorService executor;

Review Comment:
   ```suggestion
     private final Executor executor;
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java:
##########
@@ -17,6 +17,10 @@
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;

Review Comment:
   ```suggestion
   import java.util.concurrent.Executor;
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java:
##########
@@ -53,20 +57,25 @@ public class CreatePipelineCommandHandler implements CommandHandler {
       LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
 
   private final AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicInteger queuedCount = new AtomicInteger(0);
   private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
 
   private long totalTime;
+  private final ExecutorService executor;

Review Comment:
   ```suggestion
     private final Executor executor;
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java:
##########
@@ -44,13 +47,16 @@ public class ClosePipelineCommandHandler implements CommandHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
 
-  private AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicInteger queuedCount = new AtomicInteger(0);
   private long totalTime;
+  private final ExecutorService executor;
 
   /**
    * Constructs a closePipelineCommand handler.
    */
-  public ClosePipelineCommandHandler() {
+  public ClosePipelineCommandHandler(ExecutorService executor) {

Review Comment:
   ```suggestion
     public ClosePipelineCommandHandler(Executor executor) {
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java:
##########
@@ -53,20 +57,25 @@ public class CreatePipelineCommandHandler implements CommandHandler {
       LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
 
   private final AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicInteger queuedCount = new AtomicInteger(0);
   private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
 
   private long totalTime;
+  private final ExecutorService executor;
 
   /**
    * Constructs a createPipelineCommand handler.
    */
-  public CreatePipelineCommandHandler(ConfigurationSource conf) {
-    this(RatisHelper.newRaftClient(conf));
+  public CreatePipelineCommandHandler(ConfigurationSource conf,
+                                      ExecutorService executor) {

Review Comment:
   ```suggestion
                                         Executor executor) {
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java:
##########
@@ -53,20 +57,25 @@ public class CreatePipelineCommandHandler implements CommandHandler {
       LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
 
   private final AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicInteger queuedCount = new AtomicInteger(0);
   private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
 
   private long totalTime;
+  private final ExecutorService executor;
 
   /**
    * Constructs a createPipelineCommand handler.
    */
-  public CreatePipelineCommandHandler(ConfigurationSource conf) {
-    this(RatisHelper.newRaftClient(conf));
+  public CreatePipelineCommandHandler(ConfigurationSource conf,
+                                      ExecutorService executor) {
+    this(RatisHelper.newRaftClient(conf), executor);
   }
 
   CreatePipelineCommandHandler(
-      BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient) {
+      BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient,
+      ExecutorService executor) {

Review Comment:
   ```suggestion
         Executor executor) {
   ```



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java:
##########
@@ -91,11 +94,21 @@ public void testPipelineCreation() throws IOException {
     Mockito.when(writeChanel.isExist(pipelineID.getProtobuf()))
         .thenReturn(false);
 
+    ExecutorService executorService = Executors.newSingleThreadExecutor();

Review Comment:
   Please replace `Executors.newSingleThreadExecutor()` with `MoreExecutors.directExecutor()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to