adoroszlai commented on code in PR #4713:
URL: https://github.com/apache/ozone/pull/4713#discussion_r1204058780
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java:
##########
@@ -163,6 +176,21 @@ public long getAverageRunTime() {
@Override
public int getQueuedCount() {
- return 0;
+ return queuedCount.get();
+ }
+
+ @Override
+ public void stop() {
+ if (executor != null) {
+ try {
+ executor.shutdown();
+ if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException ie) {
+ // Ignore, we don't really care about the failure.
+ Thread.currentThread().interrupt();
+ }
+ }
}
Review Comment:
The executor is also shut down in `DatanodeStateMachine`, so I think we can
omit this.
```suggestion
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java:
##########
@@ -34,6 +34,9 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
Review Comment:
```suggestion
import java.util.concurrent.Executor;
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java:
##########
@@ -44,13 +47,16 @@ public class ClosePipelineCommandHandler implements
CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
- private AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicInteger queuedCount = new AtomicInteger(0);
private long totalTime;
+ private final ExecutorService executor;
Review Comment:
```suggestion
private final Executor executor;
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java:
##########
@@ -17,6 +17,10 @@
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
Review Comment:
```suggestion
import java.util.concurrent.Executor;
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java:
##########
@@ -53,20 +57,25 @@ public class CreatePipelineCommandHandler implements
CommandHandler {
LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
private final AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicInteger queuedCount = new AtomicInteger(0);
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
private long totalTime;
+ private final ExecutorService executor;
Review Comment:
```suggestion
private final Executor executor;
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java:
##########
@@ -44,13 +47,16 @@ public class ClosePipelineCommandHandler implements
CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
- private AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicInteger queuedCount = new AtomicInteger(0);
private long totalTime;
+ private final ExecutorService executor;
/**
* Constructs a closePipelineCommand handler.
*/
- public ClosePipelineCommandHandler() {
+ public ClosePipelineCommandHandler(ExecutorService executor) {
Review Comment:
```suggestion
public ClosePipelineCommandHandler(Executor executor) {
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java:
##########
@@ -53,20 +57,25 @@ public class CreatePipelineCommandHandler implements
CommandHandler {
LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
private final AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicInteger queuedCount = new AtomicInteger(0);
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
private long totalTime;
+ private final ExecutorService executor;
/**
* Constructs a createPipelineCommand handler.
*/
- public CreatePipelineCommandHandler(ConfigurationSource conf) {
- this(RatisHelper.newRaftClient(conf));
+ public CreatePipelineCommandHandler(ConfigurationSource conf,
+ ExecutorService executor) {
Review Comment:
```suggestion
Executor executor) {
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java:
##########
@@ -53,20 +57,25 @@ public class CreatePipelineCommandHandler implements
CommandHandler {
LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
private final AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicInteger queuedCount = new AtomicInteger(0);
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
private long totalTime;
+ private final ExecutorService executor;
/**
* Constructs a createPipelineCommand handler.
*/
- public CreatePipelineCommandHandler(ConfigurationSource conf) {
- this(RatisHelper.newRaftClient(conf));
+ public CreatePipelineCommandHandler(ConfigurationSource conf,
+ ExecutorService executor) {
+ this(RatisHelper.newRaftClient(conf), executor);
}
CreatePipelineCommandHandler(
- BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient) {
+ BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient,
+ ExecutorService executor) {
Review Comment:
```suggestion
Executor executor) {
```
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java:
##########
@@ -91,11 +94,21 @@ public void testPipelineCreation() throws IOException {
Mockito.when(writeChanel.isExist(pipelineID.getProtobuf()))
.thenReturn(false);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
Review Comment:
Please replace with `MoreExecutors.directExecutor()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]