This is an automated email from the ASF dual-hosted git repository.

pifta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cc4e026d59 HDDS-11304. Make up for the missing functionality in 
CommandDispatcher (#7062)
cc4e026d59 is described below

commit cc4e026d59e58273c412de15e0d4e6d5715848a3
Author: jianghuazhu <[email protected]>
AuthorDate: Thu Aug 29 23:48:56 2024 +0800

    HDDS-11304. Make up for the missing functionality in CommandDispatcher 
(#7062)
---
 .../CloseContainerCommandHandler.java              | 15 ++++--
 .../commandhandler/CommandDispatcher.java          | 17 +++----
 .../commandhandler/DeleteBlocksCommandHandler.java |  4 +-
 .../DeleteContainerCommandHandler.java             | 17 +++++--
 .../TestCloseContainerCommandHandler.java          | 27 ++++++++++
 .../TestDeleteContainerCommandHandler.java         | 59 +++++++++++++++++++---
 6 files changed, 113 insertions(+), 26 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 8533f7384d..bc703ac6a5 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -18,7 +18,6 @@ package 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -58,11 +57,11 @@ public class CloseContainerCommandHandler implements 
CommandHandler {
 
   private final AtomicLong invocationCount = new AtomicLong(0);
   private final AtomicInteger queuedCount = new AtomicInteger(0);
-  private final ExecutorService executor;
+  private final ThreadPoolExecutor executor;
   private long totalTime;
 
   /**
-   * Constructs a ContainerReport handler.
+   * Constructs a close container command handler.
    */
   public CloseContainerCommandHandler(
       int threadPoolSize, int queueSize, String threadNamePrefix) {
@@ -220,4 +219,14 @@ public class CloseContainerCommandHandler implements 
CommandHandler {
   public int getQueuedCount() {
     return queuedCount.get();
   }
+
+  @Override
+  public int getThreadPoolMaxPoolSize() {
+    return executor.getMaximumPoolSize();
+  }
+
+  @Override
+  public int getThreadPoolActivePoolSize() {
+    return executor.getActiveCount();
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index 9035b79c67..c3f8da74c7 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -56,11 +56,6 @@ public final class CommandDispatcher {
   private CommandDispatcher(OzoneContainer container, SCMConnectionManager
       connectionManager, StateContext context,
       CommandHandler... handlers) {
-    Preconditions.checkNotNull(context);
-    Preconditions.checkNotNull(handlers);
-    Preconditions.checkArgument(handlers.length > 0);
-    Preconditions.checkNotNull(container);
-    Preconditions.checkNotNull(connectionManager);
     this.context = context;
     this.container = container;
     this.connectionManager = connectionManager;
@@ -77,6 +72,7 @@ public final class CommandDispatcher {
     commandHandlerMetrics = CommandHandlerMetrics.create(handlerMap);
   }
 
+  @VisibleForTesting
   public CommandHandler getCloseContainerHandler() {
     return handlerMap.get(Type.closeContainerCommand);
   }
@@ -201,11 +197,12 @@ public final class CommandDispatcher {
      * @return Command Dispatcher.
      */
     public CommandDispatcher build() {
-      Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
-          " manager.");
-      Preconditions.checkNotNull(this.container, "Missing container.");
-      Preconditions.checkNotNull(this.context, "Missing context.");
-      Preconditions.checkArgument(this.handlerList.size() > 0);
+      Preconditions.checkNotNull(this.connectionManager,
+          "Missing scm connection manager.");
+      Preconditions.checkNotNull(this.container, "Missing ozone container.");
+      Preconditions.checkNotNull(this.context, "Missing state context.");
+      Preconditions.checkArgument(this.handlerList.size() > 0,
+          "The number of command handlers must be greater than 0.");
       return new CommandDispatcher(this.container, this.connectionManager,
           this.context, handlerList.toArray(
               new CommandHandler[handlerList.size()]));
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 747749066e..bd7431c614 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -168,12 +168,12 @@ public class DeleteBlocksCommandHandler implements 
CommandHandler {
 
   @Override
   public int getThreadPoolMaxPoolSize() {
-    return ((ThreadPoolExecutor)executor).getMaximumPoolSize();
+    return executor.getMaximumPoolSize();
   }
 
   @Override
   public int getThreadPoolActivePoolSize() {
-    return ((ThreadPoolExecutor)executor).getActiveCount();
+    return executor.getActiveCount();
   }
 
   /**
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
index ead81c32e5..b76e306e1c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.time.Clock;
 import java.util.OptionalLong;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,7 +52,7 @@ public class DeleteContainerCommandHandler implements 
CommandHandler {
   private final AtomicInteger invocationCount = new AtomicInteger(0);
   private final AtomicInteger timeoutCount = new AtomicInteger(0);
   private final AtomicLong totalTime = new AtomicLong(0);
-  private final ExecutorService executor;
+  private final ThreadPoolExecutor executor;
   private final Clock clock;
   private int maxQueueSize;
 
@@ -70,7 +69,7 @@ public class DeleteContainerCommandHandler implements 
CommandHandler {
   }
 
   protected DeleteContainerCommandHandler(Clock clock,
-      ExecutorService executor, int queueSize) {
+      ThreadPoolExecutor executor, int queueSize) {
     this.executor = executor;
     this.clock = clock;
     maxQueueSize = queueSize;
@@ -131,7 +130,7 @@ public class DeleteContainerCommandHandler implements 
CommandHandler {
 
   @Override
   public int getQueuedCount() {
-    return ((ThreadPoolExecutor)executor).getQueue().size();
+    return executor.getQueue().size();
   }
 
   @Override
@@ -160,6 +159,16 @@ public class DeleteContainerCommandHandler implements 
CommandHandler {
     return totalTime.get();
   }
 
+  @Override
+  public int getThreadPoolMaxPoolSize() {
+    return executor.getMaximumPoolSize();
+  }
+
+  @Override
+  public int getThreadPoolActivePoolSize() {
+    return executor.getActiveCount();
+  }
+
   @Override
   public void stop() {
     try {
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index 219645c8ed..a3b60aa36d 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -35,6 +35,7 @@ import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.UUID;
@@ -43,6 +44,8 @@ import static java.util.Collections.singletonMap;
 import static org.apache.hadoop.ozone.OzoneConsts.GB;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -292,4 +295,28 @@ public class TestCloseContainerCommandHandler {
     GenericTestUtils.waitFor(()
         -> closeHandler.getQueuedCount() <= 0, 10, 3000);
   }
+
+  @Test
+  public void testThreadPoolPoolSize() {
+    assertEquals(1, subject.getThreadPoolMaxPoolSize());
+    assertEquals(0, subject.getThreadPoolActivePoolSize());
+
+    CloseContainerCommandHandler closeContainerCommandHandler =
+        new CloseContainerCommandHandler(10, 10, "");
+    closeContainerCommandHandler.handle(new CloseContainerCommand(
+        CONTAINER_ID + 1, PipelineID.randomId()),
+        ozoneContainer, context, null);
+    closeContainerCommandHandler.handle(new CloseContainerCommand(
+        CONTAINER_ID + 2, PipelineID.randomId()),
+        ozoneContainer, context, null);
+    closeContainerCommandHandler.handle(new CloseContainerCommand(
+        CONTAINER_ID + 3, PipelineID.randomId()),
+        ozoneContainer, context, null);
+    closeContainerCommandHandler.handle(new CloseContainerCommand(
+        CONTAINER_ID + 4, PipelineID.randomId()),
+        ozoneContainer, context, null);
+    assertEquals(10, closeContainerCommandHandler.getThreadPoolMaxPoolSize());
+    assertTrue(closeContainerCommandHandler.getThreadPoolActivePoolSize() > 0);
+  }
+
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
index 49c34828fb..5ee31b97fd 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
@@ -19,6 +19,14 @@ package 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -32,7 +40,6 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.util.OptionalLong;
 
-import static 
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -63,8 +70,14 @@ public class TestDeleteContainerCommandHandler {
   }
 
   @Test
-  public void testExpiredCommandsAreNotProcessed() throws IOException {
-    DeleteContainerCommandHandler handler = createSubject(clock, 1000);
+  public void testExpiredCommandsAreNotProcessed()
+      throws IOException, InterruptedException {
+    CountDownLatch latch1 = new CountDownLatch(1);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
+    ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor(
+        threadFactory, latch1);
+    DeleteContainerCommandHandler handler = new DeleteContainerCommandHandler(
+        clock, executor, 100);
 
     DeleteContainerCommand command1 = new DeleteContainerCommand(1L);
     command1.setDeadline(clock.millis() + 10000);
@@ -75,9 +88,14 @@ public class TestDeleteContainerCommandHandler {
 
     clock.fastForward(15000);
     handler.handle(command1, ozoneContainer, null, null);
+    latch1.await();
     assertEquals(1, handler.getTimeoutCount());
+    CountDownLatch latch2 = new CountDownLatch(2);
+    executor.setLatch(latch2);
     handler.handle(command2, ozoneContainer, null, null);
     handler.handle(command3, ozoneContainer, null, null);
+    latch2.await();
+
     assertEquals(1, handler.getTimeoutCount());
     assertEquals(3, handler.getInvocationCount());
     verify(controller, times(0))
@@ -89,7 +107,8 @@ public class TestDeleteContainerCommandHandler {
   }
 
   @Test
-  public void testCommandForCurrentTermIsExecuted() throws IOException {
+  public void testCommandForCurrentTermIsExecuted()
+      throws IOException, InterruptedException {
     // GIVEN
     DeleteContainerCommand command = new DeleteContainerCommand(1L);
     command.setTerm(1);
@@ -97,10 +116,17 @@ public class TestDeleteContainerCommandHandler {
     when(context.getTermOfLeaderSCM())
         .thenReturn(OptionalLong.of(command.getTerm()));
 
-    DeleteContainerCommandHandler subject = createSubject();
+    TestClock testClock = new TestClock(Instant.now(), ZoneId.systemDefault());
+    CountDownLatch latch = new CountDownLatch(1);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
+    ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor(
+        threadFactory, latch);
+    DeleteContainerCommandHandler subject = new DeleteContainerCommandHandler(
+        testClock, executor, 100);
 
     // WHEN
     subject.handle(command, ozoneContainer, context, null);
+    latch.await();
 
     // THEN
     verify(controller, times(1))
@@ -163,8 +189,10 @@ public class TestDeleteContainerCommandHandler {
 
   private static DeleteContainerCommandHandler createSubject(
       TestClock clock, int queueSize) {
-    return new DeleteContainerCommandHandler(clock,
-        newDirectExecutorService(), queueSize);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.
+        newFixedThreadPool(1, threadFactory);
+    return new DeleteContainerCommandHandler(clock, executor, queueSize);
   }
 
   private static DeleteContainerCommandHandler createSubjectWithPoolSize(
@@ -172,4 +200,21 @@ public class TestDeleteContainerCommandHandler {
     return new DeleteContainerCommandHandler(1, clock, queueSize, "");
   }
 
+  static class ThreadPoolWithLockExecutor extends ThreadPoolExecutor {
+    private CountDownLatch countDownLatch;
+    ThreadPoolWithLockExecutor(ThreadFactory threadFactory, CountDownLatch 
latch) {
+      super(1, 1, 0, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>(), threadFactory);
+      this.countDownLatch = latch;
+    }
+
+    void setLatch(CountDownLatch latch) {
+      this.countDownLatch = latch;
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+      countDownLatch.countDown();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to